I see your point, Eno, but the truth is that on my real app I am getting "CommitFailedException" even though I did not change "max.poll.interval.ms"; it remains at Integer.MAX_VALUE.
I'm further investigating the origin of that exception. My current working theory is that if a custom processor throws a runtime exception at the wrong time, the above may happen.

On Fri, Jun 9, 2017 at 9:34 AM Eno Thereska <eno.there...@gmail.com> wrote:

> Even without a state store the tasks themselves will get rebalanced.
>
> So definitely you'll trigger the problem with the 1.2.3. steps you
> describe and that is confirmed. The reason we increased
> "max.poll.interval.ms" to basically infinite is to just avoid this problem.
>
> Eno
>
> On 9 Jun 2017, at 07:40, João Peixoto <joao.harti...@gmail.com> wrote:
> >
> > I am now able to consistently reproduce this issue with a dummy project.
> >
> > 1. Set "max.poll.interval.ms" to a low value
> > 2. Have the pipeline take longer than the interval above
> > 3. Profit
> >
> > This happens every single time and never recovers.
> > I simulated the delay by adding a breakpoint on my IDE on a sink "foreach"
> > step and then proceeding after the above interval had elapsed.
> >
> > Any advice on how to work around this using 0.10.2.1 would be greatly
> > appreciated.
> > Hope it helps
> >
> > On Wed, Jun 7, 2017 at 10:19 PM João Peixoto <joao.harti...@gmail.com>
> > wrote:
> >
> >> But my stream definition does not have a state store at all, Rocksdb or in
> >> memory... That's the most concerning part...
> >> On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >>
> >>> One instance with 10 threads may cause rocksdb issues.
> >>> What is the RAM you have?
> >>>
> >>> Also check CPU wait time. Many rocks db instances on one machine (depends
> >>> upon number of partitions) may cause lot of disk i/o causing wait times to
> >>> increase and hence slowing down the message processing causing frequent
> >>> rebalance's.
> >>>
> >>> Also what is your topic partitions. My experience is having one thread per
> >>> partition is ideal.
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <joao.harti...@gmail.com>
> >>> wrote:
> >>>
> >>>> There is one instance with 10 threads.
> >>>>
> >>>> On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>
> >>>>> João,
> >>>>>
> >>>>> Do you also have multiple running instances in parallel, and how many
> >>>>> threads are your running within each instance?
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.harti...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Eno before I do so I just want to be sure this would not be a duplicate. I
> >>>>>> just found the following issues:
> >>>>>>
> >>>>>> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed
> >>>>>> on 0.11.0.0/0.10.2.2 (both not released afaik)
> >>>>>> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
> >>>>>>
> >>>>>> On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.there...@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi there,
> >>>>>>>
> >>>>>>> This might be a bug, would you mind opening a JIRA (copy-pasting below is
> >>>>>>> sufficient).
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Eno
> >>>>>>>> On 7 Jun 2017, at 21:38, João Peixoto <joao.harti...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> I'm using Kafka Streams 0.10.2.1 and I still see this error
> >>>>>>>>
> >>>>>>>> 2017-06-07 20:28:37.211 WARN 73 --- [ StreamThread-1]
> >>>>>>>> o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will
> >>>>>>>> retry.
> >>>>>>>>
> >>>>>>>> org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock
> >>>>>>>> the state directory for task 0_31
> >>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> It has been printing it for hours now, so it does not recover at all.
> >>>>>>>> The most worrying thing is that this stream definition does not even use
> >>>>>>>> state stores, it literally looks like this:
> >>>>>>>>
> >>>>>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>>>>> KStream<byte[], Message> kStream =
> >>>>>>>> builder.stream(appOptions.getInput().getTopic());
> >>>>>>>> kStream.process(() -> processor);
> >>>>>>>> new KafkaStreams(builder, streamsConfiguration);
> >>>>>>>>
> >>>>>>>> The "processor" does its thing and calls "context().commit()" when done.
> >>>>>>>> That's it.
> >>>>>>>> Looking at the actual machine running the instance, the folders
> >>>>>>>> under /tmp/kafka-streams/<stream name>/ only have a .lock file.
> >>>>>>>>
> >>>>>>>> This seems to have been bootstrapped by the exception:
> >>>>>>>>
> >>>>>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> >>>>>>>> completed since the group has already rebalanced and assigned the
> >>>>>>>> partitions to another member. This means that the time between subsequent
> >>>>>>>> calls to poll() was longer than the configured max.poll.interval.ms, which
> >>>>>>>> typically implies that the poll loop is spending too much time message
> >>>>>>>> processing. You can address this either by increasing the session timeout
> >>>>>>>> or by reducing the maximum size of batches returned in poll() with
> >>>>>>>> max.poll.records.
> >>>>>>>>
> >>>>>>>> We are addressing the latter by reducing "max.poll.records" and increasing
> >>>>>>>> "commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry about
> >>>>>>>> state dirs if there are no state stores? Since it doesn't seem to do so
> >>>>>>>> automatically, can I configured it somehow to achieve this end?
> >>>>>>>>
> >>>>>>>> Additionally, what could lead to it not being able to recover?
> >>>>>>>>
> >>>>>>>> On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matth...@confluent.io>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Great! :)
> >>>>>>>>>
> >>>>>>>>> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> >>>>>>>>>> I see now that my Kafka cluster is very stable, and these errors dont come
> >>>>>>>>>> now.
> >>>>>>>>>>
> >>>>>>>>>> -Sameer.
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.w...@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Yes, I have upgraded my cluster and client both to version 10.2.1 and
> >>>>>>>>>>> currently monitoring the situation.
> >>>>>>>>>>> Will report back in case I find any errors. Thanks for the help though.
> >>>>>>>>>>>
> >>>>>>>>>>> -Sameer.
> >>>>>>>>>>>
> >>>>>>>>>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Did you see Eno's reply?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not,
> >>>>>>>>>>>> please report back.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I would also recommend to subscribe to the list. It's self-service
> >>>>>>>>>>>> http://kafka.apache.org/contact
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> >>>>>>>>>>>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
> >>>>>>>>>>>>> Also, do a reply to all, I am currently not subscribed to the list.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Sameer.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.w...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I ran two nodes in my streams compute cluster, they were running fine for
> >>>>>>>>>>>>>> few hours before outputting with failure to rebalance errors.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I couldnt understand why this happened but I saw one strange
> >>>>>>>>>>>>>> behaviour...
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> at 16:53 on node1, I saw "Failed to lock the state directory" error, this
> >>>>>>>>>>>>>> might have caused the partitions to relocate and hence the error.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am attaching detailed logs for both the nodes, please see if you can
> >>>>>>>>>>>>>> help.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Some of the logs for quick reference are these.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread
> >>>>>>>>>>>>>> StreamThread-2
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread
> >>>>>>>>>>>>>> [StreamThread-2] Failed to rebalance
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException:
> >>>>>>>>>>>>>> stream-thread [StreamThread-2] failed to suspend stream tasks
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> >>>>>>>>>>>>>> ... 1 more
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> >>>>>>>>>>>>>> Commit cannot be completed since the group has already rebalanced and
> >>>>>>>>>>>>>> assigned the partitions to another member. This means that the time between
> >>>>>>>>>>>>>> subsequent calls to poll() was longer than the configured
> >>>>>>>>>>>>>> max.poll.interval.ms, which typically implies that the poll loop is
> >>>>>>>>>>>>>> spending too much time message processing. You can address this either by
> >>>>>>>>>>>>>> increasing the session timeout or by reducing the maximum size of batches
> >>>>>>>>>>>>>> returned in poll() with max.poll.records.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> >>>>>>>>>>>>>> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
> >>>>>>>>>>>>>> ... 10 more
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2017-05-03 16:53:57 WARN StreamThread:1184 - Could not create task 1_38.
> >>>>>>>>>>>>>> Will retry.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock
> >>>>>>>>>>>>>> the state directory: /data/streampoc/LIC2-5/1_38
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> >>>>>>>>>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Sameer.
> >>>>>
> >>>>> --
> >>>>> -- Guozhang
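
For anyone following along, the three settings discussed in this thread can be written down as plain properties. This is only a sketch under assumptions: the class name, the application id, the broker address, and the values chosen for "max.poll.records" and "commit.interval.ms" are made up for illustration. The keys are given as plain strings so the snippet compiles without the Kafka jars; the resulting object is meant to be handed to "new KafkaStreams(builder, props)".

```java
import java.util.Properties;

/**
 * Sketch only: gathers the settings discussed in this thread.
 * Class name, method name, and example values are hypothetical.
 */
public class StreamsPollSettingsSketch {

    /** Builds the properties that would be passed to new KafkaStreams(builder, props). */
    public static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);

        // Consumer setting: upper bound on the time between two poll() calls.
        // Integer.MAX_VALUE mirrors the "basically infinite" value mentioned above.
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));

        // Consumer setting: fewer records per poll() means less work between polls.
        props.put("max.poll.records", "100"); // illustrative value

        // Streams setting: how often processed offsets are committed.
        props.put("commit.interval.ms", "30000"); // illustrative value

        return props;
    }

    public static void main(String[] args) {
        Properties props = build("dummy-app", "localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

To reproduce the failure with the steps listed earlier, one would lower "max.poll.interval.ms" here instead of leaving it at Integer.MAX_VALUE.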