I noticed that the Storm Kafka spout re-consumes all Kafka messages after a rolling restart of the Kafka cluster.
This issue occurs only with the KafkaSpout consumer, not with my other consumers (Ruby-based, using the same Kafka consumer API as KafkaSpout). The spout logs are attached below. Do you know what can cause this kind of behavior?
2017-08-20 12:03:22.867 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka1:9092 (id: 2147483646 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:23.052 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:23.667 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.092 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.094 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.195 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.197 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.299 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.304 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.408 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.409 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.511 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.512 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.643 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.648 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Marking the coordinator kafka2:9092 (id: 2147483647 rack: null) dead for group storm_kafka_topology
2017-08-20 12:03:24.781 o.a.k.c.c.i.AbstractCoordinator Thread-11-kafkaSpout-executor[4 4] [INFO] Discovered coordinator kafka2:9092 (id: 2147483647 rack: null) for group storm_kafka_topology.
2017-08-20 12:03:24.787 o.a.s.util Thread-11-kafkaSpout-executor[4 4] [ERROR] Async loop died!
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
2017-08-20 12:03:24.791 o.a.s.d.executor Thread-11-kafkaSpout-executor[4 4] [ERROR]
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[stormjar.jar:?]
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) ~[stormjar.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:384) ~[stormjar.jar:?]
	at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:219) ~[stormjar.jar:?]
	at org.apache.storm.daemon.executor$fn__4976$fn__4991$fn__5022.invoke(executor.clj:644) ~[storm-core-1.1.0.jar:1.1.0]
	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.0.jar:1.1.0]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
2017-08-20 12:03:24.809 o.a.s.util Thread-11-kafkaSpout-executor[4 4] [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.0.jar:1.1.0]
	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
	at org.apache.storm.daemon.worker$fn__5646$fn__5647.invoke(worker.clj:763) [storm-core-1.1.0.jar:1.1.0]
	at org.apache.storm.daemon.executor$mk_executor_data$fn__4863$fn__4864.invoke(executor.clj:274) [storm-core-1.1.0.jar:1.1.0]
	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.0.jar:1.1.0]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_111]
2017-08-20 12:03:24.811 o.a.s.d.worker Thread-14 [INFO] Shutting down worker storm-kafka-topology-113-1502717137 8c6fca83-6c5a-4e89-af25-9071700b45ae 6707
2017-08-20 12:03:24.811 o.a.s.m.n.Client Thread-14 [INFO] closing Netty Client Netty-Client-storm-cluster-aefb/172.16.247.10:6708
2017-08-20 12:03:24.811 o.a.s.m.n.Client Thread-14 [INFO] waiting up to 600000 ms to send 0 pending messages to Netty-Client-storm-cluster-aefb/172.16.247.10:6708
2017-08-20 12:03:24.812 o.a.s.m.n.Client Thread-14 [INFO] closing Netty Client Netty-Client-storm-cluster-12e9/172.16.249.242:6709
2017-08-20 12:03:24.812 o.a.s.m.n.Client Thread-14 [INFO] waiting up to 600000 ms to send 0 pending messages to Netty-Client-storm-cluster-12e9/172.16.249.242:6709
2017-08-20 12:03:24.812 o.a.s.d.worker Thread-14 [INFO] Terminating messaging context
2017-08-20 12:03:24.812 o.a.s.d.worker Thread-14 [INFO] Shutting down executors
2017-08-20 12:03:24.812 o.a.s.d.executor Thread-14 [INFO] Shutting down executor s3UploadBolt:[7 7]
2017-08-20 12:03:24.813 o.a.s.util Thread-5-s3UploadBolt-executor[7 7] [INFO] Async loop interrupted!
2017-08-20 12:03:24.813 o.a.s.util Thread-4-disruptor-executor[7 7]-send-queue [INFO] Async loop interrupted!
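For what it's worth, the CommitFailedException itself suggests the two knobs I am considering: raising session.timeout.ms and/or lowering max.poll.records so the spout commits more often relative to the rebalance window. A minimal sketch of the consumer-side properties (the values here are placeholders I picked for illustration, not tested recommendations; both property names are standard Kafka consumer configs that the spout passes through to its internal KafkaConsumer):

```java
import java.util.Properties;

public class SpoutConsumerProps {
    // Builds the extra consumer properties to hand to the KafkaSpout config.
    public static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
        props.put("group.id", "storm_kafka_topology");
        // Give the consumer more time before the coordinator declares it dead
        // during a rolling broker restart (hypothetical value).
        props.put("session.timeout.ms", "30000");
        // Shrink each poll batch so commits happen more often relative to
        // the session timeout (hypothetical value).
        props.put("max.poll.records", "200");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps());
    }
}
```

That said, I am not sure this explains the full re-consumption from the earliest offset; the worker dying before the commit completed may also mean the offsets for the in-flight batch were never committed, so any pointer to the actual root cause is appreciated.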