What we do is start one instance and wait until it gets all the partitions, then start the second, and so on. I hope this works as a workaround until they fix this in the next release.
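
For what it's worth, here is a minimal sketch of that staggered start. It is only an illustration under assumptions: `StaggeredStart`, `Instance`, `hasAllPartitions()` and `startSequentially()` are hypothetical names I made up, not Kafka APIs. In a real setup `start()` might scale a Kubernetes deployment by one replica, and `hasAllPartitions()` might check the consumer group or the Streams state; how you detect "settled" is up to you.

```java
import java.util.List;

/** Hypothetical helper, not part of Kafka: starts instances one at a time. */
public class StaggeredStart {

    /** Hypothetical handle for one Streams instance (for example a pod or a process). */
    public interface Instance {
        void start();               // kick off the instance
        boolean hasAllPartitions(); // true once its rebalance has settled
    }

    /**
     * Starts each instance only after the previous one reports it has settled.
     * Returns the number of instances started. Stops early if an instance
     * does not settle within timeoutMs, or if the calling thread is interrupted.
     */
    public static int startSequentially(List<Instance> instances, long timeoutMs, long pollMs) {
        int started = 0;
        for (Instance instance : instances) {
            instance.start();
            started++;
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (!instance.hasAllPartitions()) {
                if (System.currentTimeMillis() >= deadline) {
                    return started; // give up instead of triggering more rebalances
                }
                try {
                    Thread.sleep(pollMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    return started;
                }
            }
        }
        return started;
    }
}
```

The point of waiting between starts is that each new member joins a group that is already stable, so you get one rebalance per instance instead of a burst of overlapping ones.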

On Wed, May 10, 2017 at 12:05 AM, João Peixoto <joao.harti...@gmail.com> wrote:

> Guozhang, thanks a lot for that info, that is exactly what I'm observing, it seems.
>
> I'll keep an eye out.
>
> JP
>
> On Mon, May 8, 2017 at 3:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello,
> >
> > Just to add a few more pointers: there are a few improvements we have
> > added in trunk and are also considering backporting to 0.10.2, in case
> > we can have a 0.10.2.2 release, and one of them would help with this case:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
> >
> > The key idea is that with many instances of the same app starting up at
> > the same time, in your case 5 * 5 = 25 threads, we can consider 1) reducing
> > the latency of a single rebalance, and 2) reducing the number of consecutive
> > rebalances until all instances are up and running. The KIP above is aimed
> > at the second case. So I'd suggest taking a look at the app's logs to
> > see if there are multiple rebalances triggered during startup; if yes,
> > the above fix may help the most.
> >
> > Guozhang
> >
> > On Mon, May 8, 2017 at 7:41 AM, João Peixoto <joao.harti...@gmail.com> wrote:
> >
> > > Thanks for the feedback. Here is additional information:
> > >
> > > * The stream instances are deployed on Kubernetes through deployments.
> > >   I do not know if they use emptyDir, hostPath or EBS
> > > * The instances have 2 cores minimum
> > >
> > > Good advice on the state stores. I already had some of those
> > > configurations, but for this issue in particular the state stores are
> > > empty, since this happens when the Kafka stream bootstraps for the
> > > first time.
> > >
> > > On Sat, May 6, 2017 at 7:31 AM Eno Thereska <eno.there...@gmail.com> wrote:
> > >
> > > > Hi there,
> > > >
> > > > I wanted to add something: how many CPU cores does each of your
> > > > Kubernetes instances have? In 0.10.2.1 we noticed a regression in
> > > > environments with 1 core, as described in
> > > > https://issues.apache.org/jira/browse/KAFKA-5174.
> > > >
> > > > If you have 1 core, the workaround is to change a config as described here:
> > > > http://docs.confluent.io/current/streams/upgrade-guide.html#known-issues-and-workarounds
> > > >
> > > > Thanks
> > > > Eno
> > > >
> > > > > On May 6, 2017, at 9:48 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > >
> > > > > A note on a few things.
> > > > > Set the changelog topic delete retention time as low as possible if
> > > > > the previous values for the same key are not needed and can be safely
> > > > > cleaned up.
> > > > > Set segment size and segment retention time low as well, so older
> > > > > segments can be compacted and cleaned up.
> > > > > Set the delete ratio to an aggressive 0.01 so segments don't grow too big.
> > > > >
> > > > > This way state stores would be created much faster.
> > > > >
> > > > > Also, when using windows, a smaller window size helps.
> > > > >
> > > > > Try not to run many stream threads on a single machine unless you
> > > > > have great hardware.
> > > > >
> > > > > Make sure a thread is not reading from many partitions. Make sure the
> > > > > ratio of partitions to total threads is low.
> > > > >
> > > > > Hope this helps.
> > > > >
> > > > > Sachin
> > > > >
> > > > > On 6 May 2017 13:28, "Shimi Kiviti" <shim...@gmail.com> wrote:
> > > > >
> > > > >> This is very similar to issues that we see.
> > > > >>
> > > > >> Did you check the status of the consumer group? In my case it will be
> > > > >> in rebalancing most of the time. Once in a while it will show consumers
> > > > >> and offsets but after a short time will go back to rebalancing.
> > > > >>
> > > > >> How much storage does your Kafka-streams use?
> > > > >> Also, what is your k8s configuration?
> > > > >> Deployment? Deployment with emptyDir, hostPath or EBS? Statefulset?
> > > > >>
> > > > >> Thanks,
> > > > >> Shimi
> > > > >> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.harti...@gmail.com> wrote:
> > > > >>
> > > > >>> After a while the instance started running.
> > > > >>>
> > > > >>> 2017-05-05 22:40:26.806 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing task StreamTask 1_62
> > > > >>> (this is literally the next message)
> > > > >>> 2017-05-05 23:13:27.820 INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-4] Committing all tasks because the commit interval 10000ms has elapsed
> > > > >>>
> > > > >>> On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.harti...@gmail.com> wrote:
> > > > >>>
> > > > >>>> Warning, long message
> > > > >>>>
> > > > >>>> *Problem*: Initializing a Kafka Stream is taking a loooong time.
> > > > >>>> Currently at the 40 minute mark
> > > > >>>>
> > > > >>>> *Setup*:
> > > > >>>> 2 co-partition topics with 100 partitions.
> > > > >>>> First topic contains a lot of messages in the order of hundreds of millions
> > > > >>>> Second topic is a KTable and contains ~30k records
> > > > >>>>
> > > > >>>> Kafka cluster with 6 brokers running 0.10.1
> > > > >>>>
> > > > >>>> Kafka streams running on 0.10.2.1. 5 instances with 5 threads each.
> > > > >>>> The instances are running on Kubernetes
> > > > >>>>
> > > > >>>> *Stream Configuration*:
> > > > >>>> Properties props = new Properties();
> > > > >>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> > > > >>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
> > > > >>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> > > > >>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
> > > > >>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
> > > > >>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> > > > >>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
> > > > >>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > > >>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
> > > > >>>>
> > > > >>>> *The events*:
> > > > >>>> I started 5 instances of my stream configuration at the same time.
> > > > >>>> This is the first time this configuration is running.
> > > > >>>>
> > > > >>>> 2017-05-05 21:23:03.283 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating producer client
> > > > >>>> 2017-05-05 21:23:03.415 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating consumer client
> > > > >>>> 2017-05-05 21:23:03.520 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Creating restore consumer client
> > > > >>>> 2017-05-05 21:23:03.528 INFO 71 --- [ main] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from NOT_RUNNING to RUNNING.
> > > > >>>> 2017-05-05 21:23:03.531 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Creating producer client > > > > >>>> 2017-05-05 21:23:03.564 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Creating consumer client > > > > >>>> 2017-05-05 21:23:03.569 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Creating restore consumer client > > > > >>>> 2017-05-05 21:23:03.615 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> State transition from NOT_RUNNING to RUNNING. > > > > >>>> 2017-05-05 21:23:03.617 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Creating producer client > > > > >>>> 2017-05-05 21:23:03.621 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Creating consumer client > > > > >>>> 2017-05-05 21:23:03.625 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Creating restore consumer client > > > > >>>> 2017-05-05 21:23:03.628 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> State transition from NOT_RUNNING to RUNNING. 
> > > > >>>> 2017-05-05 21:23:03.629 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Creating producer client > > > > >>>> 2017-05-05 21:23:03.632 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Creating consumer client > > > > >>>> 2017-05-05 21:23:03.635 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Creating restore consumer client > > > > >>>> 2017-05-05 21:23:03.638 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> State transition from NOT_RUNNING to RUNNING. > > > > >>>> 2017-05-05 21:23:03.639 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Creating producer client > > > > >>>> 2017-05-05 21:23:03.641 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Creating consumer client > > > > >>>> 2017-05-05 21:23:03.644 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Creating restore consumer client > > > > >>>> 2017-05-05 21:23:03.647 INFO 71 --- [ main] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> State transition from NOT_RUNNING to RUNNING. 
> > > > >>>> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> Starting > > > > >>>> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Starting > > > > >>>> 2017-05-05 21:23:03.790 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Starting > > > > >>>> 2017-05-05 21:23:03.791 INFO 71 --- [ StreamThread-3] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Starting > > > > >>>> 2017-05-05 21:23:03.792 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Starting > > > > >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> at state RUNNING: partitions [] revoked at the beginning of > > consumer > > > > >>>> rebalance. > > > > >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> at state RUNNING: partitions [] revoked at the beginning of > > consumer > > > > >>>> rebalance. > > > > >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> at state RUNNING: partitions [] revoked at the beginning of > > consumer > > > > >>>> rebalance. > > > > >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> State transition from RUNNING to PARTITIONS_REVOKED. 
> > > > >>>> 2017-05-05 21:23:03.966 INFO 71 --- [ StreamThread-3] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> at state RUNNING: partitions [] revoked at the beginning of > > consumer > > > > >>>> rebalance. > > > > >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> State transition from RUNNING to PARTITIONS_REVOKED. > > > > >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> at state RUNNING: partitions [] revoked at the beginning of > > consumer > > > > >>>> rebalance. > > > > >>>> 2017-05-05 21:23:03.967 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> State transition from RUNNING to PARTITIONS_REVOKED. > > > > >>>> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-3] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> State transition from RUNNING to PARTITIONS_REVOKED. > > > > >>>> 2017-05-05 21:23:03.968 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> State transition from RUNNING to PARTITIONS_REVOKED. 
> > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Updating suspended tasks to contain active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Updating suspended tasks to contain active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Updating suspended tasks to contain active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> Updating suspended tasks to contain active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-3] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Updating suspended tasks to contain active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Removing all active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Removing all active tasks [] > > > > >>>> 2017-05-05 21:23:03.970 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Removing all active tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> Removing all active tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] > > > > >>>> 
o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Removing all active tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> Removing all standby tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Removing all standby tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> Removing all standby tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> Removing all standby tasks [] > > > > >>>> 2017-05-05 21:23:03.971 INFO 71 --- [ StreamThread-3] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> Removing all standby tasks [] > > > > >>>> 2017-05-05 21:23:04.020 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Constructed client metadata > > > > >>>> > > {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, > > > > >>>> consumers=[<consumerlist>], state=[activeTasks: ([]) > > assignedTasks: > > > > >> ([]) > > > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 > cost: > > > > >> 0.0]}} > > > > >>>> from the member subscriptions. 
> > > > >>>> 2017-05-05 21:23:04.218 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Completed validating internal topics in partition assignor > > > > >>>> 2017-05-05 21:23:04.591 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Completed validating internal topics in partition assignor > > > > >>>> 2017-05-05 21:23:04.726 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Assigned tasks to clients as > > > > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) > > > > >>>> assignedTasks: ([<list>]) prevActiveTasks: ([]) > prevAssignedTasks: > > > > ([]) > > > > >>>> capacity: 1.0 cost: 100.0]}. > > > > >>>> 2017-05-05 21:23:04.742 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Constructed client metadata > > > > >>>> > > {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null, > > > > >>>> consumers=[<consumerlist>], state=[activeTasks: ([]) > > assignedTasks: > > > > >> ([]) > > > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 > cost: > > > > >> 0.0]}} > > > > >>>> from the member subscriptions. 
> > > > >>>> 2017-05-05 21:23:05.120 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Completed validating internal topics in partition assignor > > > > >>>> 2017-05-05 21:23:05.482 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Completed validating internal topics in partition assignor > > > > >>>> 2017-05-05 21:23:05.520 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.i.StreamPartitionAssignor : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Assigned tasks to clients as > > > > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>]) > > > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 > cost: > > > > >> 50.0], > > > > >>>> da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>]) > > > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 > cost: > > > > >> 50.0]}. > > > > >>>> 2017-05-05 21:23:05.553 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> at state PARTITIONS_REVOKED: new partitions [<partitionlist>] > > > assigned > > > > >> at > > > > >>>> the end of consumer rebalance. > > > > >>>> *// The above line is repeated for each thread* > > > > >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> State transition from PARTITIONS_REVOKED to > ASSIGNING_PARTITIONS. > > > > >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-5] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-5] > > > > >>>> State transition from PARTITIONS_REVOKED to > ASSIGNING_PARTITIONS. 
> > > > >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> State transition from PARTITIONS_REVOKED to > ASSIGNING_PARTITIONS. > > > > >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-1] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-1] > > > > >>>> State transition from PARTITIONS_REVOKED to > ASSIGNING_PARTITIONS. > > > > >>>> 2017-05-05 21:23:05.554 INFO 71 --- [ StreamThread-3] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-3] > > > > >>>> State transition from PARTITIONS_REVOKED to > ASSIGNING_PARTITIONS. > > > > >>>> *// omitted* > > > > >>>> 2017-05-05 21:23:15.596 INFO 71 --- [ StreamThread-2] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-2] > > > > >>>> State transition from ASSIGNING_PARTITIONS to RUNNING. > > > > >>>> *// above message repeated for each thread* > > > > >>>> > > > > >>>> *Important*: At this point only StreamThread-4 is performing > > commits > > > > >>>> every 10 seconds. The other threads output no logs. Now the fun > > > begins > > > > >>>> > > > > >>>> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> State transition from RUNNING to PARTITIONS_REVOKED. > > > > >>>> 2017-05-05 21:29:21.310 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Closing task's topology ... // repeated multiple times > > > > >>>> 2017-05-05 21:29:21.387 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Flushing state stores of task ... 
// repeated multiple times > > > > >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Committing consumer offsets of task ... // repeated multiple > times > > > > >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Updating suspended tasks to contain active tasks [<list>] > > > > >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Removing all active tasks [<list>] > > > > >>>> 2017-05-05 21:29:21.388 INFO 71 --- [ StreamThread-4] > > > > >>>> o.a.k.s.p.internals.StreamThread : stream-thread > > > > >> [StreamThread-4] > > > > >>>> Removing all standby tasks [] > > > > >>>> > > > > >>>> > > > > >>>> At this point there are no more log messages for 16 minutes!! > > During > > > > >> this > > > > >>>> time I perform several threaddumps, almost every minute. > > > > >>>> Thread dump below. Do notice that thread 4 is the only different > > > one. > > > > >>>> > > > > >>>> "StreamThread-5" - Thread t@57 > > > > >>>> java.lang.Thread.State: BLOCKED > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - waiting to lock <653f9d6> (a > > > > >>>> org.apache.kafka.common.metrics.Sensor) owned by > "StreamThread-1" > > > t@49 > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. 
> > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <3fbbc5a8> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-4" - Thread t@55 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > > > > >>>> at > > sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > > > > >> 269) > > > > >>>> at > > > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > > > > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. 
> > > > >> java:86) > > > > >>>> - locked <5709c085> (a sun.nio.ch.Util$2) > > > > >>>> - locked <1cacaaf> (a > > java.util.Collections$UnmodifiableSet) > > > > >>>> - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) > > > > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > > > >>>> at > > > > >>>> org.apache.kafka.common.network.Selector.select( > Selector.java:489) > > > > >>>> at > > > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > > > > >>>> at > > > > >>>> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:226) > > > > >>>> - locked <639d53ee> (a > > > > >>>> org.apache.kafka.clients.consumer.internals. > ConsumerNetworkClient) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:172) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> ensureActiveGroup(AbstractCoordinator.java:303) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerCoordinator.poll( > > > > >> ConsumerCoordinator.java:290) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > > > > >> pollOnce(KafkaConsumer.java:1029) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > >> KafkaConsumer.java:995) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. 
> StreamThread.runLoop( > > > > >> StreamThread.java:592) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-3" - Thread t@53 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - locked <653f9d6> (a org.apache.kafka.common. > > > metrics.Sensor) > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <7a09baf0> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. 
> StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-2" - Thread t@51 > > > > >>>> java.lang.Thread.State: BLOCKED > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - waiting to lock <653f9d6> (a > > > > >>>> org.apache.kafka.common.metrics.Sensor) owned by > "StreamThread-1" > > > t@49 > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <2dc188ac> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. 
> StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-1" - Thread t@49 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:172) > > > > >>>> - locked <653f9d6> (a org.apache.kafka.common. > > > metrics.Sensor) > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <7dffc3aa> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. 
> StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None
> > > > >>>>
> > > > >>>> (no logs omitted, ~16 minutes later)
> > > > >>>> 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
> > > > >>>> 2017-05-05 21:45:05.270 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask .. // repeated multiple times
> > > > >>>> 2017-05-05 21:45:05.379 INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
> > > > >>>> *// The above is repeated for threads 2, 3 and 5*
> > > > >>>> (no logs omitted!! This is really the next entry, ~10 minutes later)
> > > > >>>> 2017-05-05 21:55:16.835 INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor : stream-thread [StreamThread-4] Constructed client metadata ...
> > > > >>>>
> > > > >>>> During the above 10 minutes all threads show the following:
> > > > >>>> "StreamThread-5" - Thread t@57 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > > > > >>>> at > > sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > > > > >> 269) > > > > >>>> at > > > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > > > > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > > >> java:86) > > > > >>>> - locked <4a60e0dd> (a sun.nio.ch.Util$2) > > > > >>>> - locked <5e54059f> (a java.util.Collections$ > > > UnmodifiableSet) > > > > >>>> - locked <60693986> (a sun.nio.ch.EPollSelectorImpl) > > > > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > > > >>>> at > > > > >>>> org.apache.kafka.common.network.Selector.select( > Selector.java:489) > > > > >>>> at > > > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > > > > >>>> at > > > > >>>> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:226) > > > > >>>> - locked <5f1ae6c1> (a > > > > >>>> org.apache.kafka.clients.consumer.internals. > ConsumerNetworkClient) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:172) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> ensureActiveGroup(AbstractCoordinator.java:303) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerCoordinator.poll( > > > > >> ConsumerCoordinator.java:290) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > > > > >> pollOnce(KafkaConsumer.java:1029) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > >> KafkaConsumer.java:995) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. 
> StreamThread.runLoop( > > > > >> StreamThread.java:592) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-4" - Thread t@55 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > > > > >>>> at > > sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > > > > >> 269) > > > > >>>> at > > > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > > > > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. > > > > >> java:86) > > > > >>>> - locked <5709c085> (a sun.nio.ch.Util$2) > > > > >>>> - locked <1cacaaf> (a > > java.util.Collections$UnmodifiableSet) > > > > >>>> - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl) > > > > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > > > >>>> at > > > > >>>> org.apache.kafka.common.network.Selector.select( > Selector.java:489) > > > > >>>> at > > > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > > > > >>>> at > > > > >>>> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:226) > > > > >>>> - locked <639d53ee> (a > > > > >>>> org.apache.kafka.clients.consumer.internals. > ConsumerNetworkClient) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:172) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. 
> > > > >> ensureActiveGroup(AbstractCoordinator.java:303) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerCoordinator.poll( > > > > >> ConsumerCoordinator.java:290) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > > > > >> pollOnce(KafkaConsumer.java:1029) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > >> KafkaConsumer.java:995) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:592) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-3" - Thread t@53 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > > > > >>>> at > > sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > > > > >> 269) > > > > >>>> at > > > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > > > > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. > > > > >> java:86) > > > > >>>> - locked <123193f7> (a sun.nio.ch.Util$2) > > > > >>>> - locked <6c3704d3> (a java.util.Collections$ > > > UnmodifiableSet) > > > > >>>> - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl) > > > > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > > > >>>> at > > > > >>>> org.apache.kafka.common.network.Selector.select( > Selector.java:489) > > > > >>>> at > > > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > > > > >>>> at > > > > >>>> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. 
> > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:226) > > > > >>>> - locked <4d9f6f42> (a > > > > >>>> org.apache.kafka.clients.consumer.internals. > ConsumerNetworkClient) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:172) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> ensureActiveGroup(AbstractCoordinator.java:303) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerCoordinator.poll( > > > > >> ConsumerCoordinator.java:290) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > > > > >> pollOnce(KafkaConsumer.java:1029) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > >> KafkaConsumer.java:995) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:592) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-2" - Thread t@51 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > > > > >>>> at > > sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > > > > >> 269) > > > > >>>> at > > > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > > > > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. 
> > > > >> java:86) > > > > >>>> - locked <532ff32d> (a sun.nio.ch.Util$2) > > > > >>>> - locked <76a6407> (a > > java.util.Collections$UnmodifiableSet) > > > > >>>> - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl) > > > > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > > > >>>> at > > > > >>>> org.apache.kafka.common.network.Selector.select( > Selector.java:489) > > > > >>>> at > > > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > > > > >>>> at > > > > >>>> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:226) > > > > >>>> - locked <29b48d84> (a > > > > >>>> org.apache.kafka.clients.consumer.internals. > ConsumerNetworkClient) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:172) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> ensureActiveGroup(AbstractCoordinator.java:303) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerCoordinator.poll( > > > > >> ConsumerCoordinator.java:290) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > > > > >> pollOnce(KafkaConsumer.java:1029) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > >> KafkaConsumer.java:995) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. 
> StreamThread.runLoop( > > > > >> StreamThread.java:592) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-1" - Thread t@49 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > > > > >>>> at > > sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java: > > > > >> 269) > > > > >>>> at > > > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > > > > >>>> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl. > > > > >> java:86) > > > > >>>> - locked <5aeb504> (a sun.nio.ch.Util$2) > > > > >>>> - locked <5130a3ea> (a java.util.Collections$ > > > UnmodifiableSet) > > > > >>>> - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl) > > > > >>>> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > > > > >>>> at > > > > >>>> org.apache.kafka.common.network.Selector.select( > Selector.java:489) > > > > >>>> at > > > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298) > > > > >>>> at > > > > >>>> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:226) > > > > >>>> - locked <7b072bc6> (a > > > > >>>> org.apache.kafka.clients.consumer.internals. > ConsumerNetworkClient) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerNetworkClient.poll( > > > > >> ConsumerNetworkClient.java:172) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. 
> > > > >> joinGroupIfNeeded(AbstractCoordinator.java:347) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > > >> ensureActiveGroup(AbstractCoordinator.java:303) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.internals. > > > ConsumerCoordinator.poll( > > > > >> ConsumerCoordinator.java:290) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer. > > > > >> pollOnce(KafkaConsumer.java:1029) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > > >> KafkaConsumer.java:995) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:592) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None
> > > > >>>>
> > > > >>>> Eventually they are assigned partitions again, then the partitions are revoked, another long time passes, threads 1, 2, 3 and 5 get stuck on Sensor, and we end up in the same situation.
> > > > >>>>
> > > > >>>> Finally, I tried starting up only 1 instance (with 5 threads). Current status:
> > > > >>>>
> > > > >>>> "StreamThread-5" - Thread t@56 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - locked <6a55b7c6> (a org.apache.kafka.common. > > > metrics.Sensor) > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <5bea9b1b> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-4" - Thread t@54 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - locked <6a55b7c6> (a org.apache.kafka.common. > > > metrics.Sensor) > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. 
> > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <5a06855> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-3" - Thread t@52 > > > > >>>> java.lang.Thread.State: BLOCKED > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - waiting to lock <6a55b7c6> (a > > > > >>>> org.apache.kafka.common.metrics.Sensor) owned by > "StreamThread-5" > > > t@56 > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. 
> > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <d3a57bc> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-2" - Thread t@50 > > > > >>>> java.lang.Thread.State: RUNNABLE > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. 
> > > > >> java:175) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <480f4efc> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None > > > > >>>> > > > > >>>> "StreamThread-1" - Thread t@48 > > > > >>>> java.lang.Thread.State: BLOCKED > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. > > > > >> java:169) > > > > >>>> - waiting to lock <6a55b7c6> (a > > > > >>>> org.apache.kafka.common.metrics.Sensor) owned by > "StreamThread-5" > > > t@56 > > > > >>>> at org.apache.kafka.common.metrics.Sensor.record(Sensor. 
> > > > >> java:176) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamThread$ > > > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl. > > > > >> measureLatencyNs(StreamsMetricsImpl.java:190) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > ProcessorNode.punctuate( > > > > >> ProcessorNode.java:139) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamTask.punctuate( > > > > >> StreamTask.java:268) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45) > > > > >>>> - locked <47b226a5> (a java.util.PriorityQueue) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > StreamTask.maybePunctuate( > > > > >> StreamTask.java:251) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.maybePunctuate(StreamThread.java:751) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > StreamThread.runLoop( > > > > >> StreamThread.java:633) > > > > >>>> at > > > > >>>> > > > > >>> org.apache.kafka.streams.processor.internals. > > > > >> StreamThread.run(StreamThread.java:361) > > > > >>>> > > > > >>>> Locked ownable synchronizers: > > > > >>>> - None
> > > > >>>>
> > > > >>>> This has been going on for over 40 minutes now and the cluster does not stabilize. I'm not sure what to do here; any help is welcome.
> >
> > --
> > -- Guozhang