Hi! I'm trying to get a starter Kafka-Storm integration going. I've got a
simple topology working in local mode: it reads messages from a Kafka
topic and sends them to a bolt that logs them. However, when I try to
submit the topology to a cluster, the Storm UI always shows 0 tuples
emitted from the KafkaSpout.
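
(For reference, the local-mode submission that works looks roughly like
this; it's a sketch from memory, reusing the topologyBuilder and Config
from the full code below:)

    // Local-mode run that works for me (same topologyBuilder as below)
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafkaTopology", new Config(),
            topologyBuilder.createTopology());
    Thread.sleep(60000);  // give the spout time to consume for a bit
    cluster.shutdown();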

I've done several laps around the internet at this point, and built and
tried different starter projects; each has the same issue. I can submit
the topology, but it won't actually do anything.

Similar problems to mine seem to come from incompatible .jar files in
Storm's /lib directory. I haven't found anything like that in my case;
however, I'm not 100% sure what I should be looking for, so I can't rule
it out.

I don't know how to make code look pretty on a mailing list, so here is a
Stack Overflow question about my issue:

https://stackoverflow.com/questions/46676377/apache-storm-kafka-cant-see-sent-kafka-messages-in-storm-ui

I make sure to run storm supervisor before testing.

I have ZooKeeper running on port 2181.

I spin up a Kafka broker and use the topic storm-test-topic1.

I fire up a console Kafka producer to send nonsense messages.
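
(For completeness, here is roughly the equivalent of what the console
producer does, as a minimal self-contained Java sketch; localhost:9092 is
an assumption about where my broker listens:)

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Send one nonsense message to the topic the spout reads
                producer.send(new ProducerRecord<>("storm-test-topic1", "nonsense message"));
            }
        }
    }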

Storm.yaml:
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
    - "localhost"

nimbus.seeds: ["localhost"]

----------------------------------------------------------------------------------------------
Topology:

package com.kafka.storm;

import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import com.kafka.storm.bolt.LoggerBolt;

public class KafkaStormIntegrationDemo {
    private static final Logger LOG =
            Logger.getLogger(KafkaStormIntegrationDemo.class);

    public static void main(String[] args) throws InvalidTopologyException,
            AuthorizationException, AlreadyAliveException {

        // Build spout configuration: consume storm-test-topic1 via the
        // ZooKeeper ensemble on localhost:2181
        final BrokerHosts zkrHosts = new ZkHosts("localhost:2181");
        final String kafkaTopic = "storm-test-topic1";
        final String zkRoot = "";
        final String clientId = "storm-consumer";
        SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot,
                clientId);
        kafkaConf.startOffsetTime = -2; // -2 = earliest available offset
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Build topology to consume messages from Kafka and log them
        final TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Create a KafkaSpout instance from the Kafka configuration and add
        // it to the topology
        topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
        // Route the output of the Kafka spout to the logger bolt, which logs
        // the messages consumed from Kafka
        topologyBuilder.setBolt("print-messages", new LoggerBolt())
                .globalGrouping("kafka-spout");

        // Submit the topology to the cluster
        Config conf = new Config();
        System.setProperty("storm.jar",
                "C://apache-storm-1.1.1/lib/storm-core-1.1.1.jar");
        StormSubmitter.submitTopology("kafkaTopology", conf,
                topologyBuilder.createTopology());
    }
}
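
If it would help diagnose things, I can re-run with tuple-level debug
logging and an explicit worker count; my understanding is that it looks
roughly like this (a sketch, not what I'm currently running):

    Config conf = new Config();
    conf.setDebug(true);    // log every emitted/transferred tuple in the worker logs
    conf.setNumWorkers(1);  // single worker keeps all the logs in one place
    StormSubmitter.submitTopology("kafkaTopology", conf,
            topologyBuilder.createTopology());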
----------------------------------------------------------------------------------------------

Bolt:

package com.kafka.storm.bolt;

import org.apache.log4j.Logger;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class LoggerBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = Logger.getLogger(LoggerBolt.class);

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Log the first field of each tuple (the message string from the spout)
        LOG.info(input.getString(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}


Thank you in advance for any help you can give, or for just reading!
