[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped
[ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210527#comment-16210527 ]

Ted Yu commented on KAFKA-6085:
---

{code}
final ConsumerRecords records = pollRequests();
if (state == State.PARTITIONS_ASSIGNED) {
{code}
Can the if block be lifted ahead of the pollRequests() call?
When there is no active running task at the end of the if block, runOnce() can return early without calling pollRequests().

> Streams rebalancing may cause a first batch of fetched records to be dropped
> ---
>
>                 Key: KAFKA-6085
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6085
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.11.0.1
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 1.0.0
>
>
> This is a regression introduced in KAFKA-5152:
> Assume you have one task without any state stores (and hence no restoration
> needed for that task), and a rebalance happens during a
> {{records = pollRequests(pollTimeMs);}} call:
> 1. We name this `pollRequests` call A. Within call A the rebalance happens,
> which moves the thread state from RUNNING to PARTITIONS_REVOKED, and then
> from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED. Assume the same task gets
> assigned again; this task will be in the initialized set of tasks but NOT
> yet in the running tasks.
> 2. Within the same call A, a fetch request may be sent and a response with a
> batch of records could come back, and that batch will be returned from
> `pollRequests`. At this time the thread state has become PARTITIONS_ASSIGNED
> and the task is not "running" yet.
> 3. Now the bug comes in this line:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not in the active running set yet, this returned set of
> records is skipped. Effectively these records are dropped on the floor
> and will never be consumed again.
> 4. In the next run loop, the same `pollRequests()` will be called again. Let's
> call it B. After B is called we will set the thread state to RUNNING and put
> the task into the running task set. But at this point the previous batch of
> records will not be returned any more.
> So the bug lies in the fact that, within a single run loop of the stream
> thread, we may complete a rebalance with tasks assigned but not yet
> initialized, AND we can fetch a batch of records for that not-initialized
> task and drop them on the floor.
> With further investigation I can confirm that the root cause of the new flaky
> test https://issues.apache.org/jira/browse/KAFKA-5140 is also this bug. And a
> recent PR https://github.com/apache/kafka/pull/4086 exposed this bug by
> failing the reset integration test more frequently.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
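The four-step sequence in the issue description above can be sketched as a small, self-contained simulation. This is only an illustration under stated assumptions, not the actual StreamThread code: the class `DroppedBatchSketch`, its fields, and the integer "offsets" are hypothetical stand-ins, and the one-loop delay before the task is promoted to running is modeled directly from steps 1 to 4 of the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the run loop described in the issue; no Kafka classes are used.
final class DroppedBatchSketch {
    enum State { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED }

    private State state = State.RUNNING;
    private boolean taskRunning = true;      // is the task in the active running set?
    private boolean rebalancePending = true; // a rebalance fires inside the first poll
    private int nextOffset = 0;              // simulated consumer position
    private final List<Integer> processed = new ArrayList<>();

    // Simulated poll: a rebalance may complete AND a fetched batch may be returned (steps 1 and 2).
    private List<Integer> pollRequests() {
        if (rebalancePending) {
            rebalancePending = false;
            state = State.PARTITIONS_REVOKED;
            taskRunning = false;
            state = State.PARTITIONS_ASSIGNED; // task re-assigned but not running yet
        }
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            batch.add(nextOffset++); // the position advances whether or not the batch is used
        }
        return batch;
    }

    private void runOnce() {
        // Assumption taken from step 4: the task only becomes running in the loop after the rebalance.
        boolean assignedBeforePoll = (state == State.PARTITIONS_ASSIGNED);
        List<Integer> records = pollRequests();
        if (assignedBeforePoll) {
            taskRunning = true;
            state = State.RUNNING;
        }
        // Step 3: the guard skips the batch when no task is running, so the records are lost.
        if (!records.isEmpty() && taskRunning) {
            processed.addAll(records);
        }
    }

    static List<Integer> simulate(int loops) {
        DroppedBatchSketch thread = new DroppedBatchSketch();
        for (int i = 0; i < loops; i++) {
            thread.runOnce();
        }
        return thread.processed;
    }

    public static void main(String[] args) {
        // prints [3, 4, 5]: offsets 0-2 were fetched in call A but never processed
        System.out.println(simulate(2));
    }
}
```

In this model the first batch disappears because the consumer position has already moved past it by the time the task is running, which matches the "never be consumed again" observation in step 3.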
[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped
[ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210579#comment-16210579 ]

ASF GitHub Bot commented on KAFKA-6085:
---

Github user guozhangwang closed the pull request at:

    https://github.com/apache/kafka/pull/4086
[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped
[ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210580#comment-16210580 ]

ASF GitHub Bot commented on KAFKA-6085:
---

GitHub user guozhangwang reopened a pull request:

    https://github.com/apache/kafka/pull/4086

    [WIP] KAFKA-6085: Pause all partitions before tasks are initialized

    Mirror of #4085 against trunk. This PR contains two fixes (one major and one minor):

    Major: on rebalance, pause all partitions instead of only the partitions of tasks with state stores, so that no records will be returned in the same `pollRequests()` call.

    Minor: during the restoration phase, when the thread state is still PARTITIONS_ASSIGNED, call consumer.poll with hard-coded pollMs = 0.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka KHotfix-restore-only

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4086

commit 62bf4784779f7379e849289c4456363f352cb850
Author: Guozhang Wang
Date:   2017-10-12T21:18:46Z

    dummy

commit 5726e39cba8a79e6858e8b932c5116b60f2ae314
Author: Guozhang Wang
Date:   2017-10-12T21:18:46Z

    dummy fix issues Remove debugging information

commit 8214a3ee340791eb18f7e5fa77f2510470cf977a
Author: Matthias J. Sax
Date:   2017-10-17T00:38:31Z

    MINOR: update exception message for KIP-120

    Author: Matthias J. Sax
    Reviewers: Guozhang Wang
    Closes #4078 from mjsax/hotfix-streams

commit 637b76342801cf4a32c9e65aa89bfe0bf76c24a7
Author: Jason Gustafson
Date:   2017-10-17T00:49:35Z

    MINOR: A few javadoc fixes

    Author: Jason Gustafson
    Reviewers: Guozhang Wang
    Closes #4076 from hachikuji/javadoc-fixes

commit f57c505f6e714b891a6d30e5501b463f14316708
Author: Damian Guy
Date:   2017-10-17T01:01:32Z

    MINOR: add equals to SessionWindows

    Author: Damian Guy
    Reviewers: Guozhang Wang, Matthias J. Sax, Bill Bejeck
    Closes #4074 from dguy/minor-session-window-equals

commit 2f1dd0d4da24eee352f20436902d825d7851c45b
Author: Guozhang Wang
Date:   2017-10-18T01:27:35Z

    normal poll with zero during restoration

commit 043f28ac89b50f9145ac719449f03a427376dcde
Author: Guozhang Wang
Date:   2017-10-19T04:58:36Z

    recheck state change
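The major fix described in the pull request above (pause all partitions on rebalance so that the same poll returns no records) can be sketched with a small simulation. This is a hedged illustration, not the patch itself: `PauseOnRebalanceSketch`, its `paused` flag, and the integer "offsets" are hypothetical. In a real application the corresponding calls would be the consumer's `pause(...)` and `resume(...)` on its assigned partitions; where exactly the patch invokes them is not shown in this thread and is assumed here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "pause all partitions before tasks are initialized"; no Kafka classes are used.
final class PauseOnRebalanceSketch {
    enum State { RUNNING, PARTITIONS_ASSIGNED }

    private State state = State.RUNNING;
    private boolean taskRunning = true;      // is the task in the active running set?
    private boolean rebalancePending = true; // a rebalance fires inside the first poll
    private boolean paused = false;          // stands in for pausing every assigned partition
    private int nextOffset = 0;              // simulated consumer position
    private final List<Integer> processed = new ArrayList<>();

    private List<Integer> pollRequests() {
        if (rebalancePending) {
            rebalancePending = false;
            taskRunning = false;
            state = State.PARTITIONS_ASSIGNED;
            paused = true; // assumed fix: pause ALL partitions, not only those of tasks with state stores
        }
        List<Integer> batch = new ArrayList<>();
        if (!paused) {
            for (int i = 0; i < 3; i++) {
                batch.add(nextOffset++);
            }
        }
        return batch; // empty while paused, so the position does not move
    }

    private void runOnce() {
        boolean assignedBeforePoll = (state == State.PARTITIONS_ASSIGNED);
        List<Integer> records = pollRequests();
        if (assignedBeforePoll) {
            taskRunning = true;  // task is now initialized and running
            state = State.RUNNING;
            paused = false;      // resume fetching only after the task can process records
        }
        if (!records.isEmpty() && taskRunning) {
            processed.addAll(records);
        }
    }

    static List<Integer> simulate(int loops) {
        PauseOnRebalanceSketch thread = new PauseOnRebalanceSketch();
        for (int i = 0; i < loops; i++) {
            thread.runOnce();
        }
        return thread.processed;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2]: nothing is fetched while paused, so the first batch is not skipped
        System.out.println(simulate(3));
    }
}
```

The design point is that the guard on running tasks can stay as it is: because nothing is fetched until the task is running, there is no batch for the guard to discard.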
[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped
[ https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16210581#comment-16210581 ]

ASF GitHub Bot commented on KAFKA-6085:
---

Github user guozhangwang closed the pull request at:

    https://github.com/apache/kafka/pull/4086