just one quick update: fixed storm supervisor- that was something I did while messing with ports.
the workers are still not being put to work however. On Wed, Oct 11, 2017 at 3:58 PM, Ryan Bliton <ryan.bli...@gmail.com> wrote: > I found this :https://issues.apache.org/jira/browse/STORM-1492 > > "With the default value for nimbus.seeds (["localhost"]) Storm UI may > list one "Offline" nimbus for localhost, and another as "Leader" for the > resolved machine name. > A workaround is to modify storm.yaml and replace "localhost" with the > hostname of the machine in nimbus.seeds." > > However, when I drop in my hostname, I am no longer able to spin up > workers! storm supervisor does nothing now. > > > > On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <ryan.bli...@gmail.com> > wrote: > >> Yes. Thank you for replying! I've been fussing over it some more and I >> think I'm getting closer to the issue. >> >> In fact, the logs do give a clue- my workers start in state "EMPTY >> -assignment null," do nothing, then get removed after not being used. >> The work isn't even hitting the workers. >> >> in my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as >> the leader, and localhost as offline. >> >> So, somehow, I must have my nimbus and workers running somewhere >> completely different from the Kafka cluster, which are running on localhost. >> >> I am currently futzing with port numbers in storm.yaml. >> >> How can I bring localhost online as the leader? >> >> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <s...@apache.org> >> wrote: >> >>> Hi Ryan, >>> >>> I don't see anything obviously wrong with your configuration. It's >>> likely your topology logs can tell you what's going wrong. Next time you >>> start your topology make note of the topology name in Storm UI. Also click >>> in to your spout in Storm UI and note which worker port(s) it's running on >>> (if you're running on a multi-node cluster you'll also need to note which >>> machine is running the spout). You should then be able to go to >>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log >>> on the relevant worker and see what the spout worker is logging. >>> >>> In case you don't find anything interesting there, you might also look >>> at logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on >>> the machine running the supervisor for those logs. >>> >>> Also just to make sure, you're running "storm supervisor" as well as >>> "storm nimbus", right? Otherwise your topology won't be assigned to a >>> worker. >>> >>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <ryan.bli...@gmail.com>: >>> >>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got >>>> a simple topology working in Local mode- It reads the messages from a Kafka >>>> topic and sends them to a bolt that logs them. However, when I try to >>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples >>>> emitted from the KafkaSpout. >>>> >>>> I've done several laps around the internet at this point, built and >>>> tried different starter projects, and each has the same issue. I can submit >>>> the Topology, but it won't actually work. >>>> >>>> Similar problems to mine seem to come from the Storm /lib and >>>> incompatible .jar files within. I haven't found anything like that in my >>>> case. However, I'm not 100% sure what I should be looking for so I can't >>>> rule it out. >>>> >>>> I don't know how to make code look pretty on a mailing list, so here is >>>> a stack overflow about my issue: >>>> >>>> https://stackoverflow.com/questions/46676377/apache-storm-ka >>>> fka-cant-see-sent-kafka-messages-in-storm-ui >>>> >>>> I make sure to call storm.supervisor before testing. >>>> >>>> I have zookeeper running off port 2181. >>>> >>>> I spin up a Kafka broker and use the topic storm-test-topic1. >>>> >>>> I fire up a console Kafka producer to send nonsense messages. >>>> >>>> Storm.yaml: >>>> ########### These MUST be filled in for a storm configuration >>>> storm.zookeeper.servers: >>>> - "localhost" >>>> # - "server2" >>>> # >>>> nimbus.seeds: ["localhost"] >>>> # >>>> # >>>> >>>> ------------------------------------------------------------ >>>> ---------------------------------- >>>> Topology: >>>> >>>> package com.kafka.storm; >>>> >>>> import java.util.HashMap; >>>> >>>> import org.apache.log4j.Logger; >>>> import org.apache.storm.Config; >>>> import org.apache.storm.LocalCluster; >>>> import org.apache.storm.StormSubmitter; >>>> import org.apache.storm.generated.AlreadyAliveException; >>>> import org.apache.storm.generated.AuthorizationException; >>>> import org.apache.storm.generated.InvalidTopologyException; >>>> import org.apache.storm.kafka.BrokerHosts; >>>> import org.apache.storm.kafka.KafkaSpout; >>>> import org.apache.storm.kafka.SpoutConfig; >>>> import org.apache.storm.kafka.StringScheme; >>>> import org.apache.storm.kafka.ZkHosts; >>>> import org.apache.storm.spout.SchemeAsMultiScheme; >>>> import org.apache.storm.topology.TopologyBuilder; >>>> >>>> import com.kafka.storm.bolt.LoggerBolt; >>>> >>>> public class KafkaStormIntegrationDemo { >>>> private static final Logger LOG = Logger.getLogger(KafkaStormInt >>>> egrationDemo.class); >>>> >>>> public static void main(String[] args) throws InvalidTopologyException, >>>> AuthorizationException, AlreadyAliveException { >>>> >>>> // Build Spout configuration using input command line parameters >>>> final BrokerHosts zkrHosts = new ZkHosts("localhost:2181"); >>>> final String kafkaTopic = "storm-test-topic1"; >>>> final String zkRoot = ""; >>>> final String clientId = "storm-consumer"; >>>> SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, >>>> clientId); >>>> kafkaConf.startOffsetTime = -2; >>>> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); >>>> >>>> // Build topology to consume message from kafka and print them on >>>> console >>>> final TopologyBuilder topologyBuilder = new TopologyBuilder(); >>>> // Create KafkaSpout instance using Kafka configuration and add it to >>>> topology >>>> topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1); >>>> //Route the output of Kafka Spout to Logger bolt to log messages >>>> consumed from Kafka >>>> topologyBuilder.setBolt("print-messages", new >>>> LoggerBolt()).globalGrouping("kafka-spout"); >>>> // Submit topology to local cluster i.e. embedded storm instance in >>>> eclipse >>>> Config conf = new Config(); >>>> System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/s >>>> torm-core-1.1.1.jar"); >>>> StormSubmitter.submitTopology("kafkaTopology", conf, >>>> topologyBuilder.createTopology()); >>>> } >>>> } >>>> ------------------------------------------------------------ >>>> ---------------------------------- >>>> >>>> Bolt: >>>> >>>> package com.kafka.storm.bolt; >>>> >>>> import org.apache.log4j.Logger; >>>> import org.apache.storm.topology.BasicOutputCollector; >>>> import org.apache.storm.topology.OutputFieldsDeclarer; >>>> import org.apache.storm.topology.base.BaseBasicBolt; >>>> import org.apache.storm.tuple.Fields; >>>> import org.apache.storm.tuple.Tuple; >>>> >>>> public class LoggerBolt extends BaseBasicBolt{ >>>> private static final long serialVersionUID = 1L; >>>> private static final Logger LOG = Logger.getLogger(LoggerBolt.class); >>>> >>>> public void execute(Tuple input, BasicOutputCollector collector) { >>>> LOG.info(input.getString(0)); >>>> } >>>> >>>> public void declareOutputFields(OutputFieldsDeclarer declarer) { >>>> declarer.declare(new Fields("message")); >>>> } >>>> } >>>> >>>> >>>> thank you in advance for any help you can give, or for just reading! >>>> >>>> >>> >> >