A three-node Kafka cluster with a replication factor of 3 is bad design; the replication factor should always be less than the cluster size.
Please change it to 2 and try again.

-Manoj

On Mon, Jan 26, 2015 at 7:19 AM, Milad Fatenejad <ick...@gmail.com> wrote:
> Hello:
>
> I have a three-node Kafka cluster with a single topic and a topic
> replication factor of 3. I ran a test where I inserted a few hundred
> messages into Kafka. While the topology was reading these messages, I
> killed one of the brokers.
>
> My hope was that the Kafka spout would simply use one of the other
> replicas. Instead, I saw the following exceptions and no attempt to retry
> or fail over to one of the working Kafka replicas. Is this the expected
> behavior? Is there a way to make storm-kafka read messages reliably in the
> presence of broker failures?
>
> I should say that I checked with Kafka, and after the node failure, one of
> the other brokers was promoted to leader for each partition, as expected.
> So I don't understand why the Kafka spout wouldn't eventually retry using
> the new leader. I also checked with the Kafka console consumer and verified
> that all messages are available in Kafka.
>
> Thank you
> Milad
>
> Exception stack trace is as follows:
>
> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error: java.nio.channels.ClosedChannelException
> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils] [WARN]> Network error when fetching messages:
> java.nio.channels.ClosedChannelException: null
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>     at storm.kafka.PartitionManager.fill(PartitionManager.java:162) [stormjar.jar:na]
>     at storm.kafka.PartitionManager.next(PartitionManager.java:124) [stormjar.jar:na]
>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) [stormjar.jar:na]
>     at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
> 2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout] [WARN]> Fetch failed
> storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[stormjar.jar:na]
>     at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[stormjar.jar:na]
>     at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[stormjar.jar:na]
>     at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[stormjar.jar:na]
>     at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>     at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
> Caused by: java.nio.channels.ClosedChannelException: null
>     at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>     at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>     at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>     ... 7 common frames omitted
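For what it's worth, the failover behavior the trace shows is missing — catch the fetch failure, back off, and retry so a newly elected partition leader can be picked up — can be sketched generically. This is only an illustration of the retry pattern, not the actual storm-kafka code: `fetchWithRetry`, the attempt counts, and the simulated broker failure below are all hypothetical.

```java
import java.util.function.Supplier;

public class RetryFetch {

    // Retries the supplied fetch up to maxAttempts times, sleeping between
    // attempts so a newly elected leader has time to take over. Any
    // RuntimeException (e.g. one wrapping a ClosedChannelException) is
    // treated as retryable; the last failure is rethrown if all attempts fail.
    static <T> T fetchWithRetry(Supplier<T> fetch, int maxAttempts, long backoffMs) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return fetch.get();
            } catch (RuntimeException e) {
                last = e;  // old leader is gone; back off, then retry
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;  // give up promptly if the worker is shutting down
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        // Simulated broker: the first two fetches fail (dead leader),
        // the third succeeds (new leader elected).
        final int[] calls = {0};
        String msg = fetchWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated ClosedChannelException");
            }
            return "message-42";
        }, 5, 10L);
        System.out.println(msg + " after " + calls[0] + " attempts");
    }
}
```

Running it prints `message-42 after 3 attempts`. A real spout would also need to re-resolve the partition leader (e.g. via a fresh metadata lookup) before each retry rather than reconnecting to the dead broker, which is exactly what the `Reconnect due to socket error` line suggests is not happening here.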