Warning, long message.
*Problem*: Initializing a Kafka Streams application is taking a very long time. It is currently at the 40-minute mark.
*Setup*:
2 co-partitioned topics with 100 partitions.
The first topic contains a lot of messages, on the order of hundreds of millions.
The second topic is a KTable and contains ~30k records.
Kafka cluster with 6 brokers running 0.10.1.
Kafka Streams running on 0.10.2.1, 5 instances with 5 threads each.
The instances are running on Kubernetes.
*Stream Configuration*:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
*The events*: I started 5 instances of my stream configuration at the same time. This is the first time this configuration has run.
2017-05-05 21:23:03.283 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating producer client
2017-05-05 21:23:03.415 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating consumer client
2017-05-05 21:23:03.520 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating restore consumer client
2017-05-05 21:23:03.528 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from NOT_RUNNING to RUNNING.
2017-05-05 21:23:03.531 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating producer client 2017-05-05 21:23:03.564 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating consumer client 2017-05-05 21:23:03.569 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating restore consumer client 2017-05-05 21:23:03.615 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from NOT_RUNNING to RUNNING. 2017-05-05 21:23:03.617 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating producer client 2017-05-05 21:23:03.621 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating consumer client 2017-05-05 21:23:03.625 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating restore consumer client 2017-05-05 21:23:03.628 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from NOT_RUNNING to RUNNING. 2017-05-05 21:23:03.629 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating producer client 2017-05-05 21:23:03.632 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating consumer client 2017-05-05 21:23:03.635 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating restore consumer client 2017-05-05 21:23:03.638 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from NOT_RUNNING to RUNNING. 
2017-05-05 21:23:03.639 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating producer client 2017-05-05 21:23:03.641 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating consumer client 2017-05-05 21:23:03.644 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating restore consumer client 2017-05-05 21:23:03.647 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from NOT_RUNNING to RUNNING. 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Starting 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Starting 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Starting 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Starting 2017-05-05 21:23:03.792 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Starting 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED. 
2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from RUNNING to PARTITIONS_REVOKED. 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance. 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED. 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from RUNNING to PARTITIONS_REVOKED. 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from RUNNING to PARTITIONS_REVOKED. 
2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Updating suspended tasks to contain active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Updating suspended tasks to contain active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Updating suspended tasks to contain active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Removing all active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all active tasks [] 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all active tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Removing all active tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Removing all active tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Removing all standby tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all standby tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : 
stream-thread [StreamThread-5] Removing all standby tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Removing all standby tasks [] 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Removing all standby tasks [] 2017-05-05 21:23:04.020 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions. 2017-05-05 21:23:04.218 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor 2017-05-05 21:23:04.591 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor 2017-05-05 21:23:04.726 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 100.0]}. 2017-05-05 21:23:04.742 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 0.0]}} from the member subscriptions. 
2017-05-05 21:23:05.120 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor 2017-05-05 21:23:05.482 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor 2017-05-05 21:23:05.520 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0], da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0]}. 2017-05-05 21:23:05.553 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned at the end of consumer rebalance. *// The above line is repeated for each thread* 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 
*// omitted*
2017-05-05 21:23:15.596 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from ASSIGNING_PARTITIONS to RUNNING.
*// above message repeated for each thread*
*Important*: At this point only StreamThread-4 is committing every 10 seconds. The other threads output no logs.
Now the fun begins:
2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Closing task's topology ... // repeated multiple times
2017-05-05 21:29:21.387 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Flushing state stores of task ... // repeated multiple times
2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing consumer offsets of task ... // repeated multiple times
2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [<list>]
2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all active tasks [<list>]
2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all standby tasks []
At this point there are no more log messages for 16 minutes! During this time I took thread dumps almost every minute. One of them is below. Notice that thread 4 is the only one that differs.
"StreamThread-5" - Thread t@57 java.lang.Thread.State: BLOCKED at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169) - waiting to lock <653f9d6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49 at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) - locked <3fbbc5a8> (a java.util.PriorityQueue) at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-4" - Thread t@55 java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <5709c085> (a sun.nio.ch.Util$2) - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet) - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-3" - Thread t@53 java.lang.Thread.State: RUNNABLE at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169) - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor) at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) - locked <7a09baf0> (a java.util.PriorityQueue) at 
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-2" - Thread t@51 java.lang.Thread.State: BLOCKED at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169) - waiting to lock <653f9d6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49 at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) - locked <2dc188ac> (a java.util.PriorityQueue) at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-1" - Thread t@49 java.lang.Thread.State: RUNNABLE at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:172) - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor) at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176) at 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190) at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139) at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268) at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) - locked <7dffc3aa> (a java.util.PriorityQueue) at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251) at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None (no logs omitted, ~16 minutes later) 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask .. // repeated multiple times 2017-05-05 21:45:05.379 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED. *// The above is repeated for threads 2, 3 and 5* (no logs omitted!! This is really the next entry, ~10 minutes later) 2017-05-05 21:55:16.835 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata ... 
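Side note on the "~16 minutes" and "~10 minutes" figures: they can be checked directly from the log timestamps quoted above. A minimal sketch, assuming the timestamp layout seen in these logs (`yyyy-MM-dd HH:mm:ss.SSS`); the class name `LogGap` and its `gap` method are made up for illustration and are not part of Kafka.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: measures the silence between two log lines.
class LogGap {
    // Assumed layout of the timestamps in the pasted logs.
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Returns the elapsed time between two log timestamps.
    static Duration gap(String earlier, String later) {
        return Duration.between(LocalDateTime.parse(earlier, TS),
                                LocalDateTime.parse(later, TS));
    }

    public static void main(String[] args) {
        // Last StreamThread-4 entry before the silence -> first StreamThread-1 commit.
        System.out.println(gap("2017-05-05 21:29:21.388", "2017-05-05 21:45:05.270"));
        // StreamThread-1 revocation -> next "Constructed client metadata" entry.
        System.out.println(gap("2017-05-05 21:45:05.379", "2017-05-05 21:55:16.835"));
    }
}
```

The first gap works out to 15 min 43.882 s and the second to 10 min 11.456 s, which matches the rounded figures in the text.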
During the above 10 minutes all threads show the following "StreamThread-5" - Thread t@57 java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <4a60e0dd> (a sun.nio.ch.Util$2) - locked <5e54059f> (a java.util.Collections$UnmodifiableSet) - locked <60693986> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <5f1ae6c1> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-4" - Thread t@55 java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at 
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <5709c085> (a sun.nio.ch.Util$2) - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet) - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-3" - Thread t@53 java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <123193f7> (a sun.nio.ch.Util$2) - 
locked <6c3704d3> (a java.util.Collections$UnmodifiableSet) - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <4d9f6f42> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-2" - Thread t@51 java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <532ff32d> (a sun.nio.ch.Util$2) - locked <76a6407> (a java.util.Collections$UnmodifiableSet) - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at 
org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <29b48d84> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None "StreamThread-1" - Thread t@49 java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <5aeb504> (a sun.nio.ch.Util$2) - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet) - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - 
locked <7b072bc6> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) Locked ownable synchronizers: - None
Eventually they are assigned partitions again, then the partitions are revoked, another long stretch passes with threads 1, 2, 3 and 5 stuck on Sensor, and we end up in the same situation. Finally, I tried starting up only 1 instance (with 5 threads).
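Since the dumps above were taken by hand roughly once a minute, a small in-process probe could give the same BLOCKED/RUNNABLE overview with less noise. This is only a sketch using the standard `java.lang.management` API; the class `StreamThreadProbe` and its methods are hypothetical names, and the `StreamThread-` prefix is taken from the thread names in the dumps.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: summarises JVM thread states by name prefix.
class StreamThreadProbe {

    // Counts live threads whose name starts with namePrefix, grouped by Thread.State.
    static Map<Thread.State, Integer> countStates(String namePrefix) {
        Map<Thread.State, Integer> counts = new EnumMap<>(Thread.State.class);
        // false, false: skip owned-monitor and synchronizer details to keep the dump cheap.
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(false, false);
        for (ThreadInfo info : infos) {
            if (info != null && info.getThreadName().startsWith(namePrefix)) {
                counts.merge(info.getThreadState(), 1, Integer::sum);
            }
        }
        return counts;
    }

    // One line per matching thread; threads waiting on an owned lock also show the lock and its owner.
    static String describe(String namePrefix) {
        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().startsWith(namePrefix)) {
                continue;
            }
            out.append(info.getThreadName()).append(' ').append(info.getThreadState());
            if (info.getLockOwnerName() != null) {
                out.append(" waiting on ").append(info.getLockName())
                   .append(" owned by ").append(info.getLockOwnerName());
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Standalone run: there are no stream threads here, so inspect "main" instead.
        System.out.println(countStates("main"));
        System.out.print(describe("main"));
    }
}
```

Inside the streams application one would presumably call `countStates("StreamThread-")` from a scheduled task and log the result; that wiring is an assumption and not shown here.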
Current status:

"StreamThread-5" - Thread t@56
  java.lang.Thread.State: RUNNABLE
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
    - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
    at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    - locked <5bea9b1b> (a java.util.PriorityQueue)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

  Locked ownable synchronizers:
    - None

"StreamThread-4" - Thread t@54
  java.lang.Thread.State: RUNNABLE
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
    - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
    at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    - locked <5a06855> (a java.util.PriorityQueue)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

  Locked ownable synchronizers:
    - None

"StreamThread-3" - Thread t@52
  java.lang.Thread.State: BLOCKED
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
    - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
    at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    - locked <d3a57bc> (a java.util.PriorityQueue)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

  Locked ownable synchronizers:
    - None

"StreamThread-2" - Thread t@50
  java.lang.Thread.State: RUNNABLE
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:175)
    at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    - locked <480f4efc> (a java.util.PriorityQueue)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

  Locked ownable synchronizers:
    - None

"StreamThread-1" - Thread t@48
  java.lang.Thread.State: BLOCKED
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
    - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
    at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
    at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
    - locked <47b226a5> (a java.util.PriorityQueue)
    at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

  Locked ownable synchronizers:
    - None

This has been going on for over 40 minutes now and the cluster does not stabilize. Not sure what to do here, any help welcome.