Hi there,

I wanted to add something: how many CPU cores does each of your Kubernetes instances have? In 0.10.2.1 we noticed a regression in environments with 1 core, as described in https://issues.apache.org/jira/browse/KAFKA-5174.
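
One way to answer the core-count question from inside a pod is to ask the JVM itself. The sketch below uses only the standard library; the class name `CoreCheck` and the helper `isSingleCore` are hypothetical. Older JVMs may report the node's core count rather than the pod's CPU limit, so it is worth cross-checking the number against the pod spec.

```java
public class CoreCheck {
    // True when the processor count matches the 1-core case described in KAFKA-5174.
    static boolean isSingleCore(int cores) {
        return cores == 1;
    }

    public static void main(String[] args) {
        // Number of processors the JVM believes it may use.
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors: " + cores);
        if (isSingleCore(cores)) {
            System.out.println("single core: check the KAFKA-5174 known issue and workaround");
        }
    }
}
```

Running this inside each instance (for example as a startup log line) would show whether any of the instances fall into the single-core case.
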

If you have 1 core, the workaround is to change a config as described here: http://docs.confluent.io/current/streams/upgrade-guide.html#known-issues-and-workarounds

Thanks
Eno

> On May 6, 2017, at 9:48 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Note on a few things.
> Set the changelog topic delete retention time as low as possible if the
> previous values for the same key are not needed and can be safely cleaned up.
> Set the segment size and segment retention time low as well, so older
> segments can be compacted and cleaned up.
> Set the delete ratio aggressively, e.g. 0.01, so segments don't grow too big.
>
> This way state stores will be created much faster.
>
> Also, when using windowing, a smaller window size helps.
>
> Try not to run many stream threads on a single machine unless you have
> great hardware.
>
> Make sure a thread is not reading from many partitions. Make sure the ratio
> of partitions to total threads is low.
>
> Hope this helps.
>
> Sachin
>
> On 6 May 2017 13:28, "Shimi Kiviti" <shim...@gmail.com> wrote:
>
>> This is very similar to issues that we see.
>>
>> Did you check the status of the consumer group? In my case it is in
>> rebalancing most of the time. Once in a while it shows consumers and
>> offsets, but after a short time it goes back to rebalancing.
>>
>> How much storage does your Kafka Streams app use?
>> Also, what is your k8s configuration?
>> Deployment? Deployment with emptyDir, hostPath or EBS? StatefulSet?
>>
>> Thanks,
>> Shimi
>> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.harti...@gmail.com> wrote:
>>
>>> After a while the instance started running.
>>>
>>> 2017-05-05 22:40:26.806 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing task StreamTask 1_62
>>> (this is literally the next message)
>>> 2017-05-05 23:13:27.820 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing all tasks because the commit interval 10000ms has elapsed
>>>
>>> On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.harti...@gmail.com> wrote:
>>>
>>>> Warning, long message
>>>>
>>>> *Problem*: Initializing a Kafka Streams application is taking a loooong time.
>>>> Currently at the 40-minute mark.
>>>>
>>>> *Setup*:
>>>> 2 co-partitioned topics with 100 partitions each.
>>>> The first topic contains a lot of messages, on the order of hundreds of millions.
>>>> The second topic is a KTable and contains ~30k records.
>>>>
>>>> Kafka cluster with 6 brokers running 0.10.1.
>>>>
>>>> Kafka Streams running on 0.10.2.1: 5 instances with 5 threads each.
>>>> The instances are running on Kubernetes.
>>>>
>>>> *Stream Configuration*:
>>>> Properties props = new Properties();
>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
>>>>
>>>> *The events*:
>>>> I started 5 instances of my stream configuration at the same time. This is
>>>> the first time this configuration is running.
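
Sachin's advice to keep the ratio of partitions to total threads low can be made concrete for this setup (100 partitions per topic, 5 instances with 5 threads each). This is a minimal sketch in plain Java, assuming partitions of one input topic are spread evenly across all threads; the real assignment may be uneven, and `partitionsPerThread` is a hypothetical helper, not a Kafka API.

```java
public class PartitionsPerThread {
    // Average number of partitions of one input topic handled by each stream
    // thread, assuming an even spread over every thread in the consumer group.
    static double partitionsPerThread(int partitions, int instances, int threadsPerInstance) {
        int totalThreads = instances * threadsPerInstance;
        return (double) partitions / totalThreads;
    }

    public static void main(String[] args) {
        // All 5 instances in the group: 100 / (5 * 5) = 4.0
        System.out.println(partitionsPerThread(100, 5, 5));
        // Only one instance in the group so far: 100 / (1 * 5) = 20.0
        System.out.println(partitionsPerThread(100, 1, 5));
    }
}
```

The second case matters during startup: the assignor entries in the logs below first list a single client ID, then two, so early assignments give each thread far more partitions than the steady-state figure.
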
>>>> >>>> 2017-05-05 21:23:03.283 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Creating producer client >>>> 2017-05-05 21:23:03.415 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Creating consumer client >>>> 2017-05-05 21:23:03.520 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Creating restore consumer client >>>> 2017-05-05 21:23:03.528 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> State transition from NOT_RUNNING to RUNNING. >>>> 2017-05-05 21:23:03.531 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Creating producer client >>>> 2017-05-05 21:23:03.564 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Creating consumer client >>>> 2017-05-05 21:23:03.569 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Creating restore consumer client >>>> 2017-05-05 21:23:03.615 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> State transition from NOT_RUNNING to RUNNING. >>>> 2017-05-05 21:23:03.617 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Creating producer client >>>> 2017-05-05 21:23:03.621 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Creating consumer client >>>> 2017-05-05 21:23:03.625 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Creating restore consumer client >>>> 2017-05-05 21:23:03.628 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> State transition from NOT_RUNNING to RUNNING. 
>>>> 2017-05-05 21:23:03.629 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Creating producer client >>>> 2017-05-05 21:23:03.632 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Creating consumer client >>>> 2017-05-05 21:23:03.635 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Creating restore consumer client >>>> 2017-05-05 21:23:03.638 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> State transition from NOT_RUNNING to RUNNING. >>>> 2017-05-05 21:23:03.639 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Creating producer client >>>> 2017-05-05 21:23:03.641 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Creating consumer client >>>> 2017-05-05 21:23:03.644 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Creating restore consumer client >>>> 2017-05-05 21:23:03.647 INFO 71 --- [ main] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> State transition from NOT_RUNNING to RUNNING. 
>>>> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Starting >>>> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Starting >>>> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Starting >>>> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Starting >>>> 2017-05-05 21:23:03.792 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Starting >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> at state RUNNING: partitions [] revoked at the beginning of consumer >>>> rebalance. >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> at state RUNNING: partitions [] revoked at the beginning of consumer >>>> rebalance. >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> at state RUNNING: partitions [] revoked at the beginning of consumer >>>> rebalance. >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> at state RUNNING: partitions [] revoked at the beginning of consumer >>>> rebalance. 
>>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> at state RUNNING: partitions [] revoked at the beginning of consumer >>>> rebalance. >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Updating suspended tasks to contain active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Updating suspended tasks to contain active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Updating suspended tasks to contain active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Updating suspended tasks to contain active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Updating suspended tasks to contain active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] 
>>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Removing all active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Removing all active tasks [] >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Removing all active tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Removing all active tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Removing all active tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> Removing all standby tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Removing all standby tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> Removing all standby tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Removing all standby tasks [] >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> Removing all standby tasks [] >>>> 2017-05-05 21:23:04.020 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Constructed client metadata >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, >>>> consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: >> ([]) >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 
1.0 cost: >> 0.0]}} >>>> from the member subscriptions. >>>> 2017-05-05 21:23:04.218 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Completed validating internal topics in partition assignor >>>> 2017-05-05 21:23:04.591 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Completed validating internal topics in partition assignor >>>> 2017-05-05 21:23:04.726 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Assigned tasks to clients as >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) >>>> assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) >>>> capacity: 1.0 cost: 100.0]}. >>>> 2017-05-05 21:23:04.742 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Constructed client metadata >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, >>>> consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: >> ([]) >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: >> 0.0]}} >>>> from the member subscriptions. 
>>>> 2017-05-05 21:23:05.120 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Completed validating internal topics in partition assignor >>>> 2017-05-05 21:23:05.482 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Completed validating internal topics in partition assignor >>>> 2017-05-05 21:23:05.520 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Assigned tasks to clients as >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: >> 50.0], >>>> da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>]) >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: >> 50.0]}. >>>> 2017-05-05 21:23:05.553 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned >> at >>>> the end of consumer rebalance. >>>> *// The above line is repeated for each thread* >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-5] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-5] >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. 
>>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-3] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-3] >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS. >>>> *// omitted* >>>> 2017-05-05 21:23:15.596 INFO 71 --- [ StreamThread-2] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-2] >>>> State transition from ASSIGNING_PARTITIONS to RUNNING. >>>> *// above message repeated for each thread* >>>> >>>> *Important*: At this point only StreamThread-4 is performing commits >>>> every 10 seconds. The other threads output no logs. Now the fun begins >>>> >>>> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Closing task's topology ... // repeated multiple times >>>> 2017-05-05 21:29:21.387 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Flushing state stores of task ... // repeated multiple times >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Committing consumer offsets of task ... 
// repeated multiple times >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Updating suspended tasks to contain active tasks [<list>] >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Removing all active tasks [<list>] >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-4] >>>> Removing all standby tasks [] >>>> >>>> >>>> At this point there are no more log messages for 16 minutes!! During this >>>> time I performed several thread dumps, almost every minute. >>>> Thread dump below. Notice that thread 4 is the only one that differs. >>>> >>>> "StreamThread-5" - Thread t@57 >>>> java.lang.Thread.State: BLOCKED >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:169) >>>> - waiting to lock <653f9d6> (a >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49 >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:176) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$ >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. >> measureLatencyNs(StreamsMetricsImpl.java:190) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( >> ProcessorNode.java:139) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( >> StreamTask.java:268) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) >>>> - locked <3fbbc5a8> (a java.util.PriorityQueue) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( >> StreamTask.java:251) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.
>> StreamThread.maybePunctuate(StreamThread.java:751) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> StreamThread.java:633) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.run(StreamThread.java:361) >>>> >>>> Locked ownable synchronizers: >>>> - None >>>> >>>> "StreamThread-4" - Thread t@55 >>>> java.lang.Thread.State: RUNNABLE >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) >>>> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: >> 269) >>>> at >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. >> java:86) >>>> - locked <5709c085> (a sun.nio.ch.Util$2) >>>> - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet) >>>> - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) >>>> at >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489) >>>> at >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) >>>> at >>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( >> ConsumerNetworkClient.java:226) >>>> - locked <639d53ee> (a >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( >> ConsumerNetworkClient.java:172) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. >> joinGroupIfNeeded(AbstractCoordinator.java:347) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. >> ensureActiveGroup(AbstractCoordinator.java:303) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( >> ConsumerCoordinator.java:290) >>>> at >>>> >>> org.apache.kafka.clients.consumer.KafkaConsumer. 
>> pollOnce(KafkaConsumer.java:1029) >>>> at >>>> >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( >> KafkaConsumer.java:995) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> StreamThread.java:592) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.run(StreamThread.java:361) >>>> >>>> Locked ownable synchronizers: >>>> - None >>>> >>>> "StreamThread-3" - Thread t@53 >>>> java.lang.Thread.State: RUNNABLE >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:169) >>>> - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor) >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:176) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$ >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. >> measureLatencyNs(StreamsMetricsImpl.java:190) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( >> ProcessorNode.java:139) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( >> StreamTask.java:268) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) >>>> - locked <7a09baf0> (a java.util.PriorityQueue) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( >> StreamTask.java:251) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.maybePunctuate(StreamThread.java:751) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> StreamThread.java:633) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. 
>> StreamThread.run(StreamThread.java:361) >>>> >>>> Locked ownable synchronizers: >>>> - None >>>> >>>> "StreamThread-2" - Thread t@51 >>>> java.lang.Thread.State: BLOCKED >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:169) >>>> - waiting to lock <653f9d6> (a >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49 >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:176) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$ >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. >> measureLatencyNs(StreamsMetricsImpl.java:190) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( >> ProcessorNode.java:139) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( >> StreamTask.java:268) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) >>>> - locked <2dc188ac> (a java.util.PriorityQueue) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( >> StreamTask.java:251) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.maybePunctuate(StreamThread.java:751) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> StreamThread.java:633) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.run(StreamThread.java:361) >>>> >>>> Locked ownable synchronizers: >>>> - None >>>> >>>> "StreamThread-1" - Thread t@49 >>>> java.lang.Thread.State: RUNNABLE >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. >> java:172) >>>> - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor) >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. 
>> java:176) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread$ >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. >> measureLatencyNs(StreamsMetricsImpl.java:190) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( >> ProcessorNode.java:139) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( >> StreamTask.java:268) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) >>>> - locked <7dffc3aa> (a java.util.PriorityQueue) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( >> StreamTask.java:251) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.maybePunctuate(StreamThread.java:751) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( >> StreamThread.java:633) >>>> at >>>> >>> org.apache.kafka.streams.processor.internals. >> StreamThread.run(StreamThread.java:361) >>>> >>>> Locked ownable synchronizers: >>>> - None >>>> >>>> >>>> (no logs omitted, ~16 minutes later) >>>> 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Committing all tasks because the commit interval 10000ms has elapsed >>>> 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> Committing task StreamTask .. // repeated multiple times >>>> 2017-05-05 21:45:05.379 INFO 71 --- [ StreamThread-1] >>>> o.a.k.s.p.internals.StreamThread : stream-thread >> [StreamThread-1] >>>> State transition from RUNNING to PARTITIONS_REVOKED. >>>> *// The above is repeated for threads 2, 3 and 5* >>>> (no logs omitted!! 
This is really the next entry, ~10 minutes later) >>>> 2017-05-05 21:55:16.835 INFO 71 --- [ StreamThread-4] >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread >> [StreamThread-4] >>>> Constructed client metadata ... >>>> >>>> During the above 10 minutes all threads show the following >>>> "StreamThread-5" - Thread t@57 >>>> java.lang.Thread.State: RUNNABLE >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) >>>> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: >> 269) >>>> at >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. >> java:86) >>>> - locked <4a60e0dd> (a sun.nio.ch.Util$2) >>>> - locked <5e54059f> (a java.util.Collections$UnmodifiableSet) >>>> - locked <60693986> (a sun.nio.ch.EPollSelectorImpl) >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) >>>> at >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489) >>>> at >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) >>>> at >>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( >> ConsumerNetworkClient.java:226) >>>> - locked <5f1ae6c1> (a >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( >> ConsumerNetworkClient.java:172) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. >> joinGroupIfNeeded(AbstractCoordinator.java:347) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. >> ensureActiveGroup(AbstractCoordinator.java:303) >>>> at >>>> >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( >> ConsumerCoordinator.java:290) >>>> at >>>> >>> org.apache.kafka.clients.consumer.KafkaConsumer. 
>> pollOnce(KafkaConsumer.java:1029)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-4" - Thread t@55
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>         - locked <5709c085> (a sun.nio.ch.Util$2)
>>>>         - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
>>>>         - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
>>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>         - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-3" - Thread t@53
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>         - locked <123193f7> (a sun.nio.ch.Util$2)
>>>>         - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet)
>>>>         - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
>>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>         - locked <4d9f6f42> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-2" - Thread t@51
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>         - locked <532ff32d> (a sun.nio.ch.Util$2)
>>>>         - locked <76a6407> (a java.util.Collections$UnmodifiableSet)
>>>>         - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl)
>>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>         - locked <29b48d84> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-1" - Thread t@49
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>         - locked <5aeb504> (a sun.nio.ch.Util$2)
>>>>         - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet)
>>>>         - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl)
>>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>         - locked <7b072bc6> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> Eventually the threads get assigned partitions again, then the partitions
>>>> are revoked, another long time passes, threads 1, 2, 3 and 5 get stuck on
>>>> Sensor, and we end up in the same situation.
>>>>
>>>> Finally, I tried starting up only 1 instance (with 5 threads). Current
>>>> status:
>>>>
>>>> "StreamThread-5" - Thread t@56
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>         - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>         - locked <5bea9b1b> (a java.util.PriorityQueue)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-4" - Thread t@54
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>         - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>         - locked <5a06855> (a java.util.PriorityQueue)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-3" - Thread t@52
>>>>    java.lang.Thread.State: BLOCKED
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>         - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>         - locked <d3a57bc> (a java.util.PriorityQueue)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-2" - Thread t@50
>>>>    java.lang.Thread.State: RUNNABLE
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:175)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>         - locked <480f4efc> (a java.util.PriorityQueue)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> "StreamThread-1" - Thread t@48
>>>>    java.lang.Thread.State: BLOCKED
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>         - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
>>>>         at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>         at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>         at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>         at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>         - locked <47b226a5> (a java.util.PriorityQueue)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>>
>>>>    Locked ownable synchronizers:
>>>>         - None
>>>>
>>>> This has been going on for over 40 minutes now and the cluster has not
>>>> stabilized. I'm not sure what to do here; any help is welcome.
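The single-instance dump shows the pattern worth watching for: StreamThread-1 and StreamThread-3 are BLOCKED "waiting to lock <6a55b7c6>", a Sensor owned by StreamThread-5, which is itself inside Sensor.record. Below is a minimal sketch of collecting that same information (blocked thread, lock, owner) from inside the JVM with the JDK's ThreadMXBean, for example to log it periodically instead of attaching a profiler. It is an illustration under stated assumptions, not Kafka code: SHARED_SENSOR is a hypothetical stand-in for the shared Sensor, and the thread names only mimic the dump above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class MonitorContentionDemo {

    // Hypothetical stand-in for a Sensor shared by several stream threads.
    static final Object SHARED_SENSOR = new Object();

    /** One line per BLOCKED thread: "<thread> waiting on <lock> owned by <owner>". */
    static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> out = new ArrayList<>();
        // Lock name and owner are reported even without locked-monitor details.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                out.add(info.getThreadName() + " waiting on " + info.getLockName()
                        + " owned by " + info.getLockOwnerName());
            }
        }
        return out;
    }

    /** Makes two threads contend for a monitor held by a third, then reports them. */
    static List<String> reproduce() throws InterruptedException {
        CountDownLatch holderHasLock = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Plays the role of "StreamThread-5": enters the monitor and stays inside it.
        // Here it parks on a latch, so its own state is WAITING rather than RUNNABLE.
        Thread holder = new Thread(() -> {
            synchronized (SHARED_SENSOR) {
                holderHasLock.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "StreamThread-5");
        holder.start();
        holderHasLock.await();

        List<Thread> waiters = new ArrayList<>();
        for (String name : new String[] {"StreamThread-1", "StreamThread-3"}) {
            Thread t = new Thread(() -> {
                synchronized (SHARED_SENSOR) {
                    // Nothing to do: trying to acquire the monitor is the point.
                }
            }, name);
            waiters.add(t);
            t.start();
        }
        // Poll until both waiters are actually BLOCKED on monitor entry.
        for (Thread t : waiters) {
            while (t.getState() != Thread.State.BLOCKED) {
                Thread.sleep(10);
            }
        }

        List<String> report = blockedThreads();

        // Let everyone finish so the JVM can exit cleanly.
        release.countDown();
        holder.join();
        for (Thread t : waiters) {
            t.join();
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        reproduce().forEach(System.out::println);
    }
}
```

Running it should print one line for StreamThread-1 and one for StreamThread-3, each naming StreamThread-5 as the owner; the order and the hash after java.lang.Object@ will vary. In a real Streams application you would only call something like blockedThreads() from a background thread, since it is the part that mirrors what jstack or a profiler reports.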