Yes. Thank you for replying! I've been fussing over it some more and I think I'm getting closer to the issue.
In fact, the logs do give a clue- my workers start in state "EMPTY -assignment null," do nothing, then get removed after not being used. The work isn't even hitting the workers. in my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as the leader, and localhost as offline. So, somehow, I must have my nimbus and workers running somewhere completely different from the Kafka cluster, which are running on localhost. I am currently futzing with port numbers in storm.yaml. How can I bring localhost online as the leader? On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <s...@apache.org> wrote: > Hi Ryan, > > I don't see anything obviously wrong with your configuration. It's likely > your topology logs can tell you what's going wrong. Next time you start > your topology make note of the topology name in Storm UI. Also click in to > your spout in Storm UI and note which worker port(s) it's running on (if > you're running on a multi-node cluster you'll also need to note which > machine is running the spout). You should then be able to go to > $storm-install-dir/logs/workers-artifacts/$your- > topology-name-here/$worker-port/worker.log on the relevant worker and see > what the spout worker is logging. > > In case you don't find anything interesting there, you might also look at > logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on > the machine running the supervisor for those logs. > > Also just to make sure, you're running "storm supervisor" as well as > "storm nimbus", right? Otherwise your topology won't be assigned to a > worker. > > 2017-10-11 16:53 GMT+02:00 Ryan Bliton <ryan.bli...@gmail.com>: > >> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got a >> simple topology working in Local mode- It reads the messages from a Kafka >> topic and sends them to a bolt that logs them. However, when I try to >> submit the Topology to a cluster, the Storm UI always reads 0 tuples >> emitted from the KafkaSpout. >> >> I've done several laps around the internet at this point, built and tried >> different starter projects, and each has the same issue. I can submit the >> Topology, but it won't actually work. >> >> Similar problems to mine seem to come from the Storm /lib and >> incompatible .jar files within. I haven't found anything like that in my >> case. However, I'm not 100% sure what I should be looking for so I can't >> rule it out. >> >> I don't know how to make code look pretty on a mailing list, so here is a >> stack overflow about my issue: >> >> https://stackoverflow.com/questions/46676377/apache-storm- >> kafka-cant-see-sent-kafka-messages-in-storm-ui >> >> I make sure to call storm.supervisor before testing. >> >> I have zookeeper running off port 2181. >> >> I spin up a Kafka broker and use the topic storm-test-topic1. >> >> I fire up a console Kafka producer to send nonsense messages. >> >> Storm.yaml: >> ########### These MUST be filled in for a storm configuration >> storm.zookeeper.servers: >> - "localhost" >> # - "server2" >> # >> nimbus.seeds: ["localhost"] >> # >> # >> >> ------------------------------------------------------------ >> ---------------------------------- >> Topology: >> >> package com.kafka.storm; >> >> import java.util.HashMap; >> >> import org.apache.log4j.Logger; >> import org.apache.storm.Config; >> import org.apache.storm.LocalCluster; >> import org.apache.storm.StormSubmitter; >> import org.apache.storm.generated.AlreadyAliveException; >> import org.apache.storm.generated.AuthorizationException; >> import org.apache.storm.generated.InvalidTopologyException; >> import org.apache.storm.kafka.BrokerHosts; >> import org.apache.storm.kafka.KafkaSpout; >> import org.apache.storm.kafka.SpoutConfig; >> import org.apache.storm.kafka.StringScheme; >> import org.apache.storm.kafka.ZkHosts; >> import org.apache.storm.spout.SchemeAsMultiScheme; >> import org.apache.storm.topology.TopologyBuilder; >> >> import com.kafka.storm.bolt.LoggerBolt; >> >> public class KafkaStormIntegrationDemo { >> private static final Logger LOG = Logger.getLogger(KafkaStormInt >> egrationDemo.class); >> >> public static void main(String[] args) throws InvalidTopologyException, >> AuthorizationException, AlreadyAliveException { >> >> // Build Spout configuration using input command line parameters >> final BrokerHosts zkrHosts = new ZkHosts("localhost:2181"); >> final String kafkaTopic = "storm-test-topic1"; >> final String zkRoot = ""; >> final String clientId = "storm-consumer"; >> SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, >> clientId); >> kafkaConf.startOffsetTime = -2; >> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); >> >> // Build topology to consume message from kafka and print them on console >> final TopologyBuilder topologyBuilder = new TopologyBuilder(); >> // Create KafkaSpout instance using Kafka configuration and add it to >> topology >> topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1); >> //Route the output of Kafka Spout to Logger bolt to log messages consumed >> from Kafka >> topologyBuilder.setBolt("print-messages", new >> LoggerBolt()).globalGrouping("kafka-spout"); >> // Submit topology to local cluster i.e. embedded storm instance in >> eclipse >> Config conf = new Config(); >> System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/s >> torm-core-1.1.1.jar"); >> StormSubmitter.submitTopology("kafkaTopology", conf, >> topologyBuilder.createTopology()); >> } >> } >> ------------------------------------------------------------ >> ---------------------------------- >> >> Bolt: >> >> package com.kafka.storm.bolt; >> >> import org.apache.log4j.Logger; >> import org.apache.storm.topology.BasicOutputCollector; >> import org.apache.storm.topology.OutputFieldsDeclarer; >> import org.apache.storm.topology.base.BaseBasicBolt; >> import org.apache.storm.tuple.Fields; >> import org.apache.storm.tuple.Tuple; >> >> public class LoggerBolt extends BaseBasicBolt{ >> private static final long serialVersionUID = 1L; >> private static final Logger LOG = Logger.getLogger(LoggerBolt.class); >> >> public void execute(Tuple input, BasicOutputCollector collector) { >> LOG.info(input.getString(0)); >> } >> >> public void declareOutputFields(OutputFieldsDeclarer declarer) { >> declarer.declare(new Fields("message")); >> } >> } >> >> >> thank you in advance for any help you can give, or for just reading! >> >> >