Thanks for the feedback. Here is additional information:

* The stream instances are deployed on Kubernetes through Deployments. I do not know if they use emptyDir, hostPath or EBS.
* The instances have 2 cores minimum.

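To double-check the core count from inside a pod, a quick check like the one below could be run in the container. This is only a sketch: `CoreCheck` is a hypothetical class name, and depending on the JVM version the reported value may reflect the node's cores rather than the pod's CPU limit.

```java
public class CoreCheck {
    /** Returns the number of processors the JVM reports as available. */
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        int cores = availableCores();
        System.out.println("JVM sees " + cores + " core(s)");
        if (cores < 2) {
            // Assumption: with a single visible core, the 1-core workaround
            // referenced for KAFKA-5174 might be worth looking at.
            System.out.println("Only one core visible; the KAFKA-5174 workaround may be relevant");
        }
    }
}
```

If this prints 1 inside the container, the single-core regression mentioned in the quoted reply would be worth ruling out first.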
Good advice on the state stores. I already had some of those configurations, but for this issue in particular the state stores are empty, since this happens when the Kafka Streams application bootstraps for the first time.

On Sat, May 6, 2017 at 7:31 AM Eno Thereska <eno.there...@gmail.com> wrote:

> Hi there,
>
> I wanted to add something: how many CPU cores does each of your Kubernetes
> instances have? In 0.10.2.1 we noticed a regression in environments with 1
> core, as described in https://issues.apache.org/jira/browse/KAFKA-5174.
>
> If you have 1 core, the workaround is to change a config as described here:
> http://docs.confluent.io/current/streams/upgrade-guide.html#known-issues-and-workarounds
>
> Thanks
> Eno
>
> > On May 6, 2017, at 9:48 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > A note on a few things.
> > Set the changelog topic delete retention time as low as possible if the
> > previous values for the same key are not needed and can be safely cleaned up.
> > Set segment size and segment retention time low as well, so older segments can
> > be compacted and cleaned up.
> > Set the delete ratio to an aggressive 0.01 so segments don't grow too big.
> >
> > This way state stores would be created much faster.
> >
> > Also, when using windows, a smaller window size helps.
> >
> > Try not to run many stream threads on a single machine unless you have
> > great hardware.
> >
> > Make sure a thread is not reading from many partitions. Make sure the ratio of
> > partitions to total threads is low.
> >
> > Hope this helps.
> >
> > Sachin
> >
> > On 6 May 2017 13:28, "Shimi Kiviti" <shim...@gmail.com> wrote:
> >
> >> This is very similar to issues that we see.
> >>
> >> Did you check the status of the consumer group? In my case it will be in
> >> rebalancing most of the time.
> >> Once in a while it will show consumers and offsets but after a short time
> >> will go back to rebalancing.
> >>
> >> How much storage does your Kafka-streams use?
> >> Also, what is your k8s configuration?
> >> Deployment? Deployment with emptyDir, hostPath or EBS? Statefulset?
> >>
> >> Thanks,
> >> Shimi
> >>
> >> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.harti...@gmail.com> wrote:
> >>
> >>> After a while the instance started running.
> >>>
> >>> 2017-05-05 22:40:26.806 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing task StreamTask 1_62
> >>> (this is literally the next message)
> >>> 2017-05-05 23:13:27.820 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing all tasks because the commit interval 10000ms has elapsed
> >>>
> >>> On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.harti...@gmail.com> wrote:
> >>>
> >>>> Warning, long message
> >>>>
> >>>> *Problem*: Initializing a Kafka Stream is taking a loooong time.
> >>>> Currently at the 40 minute mark
> >>>>
> >>>> *Setup*:
> >>>> 2 co-partitioned topics with 100 partitions.
> >>>> First topic contains a lot of messages, on the order of hundreds of millions
> >>>> Second topic is a KTable and contains ~30k records
> >>>>
> >>>> Kafka cluster with 6 brokers running 0.10.1
> >>>>
> >>>> Kafka streams running on 0.10.2.1. 5 instances with 5 threads each.
> >>>> The instances are running on Kubernetes
> >>>>
> >>>> *Stream Configuration*:
> >>>> Properties props = new Properties();
> >>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> >>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
> >>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> >>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
> >>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> >>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
> >>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> >>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
> >>>>
> >>>> *The events*:
> >>>> I started 5 instances of my stream configuration at the same time. This is
> >>>> the first time this configuration is running.
> >>>>
> >>>> 2017-05-05 21:23:03.283 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating producer client
> >>>> 2017-05-05 21:23:03.415 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating consumer client
> >>>> 2017-05-05 21:23:03.520 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating restore consumer client
> >>>> 2017-05-05 21:23:03.528 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from NOT_RUNNING to RUNNING.
> >>>> 2017-05-05 21:23:03.531 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating producer client
> >>>> 2017-05-05 21:23:03.564 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating consumer client
> >>>> 2017-05-05 21:23:03.569 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Creating restore consumer client
> >>>> 2017-05-05 21:23:03.615 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from NOT_RUNNING to RUNNING.
> >>>> 2017-05-05 21:23:03.617 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating producer client
> >>>> 2017-05-05 21:23:03.621 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating consumer client
> >>>> 2017-05-05 21:23:03.625 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Creating restore consumer client
> >>>> 2017-05-05 21:23:03.628 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from NOT_RUNNING to RUNNING.
> >>>> 2017-05-05 21:23:03.629 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating producer client
> >>>> 2017-05-05 21:23:03.632 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating consumer client
> >>>> 2017-05-05 21:23:03.635 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Creating restore consumer client
> >>>> 2017-05-05 21:23:03.638 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from NOT_RUNNING to RUNNING.
> >>>> 2017-05-05 21:23:03.639 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating producer client
> >>>> 2017-05-05 21:23:03.641 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating consumer client
> >>>> 2017-05-05 21:23:03.644 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Creating restore consumer client
> >>>> 2017-05-05 21:23:03.647 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from NOT_RUNNING to RUNNING.
> >>>> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Starting
> >>>> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Starting
> >>>> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Starting
> >>>> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Starting
> >>>> 2017-05-05 21:23:03.792 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Starting
> >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
> >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Updating suspended tasks to contain active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Updating suspended tasks to contain active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Updating suspended tasks to contain active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Removing all active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all active tasks []
> >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all active tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Removing all active tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Removing all active tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] Removing all standby tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all standby tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] Removing all standby tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Removing all standby tasks []
> >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] Removing all standby tasks []
> >>>> 2017-05-05 21:23:04.020 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> >>>> 2017-05-05 21:23:04.218 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> >>>> 2017-05-05 21:23:04.591 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> >>>> 2017-05-05 21:23:04.726 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 100.0]}.
> >>>> 2017-05-05 21:23:04.742 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 0.0]}} from the member subscriptions.
> >>>> 2017-05-05 21:23:05.120 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> >>>> 2017-05-05 21:23:05.482 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
> >>>> 2017-05-05 21:23:05.520 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0], da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0]}.
> >>>> 2017-05-05 21:23:05.553 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned at the end of consumer rebalance.
> >>>> *// The above line is repeated for each thread*
> >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-5] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-3] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> >>>> *// omitted*
> >>>> 2017-05-05 21:23:15.596 INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-2] State transition from ASSIGNING_PARTITIONS to RUNNING.
> >>>> *// above message repeated for each thread*
> >>>>
> >>>> *Important*: At this point only StreamThread-4 is performing commits
> >>>> every 10 seconds. The other threads output no logs. Now the fun begins
> >>>>
> >>>> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Closing task's topology ... // repeated multiple times
> >>>> 2017-05-05 21:29:21.387 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Flushing state stores of task ... // repeated multiple times
> >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing consumer offsets of task ... // repeated multiple times
> >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [<list>]
> >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all active tasks [<list>]
> >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Removing all standby tasks []
> >>>>
> >>>>
> >>>> At this point there are no more log messages for 16 minutes!! During this
> >>>> time I performed several thread dumps, almost every minute.
> >>>> Thread dump below. Do notice that thread 4 is the only different one.
> >>>>
> >>>> "StreamThread-5" - Thread t@57
> >>>>    java.lang.Thread.State: BLOCKED
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> >>>>     - waiting to lock <653f9d6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> >>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> >>>>     at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> >>>>     - locked <3fbbc5a8> (a java.util.PriorityQueue)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-4" - Thread t@55
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>>>     - locked <5709c085> (a sun.nio.ch.Util$2)
> >>>>     - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
> >>>>     - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> >>>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>>>     at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>>>     - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-3" - Thread t@53
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> >>>>     - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> >>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> >>>>     at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> >>>>     - locked <7a09baf0> (a java.util.PriorityQueue)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-2" - Thread t@51
> >>>>    java.lang.Thread.State: BLOCKED
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> >>>>     - waiting to lock <653f9d6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> >>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> >>>>     at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> >>>>     - locked <2dc188ac> (a java.util.PriorityQueue)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-1" - Thread t@49
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:172)
> >>>>     - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
> >>>>     at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> >>>>     at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> >>>>     at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> >>>>     - locked <7dffc3aa> (a java.util.PriorityQueue)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>>
> >>>> (no logs omitted, ~16 minutes later)
> >>>> 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
> >>>> 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask .. // repeated multiple times
> >>>> 2017-05-05 21:45:05.379 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
> >>>> *// The above is repeated for threads 2, 3 and 5*
> >>>> (no logs omitted!! This is really the next entry, ~10 minutes later)
> >>>> 2017-05-05 21:55:16.835 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata ...
> >>>>
> >>>> During the above 10 minutes all threads show the following
> >>>> "StreamThread-5" - Thread t@57
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>>>     - locked <4a60e0dd> (a sun.nio.ch.Util$2)
> >>>>     - locked <5e54059f> (a java.util.Collections$UnmodifiableSet)
> >>>>     - locked <60693986> (a sun.nio.ch.EPollSelectorImpl)
> >>>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>>>     at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>>>     - locked <5f1ae6c1> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-4" - Thread t@55
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>>>     - locked <5709c085> (a sun.nio.ch.Util$2)
> >>>>     - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
> >>>>     - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> >>>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>>>     at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>>>     - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-3" - Thread t@53
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>>>     - locked <123193f7> (a sun.nio.ch.Util$2)
> >>>>     - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet)
> >>>>     - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
> >>>>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>>>     at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >>>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >>>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>>>     - locked <4d9f6f42> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> >>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>>>
> >>>>    Locked ownable synchronizers:
> >>>>     - None
> >>>>
> >>>> "StreamThread-2" - Thread t@51
> >>>>    java.lang.Thread.State: RUNNABLE
> >>>>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> >> java:86) > >>>> - locked <532ff32d> (a sun.nio.ch.Util$2) > >>>> - locked <76a6407> (a java.util.Collections$UnmodifiableSet) > >>>> - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl) > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > >>>> at > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489) > >>>> at > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > >>>> at > >>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( > >> ConsumerNetworkClient.java:226) > >>>> - locked <29b48d84> (a > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( > >> ConsumerNetworkClient.java:172) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > >> ensureActiveGroup(AbstractCoordinator.java:303) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( > >> ConsumerCoordinator.java:290) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > >> pollOnce(KafkaConsumer.java:1029) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > >> KafkaConsumer.java:995) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:592) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. 
> >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> "StreamThread-1" - Thread t@49 > >>>> java.lang.Thread.State: RUNNABLE > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > >>>> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > >> 269) > >>>> at > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. > >> java:86) > >>>> - locked <5aeb504> (a sun.nio.ch.Util$2) > >>>> - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet) > >>>> - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl) > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > >>>> at > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489) > >>>> at > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > >>>> at > >>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( > >> ConsumerNetworkClient.java:226) > >>>> - locked <7b072bc6> (a > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll( > >> ConsumerNetworkClient.java:172) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > >> ensureActiveGroup(AbstractCoordinator.java:303) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( > >> ConsumerCoordinator.java:290) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.KafkaConsumer. 
> >> pollOnce(KafkaConsumer.java:1029) > >>>> at > >>>> > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > >> KafkaConsumer.java:995) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:592) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> Eventually they get assigned partitions again, then the partitions are revoked, > >>>> another long time passes, threads 1, 2, 3 and 5 get stuck on Sensor, and we get > >>>> into the same situation. > >>>> > >>>> Finally, I tried starting up only 1 instance (with 5 threads). Current > >>>> status: > >>>> > >>>> "StreamThread-5" - Thread t@56 > >>>> java.lang.Thread.State: RUNNABLE > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:169) > >>>> - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:176) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > >> measureLatencyNs(StreamsMetricsImpl.java:190) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( > >> ProcessorNode.java:139) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( > >> StreamTask.java:268) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > >>>> - locked <5bea9b1b> (a java.util.PriorityQueue) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( > >> StreamTask.java:251) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. 
> >> StreamThread.maybePunctuate(StreamThread.java:751) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:633) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> "StreamThread-4" - Thread t@54 > >>>> java.lang.Thread.State: RUNNABLE > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:169) > >>>> - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:176) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > >> measureLatencyNs(StreamsMetricsImpl.java:190) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( > >> ProcessorNode.java:139) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( > >> StreamTask.java:268) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > >>>> - locked <5a06855> (a java.util.PriorityQueue) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( > >> StreamTask.java:251) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.maybePunctuate(StreamThread.java:751) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:633) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. 
> >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> "StreamThread-3" - Thread t@52 > >>>> java.lang.Thread.State: BLOCKED > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:169) > >>>> - waiting to lock <6a55b7c6> (a > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56 > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:176) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > >> measureLatencyNs(StreamsMetricsImpl.java:190) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( > >> ProcessorNode.java:139) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( > >> StreamTask.java:268) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > >>>> - locked <d3a57bc> (a java.util.PriorityQueue) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( > >> StreamTask.java:251) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.maybePunctuate(StreamThread.java:751) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:633) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> "StreamThread-2" - Thread t@50 > >>>> java.lang.Thread.State: RUNNABLE > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. 
> >> java:175) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > >> measureLatencyNs(StreamsMetricsImpl.java:190) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( > >> ProcessorNode.java:139) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( > >> StreamTask.java:268) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > >>>> - locked <480f4efc> (a java.util.PriorityQueue) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( > >> StreamTask.java:251) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.maybePunctuate(StreamThread.java:751) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:633) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> "StreamThread-1" - Thread t@48 > >>>> java.lang.Thread.State: BLOCKED > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:169) > >>>> - waiting to lock <6a55b7c6> (a > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56 > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > >> java:176) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. 
> >> measureLatencyNs(StreamsMetricsImpl.java:190) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate( > >> ProcessorNode.java:139) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate( > >> StreamTask.java:268) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > >>>> - locked <47b226a5> (a java.util.PriorityQueue) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate( > >> StreamTask.java:251) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.maybePunctuate(StreamThread.java:751) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > >> StreamThread.java:633) > >>>> at > >>>> > >>> org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:361) > >>>> > >>>> Locked ownable synchronizers: > >>>> - None > >>>> > >>>> This has been going on for over 40 minutes now and the cluster does not > >>>> stabilize. I am not sure what to do here; any help is welcome. > >>>> > >>>> > >>> > >> > >