I have 10 partitions on a topic of Kafka, but when trying to read the following documentation <https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka.html> Storm got the error below. I tried it with Storm core and with Trident, the error is always the same.
int numPartition = 10; Broker broker = new Broker(Configuration.BOOTSTRAP_SERVERS_CONFIG); GlobalPartitionInformation info = new GlobalPartitionInformation(topicName); for (int i = 0; i < numPartition; i++) { info.addPartition(i, broker); } StaticHosts hosts = new StaticHosts(info); SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topicName, "/brokers", UUID.randomUUID().toString()); /* http://goo.gl/riljni | http://goo.gl/37ZUuV */ kafkaSpoutConfig.ignoreZkOffsets = true; kafkaSpoutConfig.startOffsetTime = -1; kafkaSpoutConfig.scheme = new org.apache.storm.spout.SchemeAsMultiScheme(new StringScheme()); java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:94) at org.apache.storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:35) at org.apache.storm.kafka.KafkaSpout.open(KafkaSpout.java:83) at org.apache.storm.daemon.executor$fn__7990$fn__8005.invoke(executor.clj:604) at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:482) at clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745) -- Thomas Cristanis