Hi there,

I wanted to add something: how many CPU cores does each of your Kubernetes
instances have? In 0.10.2.1 we noticed a regression in environments with 1 core,
as described in https://issues.apache.org/jira/browse/KAFKA-5174.

If you have 1 core, the workaround is to change a config as described here:
http://docs.confluent.io/current/streams/upgrade-guide.html#known-issues-and-workarounds
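
To answer the core-count question from inside a pod, here is a minimal sketch that prints what the JVM itself reports through `Runtime.availableProcessors()`. The class name `CoreCheck` is hypothetical. Older JVMs without container support (before JDK 10 / 8u191) may report the host's core count instead of the pod's CPU limit, so treat the number as a hint rather than the final word.

```java
// Hypothetical helper: reports how many CPUs the JVM thinks it can use.
final class CoreCheck {
    static int visibleCores() {
        // On JVMs without container support this may be the host's core
        // count rather than the pod's CPU limit.
        return Runtime.getRuntime().availableProcessors();
    }

    // True when the JVM sees exactly one core, the case KAFKA-5174 is about.
    static boolean singleCore() {
        return visibleCores() == 1;
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors = " + visibleCores());
        if (singleCore()) {
            System.out.println("Single core: check the KAFKA-5174 workaround.");
        }
    }
}
```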

Thanks
Eno


> On May 6, 2017, at 9:48 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
> 
> A few notes:
> Set the changelog topic's delete retention time as low as possible if the
> previous values for the same key are not needed and can be safely cleaned up.
> Set the segment size and segment retention time low as well, so older
> segments can be compacted and cleaned up.
> Set the delete ratio aggressively (0.01) so segments don't grow too big.
> 
> This way state stores would be created much faster.
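
Here is a sketch of these settings as a changelog topic-config map. It assumes that Sachin's "delete retention time", "segment size", "segment retention time" and "delete ratio" correspond to the standard topic configs `delete.retention.ms`, `segment.bytes`, `segment.ms` and `min.cleanable.dirty.ratio`. Apart from the 0.01 ratio, the values are illustrative only. How the map gets attached to a changelog topic depends on your Streams version and is not shown. `ChangelogTopicConfig` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for aggressive compaction settings on a changelog topic.
final class ChangelogTopicConfig {
    static Map<String, String> aggressiveCompaction() {
        Map<String, String> cfg = new HashMap<>();
        // How long delete markers (tombstones) survive compaction (default: 1 day).
        cfg.put("delete.retention.ms", "60000");                    // 1 minute
        // Roll segments sooner so older ones become eligible for compaction.
        cfg.put("segment.bytes", String.valueOf(64 * 1024 * 1024)); // 64 MiB
        cfg.put("segment.ms", String.valueOf(60 * 60 * 1000));      // 1 hour
        // Start cleaning once 1% of the log is uncompacted (default: 0.5).
        cfg.put("min.cleanable.dirty.ratio", "0.01");
        return Collections.unmodifiableMap(cfg);
    }
}
```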
> 
> Also, when using windowed operations, a smaller window size helps.
> 
> Try not to run many stream threads on a single machine unless you have
> powerful hardware.
> 
> Make sure a thread is not reading from too many partitions, i.e. keep the
> ratio of partitions to total threads low.
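
Here is a back-of-the-envelope sketch of the partitions-per-thread ratio. It assumes the assignor spreads tasks as evenly as possible. It also assumes Streams creates one task per input partition of each sub-topology, with each task reading one partition from every co-partitioned input. `ThreadLoad.tasksPerThread` is a hypothetical helper. For the setup described further down the thread (100 partitions, 5 instances with 5 threads each), it gives 4 tasks per thread for each sub-topology.

```java
// Hypothetical helper for estimating per-thread load.
final class ThreadLoad {
    // Ceiling of partitions / total threads: the busiest thread's task count
    // for one sub-topology, assuming tasks are spread as evenly as possible.
    static int tasksPerThread(int partitions, int instances, int threadsPerInstance) {
        if (partitions < 0 || instances < 1 || threadsPerInstance < 1) {
            throw new IllegalArgumentException("invalid counts");
        }
        int totalThreads = instances * threadsPerInstance;
        return (partitions + totalThreads - 1) / totalThreads;
    }

    public static void main(String[] args) {
        // 100 partitions, 5 instances with 5 threads each
        System.out.println(tasksPerThread(100, 5, 5)); // 4
    }
}
```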
> 
> Hope this helps.
> 
> Sachin
> 
> On 6 May 2017 13:28, "Shimi Kiviti" <shim...@gmail.com> wrote:
> 
>> This is very similar to issues that we see.
>> 
>> Did you check the status of the consumer group? In my case it is
>> rebalancing most of the time. Once in a while it shows consumers and
>> offsets, but after a short time it goes back to rebalancing.
>> 
>> How much storage does your Kafka Streams application use?
>> Also, what is your k8s configuration? A Deployment? A Deployment with
>> emptyDir, hostPath or EBS? A StatefulSet?
>> 
>> Thanks,
>> Shimi
>> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.harti...@gmail.com> wrote:
>> 
>>> After a while the instance started running.
>>> 
>>> 2017-05-05 22:40:26.806  INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Committing task StreamTask 1_62
>>> (this is literally the next message)
>>> 2017-05-05 23:13:27.820  INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Committing all tasks because the commit interval 10000ms has elapsed
>>> 
>>> On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.harti...@gmail.com>
>>> wrote:
>>> 
>>>> Warning, long message
>>>> 
>>>> *Problem*: Initializing a Kafka Streams application is taking a loooong time.
>>>> It is currently at the 40-minute mark.
>>>> 
>>>> *Setup*:
>>>> 2 co-partitioned topics with 100 partitions each.
>>>> The first topic contains a lot of messages, on the order of hundreds of
>>>> millions.
>>>> The second topic is a KTable and contains ~30k records.
>>>> 
>>>> Kafka cluster with 6 brokers running 0.10.1.
>>>> 
>>>> Kafka Streams running on 0.10.2.1: 5 instances with 5 threads each.
>>>> The instances are running on Kubernetes.
>>>> 
>>>> *Stream Configuration*:
>>>> Properties props = new Properties();
>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>>> Serdes.String().getClass().getName());
>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>>> Serdes.ByteArray().getClass().getName());
>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
>>>> 
>>>> *The events*:
>>>> I started 5 instances of my stream configuration at the same time. This
>>>> is the first time this configuration is running.
>>>> 
>>>> 2017-05-05 21:23:03.283  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Creating producer client
>>>> 2017-05-05 21:23:03.415  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Creating consumer client
>>>> 2017-05-05 21:23:03.520  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Creating restore consumer client
>>>> 2017-05-05 21:23:03.528  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] State transition from NOT_RUNNING to RUNNING.
>>>> 2017-05-05 21:23:03.531  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Creating producer client
>>>> 2017-05-05 21:23:03.564  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Creating consumer client
>>>> 2017-05-05 21:23:03.569  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Creating restore consumer client
>>>> 2017-05-05 21:23:03.615  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] State transition from NOT_RUNNING to RUNNING.
>>>> 2017-05-05 21:23:03.617  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Creating producer client
>>>> 2017-05-05 21:23:03.621  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Creating consumer client
>>>> 2017-05-05 21:23:03.625  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Creating restore consumer client
>>>> 2017-05-05 21:23:03.628  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] State transition from NOT_RUNNING to RUNNING.
>>>> 2017-05-05 21:23:03.629  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Creating producer client
>>>> 2017-05-05 21:23:03.632  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Creating consumer client
>>>> 2017-05-05 21:23:03.635  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Creating restore consumer client
>>>> 2017-05-05 21:23:03.638  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] State transition from NOT_RUNNING to RUNNING.
>>>> 2017-05-05 21:23:03.639  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Creating producer client
>>>> 2017-05-05 21:23:03.641  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Creating consumer client
>>>> 2017-05-05 21:23:03.644  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Creating restore consumer client
>>>> 2017-05-05 21:23:03.647  INFO 71 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] State transition from NOT_RUNNING to RUNNING.
>>>> 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Starting
>>>> 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Starting
>>>> 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Starting
>>>> 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Starting
>>>> 2017-05-05 21:23:03.792  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Starting
>>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
>>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
>>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
>>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
>>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] at state RUNNING: partitions [] revoked at the beginning of consumer rebalance.
>>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Updating suspended tasks to contain active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Updating suspended tasks to contain active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Updating suspended tasks to contain active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Removing all active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Removing all active tasks []
>>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Removing all active tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Removing all active tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Removing all active tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] Removing all standby tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Removing all standby tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] Removing all standby tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Removing all standby tasks []
>>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] Removing all standby tasks []
>>>> 2017-05-05 21:23:04.020  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
>>>> 2017-05-05 21:23:04.218  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
>>>> 2017-05-05 21:23:04.591  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
>>>> 2017-05-05 21:23:04.726  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 100.0]}.
>>>> 2017-05-05 21:23:04.742  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Constructed client metadata {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 0.0]}} from the member subscriptions.
>>>> 2017-05-05 21:23:05.120  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
>>>> 2017-05-05 21:23:05.482  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Completed validating internal topics in partition assignor
>>>> 2017-05-05 21:23:05.520  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Assigned tasks to clients as {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0], da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0]}.
>>>> 2017-05-05 21:23:05.553  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned at the end of consumer rebalance.
>>>> *// The above line is repeated for each thread*
>>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
>>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-5] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
>>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
>>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
>>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-3] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3] State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
>>>> *// omitted*
>>>> 2017-05-05 21:23:15.596  INFO 71 --- [ StreamThread-2] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2] State transition from ASSIGNING_PARTITIONS to RUNNING.
>>>> *// above message repeated for each thread*
>>>> 
>>>> *Important*: At this point only StreamThread-4 is performing commits
>>>> every 10 seconds. The other threads output no logs. Now the fun begins:
>>>> 
>>>> 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Closing task's topology ... // repeated multiple times
>>>> 2017-05-05 21:29:21.387  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Flushing state stores of task ... // repeated multiple times
>>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Committing consumer offsets of task ... // repeated multiple times
>>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [<list>]
>>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Removing all active tasks [<list>]
>>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Removing all standby tasks []
>>>> 
>>>> 
>>>> At this point there are no more log messages for 16 minutes! During this
>>>> time I took several thread dumps, almost one every minute.
>>>> Thread dump below. Notice that thread 4 is the only different one.
>>>> 
>>>> "StreamThread-5" - Thread t@57
>>>>   java.lang.Thread.State: BLOCKED
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - waiting to lock <653f9d6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <3fbbc5a8> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-4" - Thread t@55
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>        - locked <5709c085> (a sun.nio.ch.Util$2)
>>>>        - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
>>>>        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
>>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>        - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-3" - Thread t@53
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <7a09baf0> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-2" - Thread t@51
>>>>   java.lang.Thread.State: BLOCKED
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - waiting to lock <653f9d6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <2dc188ac> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-1" - Thread t@49
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:172)
>>>>        - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <7dffc3aa> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
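
The periodic thread dumps João describes can also be captured from inside the JVM. The usual external route is `jstack <pid>`. Below is a minimal sketch using the standard `java.lang.management` API. `StreamThreadDump` is a hypothetical name, and the output is a one-line-per-thread summary rather than a full stack trace. It filters on the `StreamThread-` name prefix seen in these dumps. It also reports which thread owns a contended lock, which is the kind of contention on `<653f9d6>` visible above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: one summary line per live thread matching a name prefix.
final class StreamThreadDump {
    static String dump(String namePrefix) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // (true, true) also collects locked monitors and ownable synchronizers;
        // this throws UnsupportedOperationException on JVMs that cannot report them.
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (!info.getThreadName().startsWith(namePrefix)) {
                continue;
            }
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState());
            // Non-null only when the thread is blocked on a lock held by another thread.
            if (info.getLockOwnerName() != null) {
                sb.append(" waiting for ").append(info.getLockName())
                  .append(" owned by \"").append(info.getLockOwnerName()).append('"');
            }
            sb.append('\n');
        }
        return sb.toString();
    }
}
```

Calling `StreamThreadDump.dump("StreamThread-")` from a scheduled task once a minute would roughly mimic the manual dumps described here.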
>>>> 
>>>> (no logs omitted, ~16 minutes later)
>>>> 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
>>>> 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask .. // repeated multiple times
>>>> 2017-05-05 21:45:05.379  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
>>>> *// The above is repeated for threads 2, 3 and 5*
>>>> (no logs omitted!! This is really the next entry, ~10 minutes later)
>>>> 2017-05-05 21:55:16.835  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Constructed client metadata ...
>>>> 
>>>> During the above 10 minutes, all threads show the following:
>>>> "StreamThread-5" - Thread t@57
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>        - locked <4a60e0dd> (a sun.nio.ch.Util$2)
>>>>        - locked <5e54059f> (a java.util.Collections$UnmodifiableSet)
>>>>        - locked <60693986> (a sun.nio.ch.EPollSelectorImpl)
>>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>        - locked <5f1ae6c1> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-4" - Thread t@55
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>        - locked <5709c085> (a sun.nio.ch.Util$2)
>>>>        - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
>>>>        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
>>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>        - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-3" - Thread t@53
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>        - locked <123193f7> (a sun.nio.ch.Util$2)
>>>>        - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet)
>>>>        - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
>>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>        - locked <4d9f6f42> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-2" - Thread t@51
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>        - locked <532ff32d> (a sun.nio.ch.Util$2)
>>>>        - locked <76a6407> (a java.util.Collections$UnmodifiableSet)
>>>>        - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl)
>>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>        - locked <29b48d84> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-1" - Thread t@49
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>>        - locked <5aeb504> (a sun.nio.ch.Util$2)
>>>>        - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet)
>>>>        - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl)
>>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>>        - locked <7b072bc6> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
>>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> Eventually the threads get assigned partitions again, and then the partitions are revoked.
>>>> After another long wait, threads 1, 2, 3 and 5 are stuck on Sensor and we are back in the
>>>> same situation.
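Threads stuck on a lock can also be listed from inside the running JVM, without taking a full dump each time. The sketch below is an illustrative assumption rather than anything proposed in this thread: it uses the standard java.lang.management.ThreadMXBean API to print, for every BLOCKED thread, the lock it is waiting on and the thread that owns it, in the same shape as the "waiting to lock ... owned by ..." lines in these dumps. The class name BlockedThreadReport and the demo threads in main are hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: lists BLOCKED threads with lock and owner.
public class BlockedThreadReport {

    /** One line per BLOCKED thread: who is waiting, on which lock, and who owns it. */
    public static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        // Monitor/synchronizer details are not needed: getLockName() and
        // getLockOwnerName() are filled in for any thread blocked on a monitor.
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info.getThreadState() == Thread.State.BLOCKED) {
                lines.add(String.format("\"%s\" waiting to lock %s owned by \"%s\"",
                        info.getThreadName(), info.getLockName(), info.getLockOwnerName()));
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        // Demo only: one thread holds a shared monitor while another tries to enter it.
        final Object shared = new Object();
        Thread holder = new Thread(() -> {
            synchronized (shared) {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            }
        }, "holder");
        Thread waiter = new Thread(() -> {
            synchronized (shared) { /* enter and leave */ }
        }, "waiter");
        holder.start();
        // Wait until "holder" is sleeping inside the synchronized block.
        while (holder.getState() != Thread.State.TIMED_WAITING) Thread.sleep(10);
        waiter.start();
        // Wait until "waiter" is stuck on the monitor.
        while (waiter.getState() != Thread.State.BLOCKED) Thread.sleep(10);
        blockedThreads().forEach(System.out::println);
        holder.join();
        waiter.join();
    }
}
```

Calling blockedThreads() periodically from the Streams application would show whether the same owner (StreamThread-5 in the single-instance dump) keeps holding the Sensor monitor, or whether ownership moves between threads.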
>>>> 
>>>> Finally, I tried starting up only 1 instance (with 5 threads). Current
>>>> status:
>>>> 
>>>> "StreamThread-5" - Thread t@56
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <5bea9b1b> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-4" - Thread t@54
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <5a06855> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-3" - Thread t@52
>>>>   java.lang.Thread.State: BLOCKED
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <d3a57bc> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-2" - Thread t@50
>>>>   java.lang.Thread.State: RUNNABLE
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:175)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <480f4efc> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> "StreamThread-1" - Thread t@48
>>>>   java.lang.Thread.State: BLOCKED
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
>>>>        - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
>>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
>>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
>>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
>>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
>>>>        - locked <47b226a5> (a java.util.PriorityQueue)
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>>>> 
>>>>   Locked ownable synchronizers:
>>>>        - None
>>>> 
>>>> This has been going on for over 40 minutes now and the cluster has not stabilized. I'm not
>>>> sure what to do here; any help is welcome.
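To check whether the stream threads keep contending on a monitor over a period like the 40 minutes described here, ThreadMXBean can also report how long each thread has spent BLOCKED. This is a minimal sketch under stated assumptions: it runs inside the same JVM as the Streams application (for example from a periodic task), the stream threads' names start with "StreamThread-" as in the dumps above, and ContentionSampler is a hypothetical helper name. Blocked time only accumulates after contention monitoring is enabled, and getBlockedTime() returns -1 when monitoring is unsupported or disabled.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: reports cumulative BLOCKED time per thread.
public class ContentionSampler {

    /**
     * Approximate time (ms) each live thread whose name starts with namePrefix has
     * spent BLOCKED on a monitor since contention monitoring was enabled.
     * A value of -1 means contention monitoring is unsupported or disabled.
     */
    public static Map<String, Long> blockedTimeMs(String namePrefix) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Blocked time is only accumulated after this flag is switched on.
        if (mx.isThreadContentionMonitoringSupported() && !mx.isThreadContentionMonitoringEnabled()) {
            mx.setThreadContentionMonitoringEnabled(true);
        }
        Map<String, Long> result = new LinkedHashMap<>();
        for (ThreadInfo info : mx.getThreadInfo(mx.getAllThreadIds())) {
            // Entries are null for threads that died after getAllThreadIds() was called.
            if (info != null && info.getThreadName().startsWith(namePrefix)) {
                result.put(info.getThreadName(), info.getBlockedTime());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumption: stream threads are named "StreamThread-N", as in the dumps in this thread.
        String prefix = args.length > 0 ? args[0] : "StreamThread-";
        blockedTimeMs(prefix).forEach((name, ms) -> System.out.println(name + " blocked " + ms + " ms"));
    }
}
```

Sampling twice, a minute or so apart, and comparing the numbers shows whether the blocked time of the stream threads keeps growing while the group fails to stabilize.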
>>>> 
>>>> 
>>> 
>> 
