I have a Kafka topic with 10 partitions. I am trying to read from it with
Storm, following the documentation at
<https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html>, but I
get the error below. I tried it with both Storm core and Trident, and the
error is always the same.

Spout configuration:

int numPartition = 10;

// Configuration is my own class holding the broker address
Broker broker = new Broker(Configuration.BOOTSTRAP_SERVERS_CONFIG);
GlobalPartitionInformation info = new GlobalPartitionInformation(topicName);

// Map every partition of the topic to the same broker
for (int i = 0; i < numPartition; i++) {
    info.addPartition(i, broker);
}

StaticHosts hosts = new StaticHosts(info);
SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topicName, "/brokers",
        UUID.randomUUID().toString());

/* http://goo.gl/riljni | http://goo.gl/37ZUuV */
// Ignore offsets stored in ZooKeeper and start from startOffsetTime
kafkaSpoutConfig.ignoreZkOffsets = true;
// -1 is the latest offset (kafka.api.OffsetRequest.LatestTime())
kafkaSpoutConfig.startOffsetTime = -1;
kafkaSpoutConfig.scheme = new org.apache.storm.spout.SchemeAsMultiScheme(
        new StringScheme());



Stack trace:

java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149)
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79)
    at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75)
    at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65)
    at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94)
    at org.apache.storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:35)
    at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:83)
    at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604)
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
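
As far as I can tell, BlockingChannel.send throws ClosedChannelException when
the channel to the broker is not connected, so one way to narrow this down is
to check that the address passed to Broker is reachable from the worker
machine. Below is a minimal sketch of such a check, not part of storm-kafka.
The class and method names (BrokerCheck, HostPort, parse, isReachable) are
hypothetical, and it assumes the address is a single "host:port" string,
falling back to Kafka's usual port 9092 when no port is given.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {

    /** Host and port parsed from a "host:port" string (hypothetical helper). */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Splits "host:port"; assumes port 9092 when no port is present. */
    static HostPort parse(String address) {
        int idx = address.lastIndexOf(':');
        if (idx < 0) {
            return new HostPort(address, 9092);
        }
        String host = address.substring(0, idx);
        int port = Integer.parseInt(address.substring(idx + 1));
        return new HostPort(host, port);
    }

    /** Returns true if a TCP connection can be opened within timeoutMs. */
    static boolean isReachable(HostPort hp, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(hp.host, hp.port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unknown hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder address; replace with the value actually passed to Broker.
        HostPort hp = parse(args.length > 0 ? args[0] : "localhost:9092");
        System.out.println(hp.host + ":" + hp.port
                + " reachable: " + isReachable(hp, 3000));
    }
}
```

If the check fails from the machine running the Storm worker, the problem is
likely the address or the network rather than the spout configuration itself.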

--
Thomas Cristanis
