本文共 3387 字,大约阅读时间需要 11 分钟。
坑一:pom文件主要内容:注意里面 需要 使用 “exclusion”排除相关的依赖
UTF-8 1.7 1.7 1.1.1 0.9.0.0 org.apache.storm storm-core ${storm.version} org.slf4j log4j-over-slf4j org.slf4j slf4j-api org.apache.storm storm-kafka ${storm.version} org.apache.kafka kafka_2.11 ${kafka.version} org.apache.zookeeper zookeeper org.slf4j slf4j-log4j12 commons-io commons-io 2.4 org.apache.kafka kafka-clients ${kafka.version}
坑二: input.getBinaryByField(“bytes”); 里面一定要写成bytes,这是上游kafkaSpout 传递过来,源码中也可以看到。
对应位置如下图业务代码体现:
public void execute(Tuple input) { try { byte[] bytes = input.getBinaryByField(“bytes”); String value = new String(bytes); System.out.println(“value ” + value); this.collector.ack(input); } catch (Exception e) { e.printStackTrace(); this.collector.fail(input); } }坑三:本地测试是,一直接收不到kafkaSpout发送过来的消息:
1)问题是已经连接上了kafka,也读到了对应的分区 2)推断可能是上游的数据发送不过来—》 可能原因shuffleGrouping时 的参数传递错误。 3)最终发现 原来就是SPOUT_ID 获取错了 应该将下面代码中的String SPOUT_ID = kafkaSpout.getClass().getSimpleName() 替换成 String SPOUT_ID = KafkaSpout.class.getSimpleName(); 即可。 // kafka 使用的zk hosts BrokerHosts hosts = new ZkHosts("hadoop000:2181");// 指定的kafak的一个根目录,存储的是kafkaSpout读取数据的位置信息(offset) SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费 KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); String SPOUT_ID = kafkaSpout.getClass().getSimpleName(); builder.setSpout(SPOUT_ID, kafkaSpout); String BOLD_ID = LogProcessBolt.class.getSimpleName(); builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("StormToKafkaTopology", new Config(), builder.createTopology());
坑四: storm重复消费kafak数据:
官网解释如下:
代码中配置为如下即可 SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // 设置从最近的消息开始消费
坑五: storm消费数据,ack,fail这些比配,如果出现问题还可以重试