Hi! I'm trying to get a starter Kafka-Storm integration going. I've got a simple topology working in Local mode- It reads the messages from a Kafka topic and sends them to a bolt that logs them. However, when I try to submit the Topology to a cluster, the Storm UI always reads 0 tuples emitted from the KafkaSpout.
I've done several laps around the internet at this point, built and tried different starter projects, and each has the same issue. I can submit the Topology, but it won't actually work. Similar problems to mine seem to come from the Storm /lib and incompatible .jar files within. I haven't found anything like that in my case. However, I'm not 100% sure what I should be looking for so I can't rule it out. I don't know how to make code look pretty on a mailing list, so here is a stack overflow about my issue: https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui I make sure to call storm.supervisor before testing. I have zookeeper running off port 2181. I spin up a Kafka broker and use the topic storm-test-topic1. I fire up a console Kafka producer to send nonsense messages. Storm.yaml: ########### These MUST be filled in for a storm configuration storm.zookeeper.servers: - "localhost" # - "server2" # nimbus.seeds: ["localhost"] # # ---------------------------------------------------------------------------------------------- Topology: package com.kafka.storm; import java.util.HashMap; import org.apache.log4j.Logger; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.StringScheme; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.spout.SchemeAsMultiScheme; import org.apache.storm.topology.TopologyBuilder; import com.kafka.storm.bolt.LoggerBolt; public class KafkaStormIntegrationDemo { private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class); public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException { // Build Spout configuration using input command line parameters final BrokerHosts zkrHosts = new ZkHosts("localhost:2181"); final String kafkaTopic = "storm-test-topic1"; final String zkRoot = ""; final String clientId = "storm-consumer"; SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId); kafkaConf.startOffsetTime = -2; kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // Build topology to consume message from kafka and print them on console final TopologyBuilder topologyBuilder = new TopologyBuilder(); // Create KafkaSpout instance using Kafka configuration and add it to topology topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1); //Route the output of Kafka Spout to Logger bolt to log messages consumed from Kafka topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout"); // Submit topology to local cluster i.e. embedded storm instance in eclipse Config conf = new Config(); System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar"); StormSubmitter.submitTopology("kafkaTopology", conf, topologyBuilder.createTopology()); } } ---------------------------------------------------------------------------------------------- Bolt: package com.kafka.storm.bolt; import org.apache.log4j.Logger; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; public class LoggerBolt extends BaseBasicBolt{ private static final long serialVersionUID = 1L; private static final Logger LOG = Logger.getLogger(LoggerBolt.class); public void execute(Tuple input, BasicOutputCollector collector) { LOG.info(input.getString(0)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("message")); } } thank you in advance for any help you can give, or for just reading!