博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
storm 读取不到对应的kafka数据
阅读量:6884 次
发布时间:2019-06-27

本文共 3387 字,大约阅读时间需要 11 分钟。

坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖

UTF-8
1.7
1.7
1.1.1
0.9.0.0
org.apache.storm
storm-core
${storm.version}
org.slf4j
log4j-over-slf4j
org.slf4j
slf4j-api
org.apache.storm
storm-kafka
${storm.version}
org.apache.kafka
kafka_2.11
${kafka.version}
org.apache.zookeeper
zookeeper
org.slf4j
slf4j-log4j12
commons-io
commons-io
2.4
org.apache.kafka
kafka-clients
${kafka.version}

坑二: input.getBinaryByField(“bytes”); 里面一定要写成bytes,这是上游kafkaSpout 传递过来,源码中也可以看到。

对应位置如下图

这里写图片描述

业务代码体现:

public void execute(Tuple input) {
try {
byte[] bytes = input.getBinaryByField(“bytes”);
String value = new String(bytes);
System.out.println(“value ” + value);
this.collector.ack(input);
} catch (Exception e) {
e.printStackTrace();
this.collector.fail(input);
}
}

坑三:本地测试是,一直接收不到kafkaSpout发送过来的消息:

1)问题是已经连接上了kafka,也读到了对应的分区
2)推断可能是上游的数据发送不过来—》 可能原因shuffleGrouping时 的参数传递错误。
3)最终发现 原来就是SPOUT_ID 获取错了
应该将下面代码中的

String SPOUT_ID = kafkaSpout.getClass().getSimpleName()     替换成     String SPOUT_ID = KafkaSpout.class.getSimpleName();     即可。         // kafka 使用的zk hosts        BrokerHosts hosts = new ZkHosts("hadoop000:2181");//        指定的kafak的一个根目录,存储的是kafkaSpout读取数据的位置信息(offset)        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);        String SPOUT_ID = kafkaSpout.getClass().getSimpleName();        builder.setSpout(SPOUT_ID, kafkaSpout);        String BOLD_ID = LogProcessBolt.class.getSimpleName();        builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID);        LocalCluster cluster = new LocalCluster();        cluster.submitTopology("StormToKafkaTopology", new Config(), builder.createTopology());

坑四: storm重复消费kafak数据:

官网解释如下:

这里写图片描述

代码中配置为如下即可   SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());  spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费

坑五: storm消费数据,ack,fail这些比配,如果出现问题还可以重试

这里写图片描述

你可能感兴趣的文章
比特币:一种点对点的电子现金系统
查看>>
Android - 按钮组件详解
查看>>
MEF简单学习笔记
查看>>
Srping - bean的依赖注入(Dependency injection)
查看>>
NSAutoreleasePool 用处
查看>>
import matplotlib.pyplot as plt出错
查看>>
常用集合与Dictionary用例
查看>>
MVC
查看>>
AI - TensorFlow - 张量(Tensor)
查看>>
js table 导出 Excel
查看>>
AHSC DAY2总结
查看>>
java.lang.SecurityException: class "javax.servlet.FilterRegistration"(spark下maven)
查看>>
[Vue CLI 3] 配置解析之 css.extract
查看>>
Linux——信息采集(三)dmitry、路由跟踪命令tracerouter
查看>>
提取ipa里面的资源图片 png
查看>>
wxpython ItemContainer
查看>>
工作中 Oracle 常用数据字典集锦
查看>>
SFB 项目经验-12-为某上市企业的Skype for Business购买Godday证书
查看>>
[C#基础知识]专题十三:全面解析对象集合初始化器、匿名类型和隐式类型
查看>>
大数据虚拟化零起点-2基础运维第一步-环境规划和准备
查看>>