Yes. Thank you for replying! I've been fussing over it some more and I
think I'm getting closer to the issue.

In fact, the logs do give a clue: my workers start in state "EMPTY
-assignment null," do nothing, and then get removed for not being used.
The work isn't even reaching the workers.

My Storm UI lists my PC name (ABCD-PC123453.my.company.name) as the
leader and localhost as offline.

So, somehow, my Nimbus and workers must be running somewhere completely
different from the Kafka cluster, which is running on localhost.
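
A rough way to check that theory, as a sketch only: the snippet below
prints the host name the JVM reports for this machine and compares it with
the nimbus.seeds entry from storm.yaml. It assumes Storm registers Nimbus
under that host name unless storm.local.hostname is set, which is my
reading of the defaults and not something confirmed in this thread.
NimbusHostCheck and matchesSeed are made-up names for illustration.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

public class NimbusHostCheck {

    // True if the given host name appears in the seed list (case-insensitive).
    static boolean matchesSeed(String localHostName, List<String> nimbusSeeds) {
        for (String seed : nimbusSeeds) {
            if (seed.equalsIgnoreCase(localHostName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws UnknownHostException {
        // Assumption: mirrors nimbus.seeds: ["localhost"] from storm.yaml.
        List<String> nimbusSeeds = Arrays.asList("localhost");
        // Host name the JVM resolves for this machine.
        String localHostName = InetAddress.getLocalHost().getCanonicalHostName();
        System.out.println("JVM host name: " + localHostName);
        System.out.println("Listed in nimbus.seeds: "
                + matchesSeed(localHostName, nimbusSeeds));
    }
}
```

If the second line prints false, the seed "localhost" would never match
the registered leader, which would be consistent with what the UI shows.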

I am currently futzing with port numbers in storm.yaml.
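
Before changing more port numbers, it may help to see which ports actually
accept connections. A minimal sketch, assuming the stock ports (ZooKeeper
on 2181, Nimbus Thrift on 6627, supervisor worker slots on 6700-6703);
StormPortProbe and isListening are hypothetical names, and the list should
be adjusted to whatever storm.yaml really sets.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class StormPortProbe {

    // True if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolved: treat as not listening.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default ports; replace with the values from storm.yaml.
        int[] ports = {2181, 6627, 6700, 6701, 6702, 6703};
        for (int port : ports) {
            System.out.println("localhost:" + port + " listening: "
                    + isListening("localhost", port, 500));
        }
    }
}
```

A worker slot port only listens while a worker is running on it, so false
on 6700-6703 is expected when nothing has been assigned.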

How can I bring localhost online as the leader?

On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <s...@apache.org> wrote:

> Hi Ryan,
>
> I don't see anything obviously wrong with your configuration. It's likely
> your topology logs can tell you what's going wrong. Next time you start
> your topology make note of the topology name in Storm UI. Also click in to
> your spout in Storm UI and note which worker port(s) it's running on (if
> you're running on a multi-node cluster you'll also need to note which
> machine is running the spout). You should then be able to go to
> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
> on the relevant worker and see what the spout worker is logging.
>
> In case you don't find anything interesting there, you might also look at
> logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on
> the machine running the supervisor for those logs.
>
> Also just to make sure, you're running "storm supervisor" as well as
> "storm nimbus", right? Otherwise your topology won't be assigned to a
> worker.
>
> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <ryan.bli...@gmail.com>:
>
>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got a
>> simple topology working in local mode: it reads the messages from a Kafka
>> topic and sends them to a bolt that logs them. However, when I try to
>> submit the topology to a cluster, the Storm UI always reads 0 tuples
>> emitted from the KafkaSpout.
>>
>> I've done several laps around the internet at this point, built and tried
>> different starter projects, and each has the same issue. I can submit the
>> Topology, but it won't actually work.
>>
>> Similar problems to mine seem to come from the Storm /lib and
>> incompatible .jar files within. I haven't found anything like that in my
>> case. However, I'm not 100% sure what I should be looking for so I can't
>> rule it out.
>>
>> I don't know how to make code look pretty on a mailing list, so here is a
>> stack overflow about my issue:
>>
>> https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui
>>
>> I make sure to run "storm supervisor" before testing.
>>
>> I have zookeeper running off port 2181.
>>
>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>
>> I fire up a console Kafka producer to send nonsense messages.
>>
>> storm.yaml:
>> ########### These MUST be filled in for a storm configuration
>>  storm.zookeeper.servers:
>>      - "localhost"
>> #     - "server2"
>> #
>>  nimbus.seeds: ["localhost"]
>> #
>> #
>>
>> ----------------------------------------------------------------------------------------------
>> Topology:
>>
>> package com.kafka.storm;
>>
>> import java.util.HashMap;
>>
>> import org.apache.log4j.Logger;
>> import org.apache.storm.Config;
>> import org.apache.storm.LocalCluster;
>> import org.apache.storm.StormSubmitter;
>> import org.apache.storm.generated.AlreadyAliveException;
>> import org.apache.storm.generated.AuthorizationException;
>> import org.apache.storm.generated.InvalidTopologyException;
>> import org.apache.storm.kafka.BrokerHosts;
>> import org.apache.storm.kafka.KafkaSpout;
>> import org.apache.storm.kafka.SpoutConfig;
>> import org.apache.storm.kafka.StringScheme;
>> import org.apache.storm.kafka.ZkHosts;
>> import org.apache.storm.spout.SchemeAsMultiScheme;
>> import org.apache.storm.topology.TopologyBuilder;
>>
>> import com.kafka.storm.bolt.LoggerBolt;
>>
>> public class KafkaStormIntegrationDemo {
>>     private static final Logger LOG = Logger.getLogger(KafkaStormIntegrationDemo.class);
>>
>>     public static void main(String[] args) throws InvalidTopologyException,
>>             AuthorizationException, AlreadyAliveException {
>>
>>         // Build Spout configuration using input command line parameters
>>         final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>         final String kafkaTopic = "storm-test-topic1";
>>         final String zkRoot = "";
>>         final String clientId = "storm-consumer";
>>         SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
>>         kafkaConf.startOffsetTime = -2;
>>         kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>
>>         // Build topology to consume message from kafka and print them on console
>>         final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>         // Create KafkaSpout instance using Kafka configuration and add it to topology
>>         topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
>>         // Route the output of Kafka Spout to Logger bolt to log messages consumed from Kafka
>>         topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");
>>         // Submit topology to local cluster i.e. embedded storm instance in eclipse
>>         Config conf = new Config();
>>         System.setProperty("storm.jar", "C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
>>         StormSubmitter.submitTopology("kafkaTopology", conf, topologyBuilder.createTopology());
>>     }
>> }
>> ----------------------------------------------------------------------------------------------
>>
>> Bolt:
>>
>> package com.kafka.storm.bolt;
>>
>> import org.apache.log4j.Logger;
>> import org.apache.storm.topology.BasicOutputCollector;
>> import org.apache.storm.topology.OutputFieldsDeclarer;
>> import org.apache.storm.topology.base.BaseBasicBolt;
>> import org.apache.storm.tuple.Fields;
>> import org.apache.storm.tuple.Tuple;
>>
>> public class LoggerBolt extends BaseBasicBolt {
>>     private static final long serialVersionUID = 1L;
>>     private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>
>>     public void execute(Tuple input, BasicOutputCollector collector) {
>>         LOG.info(input.getString(0));
>>     }
>>
>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>         declarer.declare(new Fields("message"));
>>     }
>> }
>>
>>
>> Thank you in advance for any help you can give, or just for reading!
>>
>>
>
