[
https://issues.apache.org/jira/browse/KAFKA-4593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988200#comment-15988200
]
Matthias J. Sax edited comment on KAFKA-4593 at 4/28/17 4:57 AM:
-----------------------------------------------------------------
It is possible if A and B are on different machines. Re-reading both JIRAs,
this issue is different from the other one. I guess it's a rare scenario, but
possible.
> Task migration during rebalance callback process could lead to an
> IllegalStateException in the obsoleted task
> ------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-4593
> URL: https://issues.apache.org/jira/browse/KAFKA-4593
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Labels: infrastructure
>
> 1. Assume two running threads, A and B, and, for simplicity, a single task t1.
> 2. The first rebalance is triggered and task t1 is assigned to A (B has no
> assigned task).
> 3. During the first rebalance callback, task t1's state store needs to be
> restored on thread A; this happens in "restoreActiveState", which is reached
> from "createStreamTask".
> 4. Now suppose thread A has a long GC pause that stalls it. A second rebalance
> is then triggered and kicks A out of the group; B gets task t1 and runs the
> same restoration process. After restoring, thread B continues to process data
> and update the state store, and at the same time writes more messages to the
> changelog (so the changelog's log end offset increases).
> 5. After a while A resumes from the long GC pause, not knowing that it has
> actually been kicked out of the group and that task t1 is no longer owned by
> it. It continues the restoration process but then realizes that the log end
> offset has advanced. When this happens, we see the following exception on
> thread A:
> {code}
> java.lang.IllegalStateException: task XXX Log end offset of YYY-table_stream-changelog-ZZ should not change while restoring: old end offset .., current offset ..
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:248)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:122)
> at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200)
> at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:65)
> at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:65)
> at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:794)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1222)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1195)
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:897)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:71)
> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:240)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:230)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:314)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:278)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1039)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1004)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:570)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:359)
> {code}
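The check that fires in step 5 can be illustrated with a simplified, single-threaded model: the restoring thread records the changelog's log end offset before replaying it and compares it again afterwards, failing if another writer (thread B) appended in between. This is a hypothetical sketch, not the actual ProcessorStateManager.restoreActiveState code; FakeChangelog, restore, and the pauseHook that stands in for thread A's GC pause are all names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the "log end offset should not change
// while restoring" check. Not the real Kafka Streams implementation.
public class RestoreRaceSketch {

    // Stand-in for one changelog topic partition: an append-only record list.
    static final class FakeChangelog {
        private final List<String> records = new ArrayList<>();

        synchronized void append(String record) {
            records.add(record);
        }

        // Offset the next appended record would get, i.e. the log end offset.
        synchronized long endOffset() {
            return records.size();
        }

        synchronized String read(long offset) {
            return records.get((int) offset);
        }
    }

    // Replays the changelog into an in-memory "store" up to the end offset
    // captured at the start. pauseHook runs right after the first record is
    // replayed, simulating thread A stalling mid-restoration.
    static List<String> restore(String taskId, String changelogName,
                                FakeChangelog changelog, Runnable pauseHook) {
        final long endOffsetAtStart = changelog.endOffset();
        final List<String> store = new ArrayList<>();
        for (long offset = 0; offset < endOffsetAtStart; offset++) {
            store.add(changelog.read(offset));
            if (offset == 0) {
                pauseHook.run(); // thread "A" is paused here
            }
        }
        // If someone else wrote to the changelog meanwhile, the restored
        // state is already stale, so fail loudly.
        final long endOffsetNow = changelog.endOffset();
        if (endOffsetNow != endOffsetAtStart) {
            throw new IllegalStateException(String.format(
                "task %s Log end offset of %s should not change while restoring: "
                    + "old end offset %d, current offset %d",
                taskId, changelogName, endOffsetAtStart, endOffsetNow));
        }
        return store;
    }

    public static void main(String[] args) {
        FakeChangelog changelog = new FakeChangelog();
        changelog.append("k1=v1");
        changelog.append("k2=v2");

        // No concurrent writer: restoration completes normally.
        System.out.println(restore("t1", "store-changelog-0", changelog, () -> { }));

        // Thread "B" has taken over t1 and appends while "A" is paused.
        try {
            restore("t1", "store-changelog-0", changelog,
                () -> changelog.append("k3=v3"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The first call prints the two restored records; the second throws because the hook appends a record mid-restore, moving the end offset from 2 to 3, much as thread B's writes do while thread A is stalled. The model only shows why the obsolete thread A hits the exception; it does not suggest how A should detect that it no longer owns t1.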
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)