[ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210579#comment-16210579 ]
ASF GitHub Bot commented on KAFKA-6085:
---------------------------------------

Github user guozhangwang closed the pull request at:

    https://github.com/apache/kafka/pull/4086

> Streams rebalancing may cause a first batch of fetched records to be dropped
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-6085
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6085
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.1
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> This is a regression introduced in KAFKA-5152:
> Assume you have one task without any state stores (and hence no restoration
> needed for that task), and a rebalance happens during a
> {{records = pollRequests(pollTimeMs);}} call:
> 1. We name this `pollRequests` call A. Within call A the rebalance happens,
> which moves the thread state from RUNNING to PARTITION_REVOKED, and then from
> PARTITION_REVOKED to PARTITION_ASSIGNED. Assume the same task gets assigned
> again; this task will be in the initialized set of tasks but NOT in the
> running tasks yet.
> 2. Within the same call A, a fetch request may be sent and a response with a
> batch of records could come back, and that batch will be returned from
> `pollRequests`. At this time the thread state is PARTITION_ASSIGNED and the
> task is not "running" yet.
> 3. Now the bug comes in this line:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not in the active running set yet, this returned set of
> records is skipped. Effectively these records are dropped on the floor and
> will never be consumed again.
> 4. In the next run loop, the same `pollRequests()` will be called again. Let's
> call it B. After B is called we set the thread state to RUNNING and put the
> task into the running task set. But at this point the previous batch of
> records will not be returned any more.
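
To make the four steps above concrete, here is a toy simulation of the two run loops. It is only a sketch and not the actual StreamThread code: the class `DroppedBatchSketch`, its fields, `runOnce()` and the record names are all hypothetical, and it assumes the promotion to RUNNING is only applied in a loop that was entered with the state already at PARTITION_ASSIGNED (which is how the description above reads). Only the guard in step 3 and the state names are taken from the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model of the stream thread's run loop; not Kafka code.
class DroppedBatchSketch {
    enum State { RUNNING, PARTITION_REVOKED, PARTITION_ASSIGNED }

    State state = State.RUNNING;
    // Stands in for taskManager.hasActiveRunningTasks().
    boolean hasActiveRunningTasks = true;
    final List<String> processed = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();
    private int pollCalls = 0;

    // Toy stand-in for pollRequests(pollTimeMs).
    List<String> pollRequests() {
        pollCalls++;
        if (pollCalls == 1) {
            // Call A: the rebalance happens inside the poll (step 1) ...
            state = State.PARTITION_REVOKED;
            hasActiveRunningTasks = false;
            state = State.PARTITION_ASSIGNED; // task re-assigned, not running yet
            // ... and the same poll also returns a fetched batch (step 2).
            return Arrays.asList("r1", "r2");
        }
        // Call B and later: the earlier batch is not returned again (step 4).
        return Collections.emptyList();
    }

    void runOnce() {
        // Assumption of this sketch: the task is promoted only if the loop
        // started in PARTITION_ASSIGNED, i.e. one loop after the rebalance.
        boolean assignedOnEntry = (state == State.PARTITION_ASSIGNED);
        List<String> records = pollRequests();
        if (assignedOnEntry) {
            state = State.RUNNING;
            hasActiveRunningTasks = true;
        }
        // The guard quoted in step 3.
        if (!records.isEmpty() && hasActiveRunningTasks) {
            processed.addAll(records);
        } else {
            dropped.addAll(records); // silently skipped in the scenario above
        }
    }

    public static void main(String[] args) {
        DroppedBatchSketch thread = new DroppedBatchSketch();
        thread.runOnce(); // loop 1: call A, batch returned but skipped
        thread.runOnce(); // loop 2: call B, task promoted, nothing to process
        System.out.println("processed=" + thread.processed
                + " dropped=" + thread.dropped);
        // prints: processed=[] dropped=[r1, r2]
    }
}
```

In this model the thread ends up in RUNNING with an empty `processed` list, which matches the symptom described: the task looks healthy but its first batch was never handed to it.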
> So the bug is that, within a single run loop of the stream thread, we may
> complete a rebalance with tasks assigned but not yet initialized, AND fetch a
> batch of records for that not-initialized task and drop them on the floor.
> With further investigation I can confirm that the root cause of the new flaky
> test https://issues.apache.org/jira/browse/KAFKA-5140 is also this bug. A
> recent PR, https://github.com/apache/kafka/pull/4086, exposed this bug by
> making the reset integration test fail more frequently.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)