A three-node Kafka cluster with a replication factor of 3 is a bad design.

The replication factor should always be less than the cluster size.

Please change it to 2 and try again.
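One way to lower the factor on an existing topic is to run kafka-reassign-partitions.sh with a reassignment JSON file (`--reassignment-json-file`) that lists the desired replicas for each partition. Recreating the topic with `kafka-topics.sh --create --replication-factor 2` is the other option. The Java sketch below generates such a file. It is a minimal sketch with assumed inputs: the topic name `events`, the three partitions and broker IDs 1 to 3 are hypothetical, and the round-robin placement is a simple choice, not Kafka's own assignment algorithm.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: builds a reassignment JSON document with
 * `replicationFactor` replicas per partition, placed round-robin over the
 * given broker IDs. Topic names are not JSON-escaped (illustration only).
 */
public class ReassignmentPlan {

    static String build(String topic, int partitions, int[] brokerIds, int replicationFactor) {
        // Kafka cannot place more replicas than there are brokers.
        if (replicationFactor < 1 || replicationFactor > brokerIds.length) {
            throw new IllegalArgumentException(
                "replication factor must be between 1 and the number of brokers");
        }
        List<String> entries = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Shift the starting broker per partition so preferred leaders are spread out.
                replicas.add(String.valueOf(brokerIds[(p + r) % brokerIds.length]));
            }
            entries.add(String.format(
                "{\"topic\":\"%s\",\"partition\":%d,\"replicas\":[%s]}",
                topic, p, String.join(",", replicas)));
        }
        return "{\"version\":1,\"partitions\":[" + String.join(",", entries) + "]}";
    }

    public static void main(String[] args) {
        // Assumed layout: topic "events", 3 partitions, brokers 1, 2 and 3.
        System.out.println(build("events", 3, new int[] {1, 2, 3}, 2));
    }
}
```

With these inputs partition 0 goes to brokers [1,2], partition 1 to [2,3] and partition 2 to [3,1], so every broker still leads one partition.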

-Manoj

On Mon, Jan 26, 2015 at 7:19 AM, Milad Fatenejad <ick...@gmail.com> wrote:

> Hello:
>
> I have a three node kafka cluster with a single topic and a topic
> replication factor of 3. I ran a test where I inserted a few hundred
> messages into kafka. While the topology was reading these messages, I
> killed one of the brokers.
>
> My hope was that the Kafka spout would simply use one of the other
> replicas. Instead, I saw the following exceptions and no attempt to retry
> or fail over to one of the working Kafka replicas. Is this the expected
> behavior? Is there a way to make storm-kafka read messages reliably in the
> presence of broker failures?
>
> I should add that I checked Kafka after the node failure: one of the
> other brokers was promoted to leader for each partition, as expected.
> So I don't understand why the Kafka spout doesn't eventually retry
> against the new leader. I also checked with the Kafka console consumer and
> verified that all messages are available in Kafka.
>
> Thank you
> Milad
>
> The exception stack trace is as follows:
>
> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [kafka.consumer.SimpleConsumer] [INFO]> Reconnect due to socket error: java.nio.channels.ClosedChannelException
> 2015-01-26 14:21:54,451 [Thread-10-event_spout] [storm.kafka.KafkaUtils] [WARN]> Network error when fetching messages:
> java.nio.channels.ClosedChannelException: null
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>         at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>         at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>         at storm.kafka.PartitionManager.fill(PartitionManager.java:162) [stormjar.jar:na]
>         at storm.kafka.PartitionManager.next(PartitionManager.java:124) [stormjar.jar:na]
>         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) [stormjar.jar:na]
>         at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
> 2015-01-26 14:21:54,452 [Thread-10-event_spout] [storm.kafka.KafkaSpout] [WARN]> Fetch failed
> storm.kafka.FailedFetchException: java.nio.channels.ClosedChannelException
>         at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:175) ~[stormjar.jar:na]
>         at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[stormjar.jar:na]
>         at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[stormjar.jar:na]
>         at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[stormjar.jar:na]
>         at backtype.storm.daemon.executor$fn__4249$fn__4264$fn__4293.invoke(executor.clj:565) [storm-core-0.9.3.jar:0.9.3]
>         at backtype.storm.util$async_loop$fn__461.invoke(util.clj:463) [storm-core-0.9.3.jar:0.9.3]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
> Caused by: java.nio.channels.ClosedChannelException: null
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:97) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113) ~[stormjar.jar:na]
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112) ~[stormjar.jar:na]
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[stormjar.jar:na]
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111) ~[stormjar.jar:na]
>         at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[stormjar.jar:na]
>         at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:167) ~[stormjar.jar:na]
>         ... 7 common frames omitted
>
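The question asks why the spout does not move on to the newly elected leader. The general client-side pattern is to cache a leader per partition and, when a fetch to that leader fails (here, with a `ClosedChannelException`), refresh the leader from cluster metadata and retry. The Java sketch below illustrates only that pattern against an in-memory stand-in. `FakeCluster`, `fetchWithFailover` and the retry limit are hypothetical names, not storm-kafka or Kafka APIs, and a real client would also back off between attempts and fetch metadata from a live broker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LeaderFailoverSketch {

    /** Hypothetical stand-in for cluster metadata plus broker liveness. */
    static class FakeCluster {
        final Map<Integer, Integer> leaderByPartition = new HashMap<>();
        final Set<Integer> liveBrokers = new HashSet<>();

        String fetch(int broker, int partition, long offset) throws IOException {
            if (!liveBrokers.contains(broker)) {
                // Plays the role of the ClosedChannelException in the log.
                throw new IOException("broker " + broker + " unreachable");
            }
            return "message@" + partition + ":" + offset + " from broker " + broker;
        }
    }

    /**
     * Fetches from the cached leader; on failure, replaces the cached leader
     * with the one in cluster metadata and retries, up to maxAttempts in total.
     */
    static String fetchWithFailover(FakeCluster cluster, Map<Integer, Integer> cachedLeaders,
                                    int partition, long offset, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        IOException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int leader = cachedLeaders.get(partition);
            try {
                return cluster.fetch(leader, partition, offset);
            } catch (IOException e) {
                last = e;
                // The leader may have moved: drop the stale cache entry.
                cachedLeaders.put(partition, cluster.leaderByPartition.get(partition));
            }
        }
        throw new UncheckedIOException(last);
    }

    public static void main(String[] args) {
        FakeCluster cluster = new FakeCluster();
        cluster.liveBrokers.add(1);
        cluster.liveBrokers.add(2);
        cluster.liveBrokers.add(3);
        cluster.leaderByPartition.put(0, 1);

        // The consumer caches the leader it saw at startup.
        Map<Integer, Integer> cachedLeaders = new HashMap<>(cluster.leaderByPartition);

        // Simulate killing broker 1 and the controller electing broker 2.
        cluster.liveBrokers.remove(1);
        cluster.leaderByPartition.put(0, 2);

        System.out.println(fetchWithFailover(cluster, cachedLeaders, 0, 42L, 3));
    }
}
```

The first attempt fails against the dead broker 1, the cache is refreshed to broker 2, and the second attempt succeeds. Bounding the attempts keeps a permanently unavailable partition from looping forever.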
