just one quick update: fixed storm supervisor- that was something I did
while messing with ports.

the workers are still not being put to work however.

On Wed, Oct 11, 2017 at 3:58 PM, Ryan Bliton <ryan.bli...@gmail.com> wrote:

> I found this :https://issues.apache.org/jira/browse/STORM-1492
>
> "With the default value for nimbus.seeds (["localhost"]) Storm UI may
> list one "Offline" nimbus for localhost, and another as "Leader" for the
> resolved machine name.
>  A workaround is to modify storm.yaml and replace "localhost" with the
> hostname of the machine in nimbus.seeds."
>
> However, when I drop in my hostname, I am no longer able to spin up
> workers! storm supervisor does nothing now.
>
>
>
> On Wed, Oct 11, 2017 at 3:42 PM, Ryan Bliton <ryan.bli...@gmail.com>
> wrote:
>
>> Yes. Thank you for replying! I've been fussing over it some more and I
>> think I'm getting closer to the issue.
>>
>> In fact, the logs do give a clue- my workers start in state "EMPTY
>> -assignment null," do nothing, then get removed after not being used.
>> The work isn't even hitting the workers.
>>
>> in my Storm UI, it lists my PC name (ABCD-PC123453.my.company.name) as
>> the leader, and localhost as offline.
>>
>> So, somehow, I must have my nimbus and workers running somewhere
>> completely different from the Kafka cluster, which are running on localhost.
>>
>> I am currently futzing with port numbers in storm.yaml.
>>
>> How can I bring localhost online as the leader?
>>
>> On Wed, Oct 11, 2017 at 2:58 PM, Stig Rohde Døssing <s...@apache.org>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> I don't see anything obviously wrong with your configuration. It's
>>> likely your topology logs can tell you what's going wrong. Next time you
>>> start your topology make note of the topology name in Storm UI. Also click
>>> in to your spout in Storm UI and note which worker port(s) it's running on
>>> (if you're running on a multi-node cluster you'll also need to note which
>>> machine is running the spout). You should then be able to go to
>>> $storm-install-dir/logs/workers-artifacts/$your-topology-name-here/$worker-port/worker.log
>>> on the relevant worker and see what the spout worker is logging.
>>>
>>> In case you don't find anything interesting there, you might also look
>>> at logs/nimbus.log on the machine running Nimbus and logs/supervisor.log on
>>> the machine running the supervisor for those logs.
>>>
>>> Also just to make sure, you're running "storm supervisor" as well as
>>> "storm nimbus", right? Otherwise your topology won't be assigned to a
>>> worker.
>>>
>>> 2017-10-11 16:53 GMT+02:00 Ryan Bliton <ryan.bli...@gmail.com>:
>>>
>>>> Hi! I'm trying to get a starter Kafka-Storm integration going. I've got
>>>> a simple topology working in Local mode- It reads the messages from a Kafka
>>>> topic and sends them to a bolt that logs them. However, when I try to
>>>> submit the Topology to a cluster, the Storm UI always reads 0 tuples
>>>> emitted from the KafkaSpout.
>>>>
>>>> I've done several laps around the internet at this point, built and
>>>> tried different starter projects, and each has the same issue. I can submit
>>>> the Topology, but it won't actually work.
>>>>
>>>> Similar problems to mine seem to come from the Storm /lib and
>>>> incompatible .jar files within. I haven't found anything like that in my
>>>> case. However, I'm not 100% sure what I should be looking for so I can't
>>>> rule it out.
>>>>
>>>> I don't know how to make code look pretty on a mailing list, so here is
>>>> a stack overflow about my issue:
>>>>
>>>> https://stackoverflow.com/questions/46676377/apache-storm-ka
>>>> fka-cant-see-sent-kafka-messages-in-storm-ui
>>>>
>>>> I make sure to call storm.supervisor before testing.
>>>>
>>>> I have zookeeper running off port 2181.
>>>>
>>>> I spin up a Kafka broker and use the topic storm-test-topic1.
>>>>
>>>> I fire up a console Kafka producer to send nonsense messages.
>>>>
>>>> Storm.yaml:
>>>> ########### These MUST be filled in for a storm configuration
>>>>  storm.zookeeper.servers:
>>>>      - "localhost"
>>>> #     - "server2"
>>>> #
>>>>  nimbus.seeds: ["localhost"]
>>>> #
>>>> #
>>>>
>>>> ------------------------------------------------------------
>>>> ----------------------------------
>>>> Topology:
>>>>
>>>> package com.kafka.storm;
>>>>
>>>> import java.util.HashMap;
>>>>
>>>> import org.apache.log4j.Logger;
>>>> import org.apache.storm.Config;
>>>> import org.apache.storm.LocalCluster;
>>>> import org.apache.storm.StormSubmitter;
>>>> import org.apache.storm.generated.AlreadyAliveException;
>>>> import org.apache.storm.generated.AuthorizationException;
>>>> import org.apache.storm.generated.InvalidTopologyException;
>>>> import org.apache.storm.kafka.BrokerHosts;
>>>> import org.apache.storm.kafka.KafkaSpout;
>>>> import org.apache.storm.kafka.SpoutConfig;
>>>> import org.apache.storm.kafka.StringScheme;
>>>> import org.apache.storm.kafka.ZkHosts;
>>>> import org.apache.storm.spout.SchemeAsMultiScheme;
>>>> import org.apache.storm.topology.TopologyBuilder;
>>>>
>>>> import com.kafka.storm.bolt.LoggerBolt;
>>>>
>>>> public class KafkaStormIntegrationDemo {
>>>> private static final Logger LOG = Logger.getLogger(KafkaStormInt
>>>> egrationDemo.class);
>>>>
>>>> public static void main(String[] args) throws InvalidTopologyException,
>>>> AuthorizationException, AlreadyAliveException {
>>>>
>>>> // Build Spout configuration using input command line parameters
>>>> final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
>>>> final String kafkaTopic = "storm-test-topic1";
>>>> final String zkRoot = "";
>>>> final String clientId = "storm-consumer";
>>>> SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot,
>>>> clientId);
>>>> kafkaConf.startOffsetTime = -2;
>>>> kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>>
>>>> // Build topology to consume message from kafka and print them on
>>>> console
>>>> final TopologyBuilder topologyBuilder = new TopologyBuilder();
>>>> // Create KafkaSpout instance using Kafka configuration and add it to
>>>> topology
>>>> topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
>>>> //Route the output of Kafka Spout to Logger bolt to log messages
>>>> consumed from Kafka
>>>> topologyBuilder.setBolt("print-messages", new
>>>> LoggerBolt()).globalGrouping("kafka-spout");
>>>> // Submit topology to local cluster i.e. embedded storm instance in
>>>> eclipse
>>>> Config conf = new Config();
>>>> System.setProperty("storm.jar","C://apache-storm-1.1.1/lib/s
>>>> torm-core-1.1.1.jar");
>>>> StormSubmitter.submitTopology("kafkaTopology", conf,
>>>> topologyBuilder.createTopology());
>>>> }
>>>> }
>>>> ------------------------------------------------------------
>>>> ----------------------------------
>>>>
>>>> Bolt:
>>>>
>>>> package com.kafka.storm.bolt;
>>>>
>>>> import org.apache.log4j.Logger;
>>>> import org.apache.storm.topology.BasicOutputCollector;
>>>> import org.apache.storm.topology.OutputFieldsDeclarer;
>>>> import org.apache.storm.topology.base.BaseBasicBolt;
>>>> import org.apache.storm.tuple.Fields;
>>>> import org.apache.storm.tuple.Tuple;
>>>>
>>>> public class LoggerBolt extends BaseBasicBolt{
>>>> private static final long serialVersionUID = 1L;
>>>> private static final Logger LOG = Logger.getLogger(LoggerBolt.class);
>>>>
>>>> public void execute(Tuple input, BasicOutputCollector collector) {
>>>> LOG.info(input.getString(0));
>>>> }
>>>>
>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>> declarer.declare(new Fields("message"));
>>>> }
>>>> }
>>>>
>>>>
>>>> thank you in advance for any help you can give, or for just reading!
>>>>
>>>>
>>>
>>
>

Reply via email to