[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210581#comment-16210581
 ] 

ASF GitHub Bot commented on KAFKA-6085:
---

Github user guozhangwang closed the pull request at:

https://github.com/apache/kafka/pull/4086


> Streams rebalancing may cause a first batch of fetched records to be dropped
> 
>
> Key: KAFKA-6085
> URL: https://issues.apache.org/jira/browse/KAFKA-6085
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.1
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Blocker
> Fix For: 1.0.0
>
>
> This is a regression introduced in KAFKA-5152.
> Assume you have one task without any state stores (and hence no restoration
> needed for that task), and a rebalance happens during a
> {{records = pollRequests(pollTimeMs);}} call:
> 1. Call this `pollRequests` call A. Within call A the rebalance happens,
> which moves the thread state from RUNNING to PARTITIONS_REVOKED and then
> from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED. Assuming the same task is
> assigned again, it will be in the set of initialized tasks but NOT yet in
> the set of running tasks.
> 2. Within the same call A, a fetch request may be sent and a response with a
> batch of records may come back, which is then returned from `pollRequests`.
> At this point the thread state is PARTITIONS_ASSIGNED and the task is not
> "running" yet.
> 3. The bug occurs at this line:
> {{!records.isEmpty() && taskManager.hasActiveRunningTasks()}}
> Since the task is not yet in the active running set, the returned records
> are skipped. Effectively these records are dropped on the floor and will
> never be consumed again.
> 4. In the next run loop, `pollRequests()` is called again; call it B. After B
> is called, the thread state is set to RUNNING and the task is added to the
> running task set. But at this point the previous batch of records will not
> be returned again.
> So the bug is that within a single run loop of the stream thread we may
> complete a rebalance with tasks assigned but not yet initialized, AND fetch
> a batch of records for such a not-yet-initialized task and drop them on the
> floor.
> With further investigation I can confirm that this bug is also the root
> cause of the new flaky test https://issues.apache.org/jira/browse/KAFKA-5140.
> A recent PR, https://github.com/apache/kafka/pull/4086, exposed this bug by
> making the reset integration test fail more frequently.
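The sequence in steps 1 to 4 can be reproduced in a toy model. This is a hypothetical, heavily simplified stand-in for the stream thread, not Kafka Streams code: `DroppedBatchSketch`, its fields, and its `pollRequests`/`runOnce` methods are illustrative names only. It assumes one stateless task reading one partition, and that the task joins the running set only in the iteration after the rebalance (call B), as the description states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the KAFKA-6085 race; not Kafka Streams code.
public class DroppedBatchSketch {

    enum State { RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED }

    final List<String> log;                  // records in the stateless task's partition
    int position = 0;                        // consumer fetch position
    State state = State.RUNNING;
    boolean taskRunning = true;              // is the task in the running set?
    final List<String> processed = new ArrayList<>();

    DroppedBatchSketch(List<String> log) {
        this.log = log;
    }

    // Stand-in for pollRequests(): optionally completes a rebalance, then
    // returns up to maxRecords and advances the fetch position past them.
    List<String> pollRequests(boolean rebalance, int maxRecords) {
        if (rebalance) {
            state = State.PARTITIONS_REVOKED;
            taskRunning = false;               // task revoked ...
            state = State.PARTITIONS_ASSIGNED; // ... and re-assigned, not running yet
        }
        int end = Math.min(log.size(), position + maxRecords);
        List<String> batch = new ArrayList<>(log.subList(position, end));
        position = end;
        return batch;
    }

    void runOnce(boolean rebalance, int maxRecords) {
        boolean assignedBeforePoll = state == State.PARTITIONS_ASSIGNED;
        List<String> records = pollRequests(rebalance, maxRecords);
        if (assignedBeforePoll) {            // call B: task joins the running set
            taskRunning = true;
            state = State.RUNNING;
        }
        // The check from step 3: records are skipped if no task is running yet,
        // even though the position has already moved past them.
        if (!records.isEmpty() && taskRunning) {
            processed.addAll(records);
        }
    }

    public static void main(String[] args) {
        DroppedBatchSketch t = new DroppedBatchSketch(List.of("r0", "r1", "r2", "r3"));
        t.runOnce(true, 2);   // call A: rebalance + fetch of r0, r1 -> dropped
        t.runOnce(false, 2);  // call B: task starts running, r2, r3 processed
        System.out.println(t.processed); // [r2, r3]
    }
}
```

Running `main` prints `[r2, r3]`: the batch `r0, r1` fetched during call A moved the position forward but never reached the task.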



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210580#comment-16210580
 ] 

ASF GitHub Bot commented on KAFKA-6085:
---

GitHub user guozhangwang reopened a pull request:

https://github.com/apache/kafka/pull/4086

[WIP] KAFKA-6085: Pause all partitions before tasks are initialized

Mirror of #4085 against trunk. This PR contains two fixes (one major and 
one minor):

Major: on rebalance, pause all assigned partitions instead of only the
partitions of tasks with state stores, so that no records are returned in the
same `pollRequests()` call.

Minor: during the restoration phase, while the thread state is still
PARTITIONS_ASSIGNED, call `consumer.poll` with a hard-coded pollMs = 0.
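Both fixes can be illustrated with a similar toy model. This is a sketch under stated assumptions, not the PR's code: `PauseOnAssignSketch`, `POLL_TIME_MS`, and the `restoreStepsLeft` counter are hypothetical, and pausing is modelled on the documented behaviour of `KafkaConsumer#pause` (a paused partition returns no records from `poll`, so its position does not advance). In the real client the major fix would correspond roughly to calling `consumer.pause(consumer.assignment())` on assignment and `consumer.resume(...)` once tasks are running.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition model of the two fixes; not Kafka Streams code.
// A paused partition returns nothing from poll and its position does not move.
public class PauseOnAssignSketch {

    enum State { RUNNING, PARTITIONS_ASSIGNED }

    static final long POLL_TIME_MS = 100L;   // hypothetical configured poll time

    final List<String> log;                  // records in the task's input partition
    final boolean pauseAllOnAssign;          // true = major fix, false = old behaviour
    final boolean taskHasStateStores;
    int restoreStepsLeft = 0;                // iterations of restoration still needed
    int position = 0;
    boolean paused = false;
    State state = State.RUNNING;
    final List<Long> pollTimeouts = new ArrayList<>();
    final List<String> processed = new ArrayList<>();

    PauseOnAssignSketch(List<String> log, boolean pauseAllOnAssign, boolean hasStores) {
        this.log = log;
        this.pauseAllOnAssign = pauseAllOnAssign;
        this.taskHasStateStores = hasStores;
    }

    // Minor fix: while tasks are still restoring, poll with a zero timeout.
    static long pollTimeoutMs(State state) {
        return state == State.PARTITIONS_ASSIGNED ? 0L : POLL_TIME_MS;
    }

    List<String> poll(long timeoutMs, boolean rebalance) {
        pollTimeouts.add(timeoutMs);         // recorded only; the model never blocks
        if (rebalance) {                     // rebalance completes inside this poll
            state = State.PARTITIONS_ASSIGNED;
            restoreStepsLeft = taskHasStateStores ? 2 : 0;
            // Old behaviour pauses only partitions of tasks with state stores;
            // the major fix pauses every assigned partition.
            paused = pauseAllOnAssign || taskHasStateStores;
        }
        if (paused) {
            return new ArrayList<>();
        }
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return batch;
    }

    void runOnce(boolean rebalance) {
        List<String> records = poll(pollTimeoutMs(state), rebalance);
        if (!records.isEmpty() && state == State.RUNNING) {
            processed.addAll(records);
        }
        if (state == State.PARTITIONS_ASSIGNED) {
            if (restoreStepsLeft > 0) {
                restoreStepsLeft--;          // one restoration step per iteration
            } else {
                state = State.RUNNING;       // task running: resume fetching
                paused = false;
            }
        }
    }

    public static void main(String[] args) {
        for (boolean fix : new boolean[] {false, true}) {
            PauseOnAssignSketch t = new PauseOnAssignSketch(List.of("r0", "r1"), fix, false);
            t.runOnce(true);   // call A: rebalance completes inside the poll
            t.runOnce(false);  // call B
            System.out.println("pauseAll=" + fix + " processed=" + t.processed);
        }
    }
}
```

With `pauseAll=false` the stateless task's first batch is lost; with `pauseAll=true` it is delivered once the task is running. For a task with state stores that needs two restoration iterations, the recorded poll timeouts come out as `[100, 0, 0, 100]`: zero while the thread is PARTITIONS_ASSIGNED, the configured value again once it is RUNNING.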

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/guozhangwang/kafka KHotfix-restore-only

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4086.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4086


commit 62bf4784779f7379e849289c4456363f352cb850
Author: Guozhang Wang 
Date:   2017-10-12T21:18:46Z

dummy

commit 5726e39cba8a79e6858e8b932c5116b60f2ae314
Author: Guozhang Wang 
Date:   2017-10-12T21:18:46Z

dummy

fix issues

Remove debugging information

commit 8214a3ee340791eb18f7e5fa77f2510470cf977a
Author: Matthias J. Sax 
Date:   2017-10-17T00:38:31Z

MINOR: update exception message for KIP-120

Author: Matthias J. Sax 

Reviewers: Guozhang Wang 

Closes #4078 from mjsax/hotfix-streams

commit 637b76342801cf4a32c9e65aa89bfe0bf76c24a7
Author: Jason Gustafson 
Date:   2017-10-17T00:49:35Z

MINOR: A few javadoc fixes

Author: Jason Gustafson 

Reviewers: Guozhang Wang 

Closes #4076 from hachikuji/javadoc-fixes

commit f57c505f6e714b891a6d30e5501b463f14316708
Author: Damian Guy 
Date:   2017-10-17T01:01:32Z

MINOR: add equals to SessionWindows

Author: Damian Guy 

Reviewers: Guozhang Wang, Matthias J. Sax, Bill Bejeck

Closes #4074 from dguy/minor-session-window-equals

commit 2f1dd0d4da24eee352f20436902d825d7851c45b
Author: Guozhang Wang 
Date:   2017-10-18T01:27:35Z

normal poll with zero during restoration

commit 043f28ac89b50f9145ac719449f03a427376dcde
Author: Guozhang Wang 
Date:   2017-10-19T04:58:36Z

recheck state change





[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped

2017-10-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210579#comment-16210579
 ] 

ASF GitHub Bot commented on KAFKA-6085:
---

Github user guozhangwang closed the pull request at:

https://github.com/apache/kafka/pull/4086




[jira] [Commented] (KAFKA-6085) Streams rebalancing may cause a first batch of fetched records to be dropped

2017-10-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16210527#comment-16210527
 ] 

Ted Yu commented on KAFKA-6085:
---

{code}
final ConsumerRecords records = pollRequests();

if (state == State.PARTITIONS_ASSIGNED) {
{code}
Can the if block be moved ahead of the pollRequests() call?
If there is still no active running task at the end of the if block, runOnce()
could return early without calling pollRequests().
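The suggested reordering can be sketched as another hypothetical toy model (`ReorderedRunOnceSketch`, `updateTasks`, and `restoreStepsLeft` are illustrative names, not StreamThread code): the PARTITIONS_ASSIGNED handling runs before the poll, and the iteration returns early while no task is running yet.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the suggested reordering; not StreamThread code.
public class ReorderedRunOnceSketch {

    enum State { RUNNING, PARTITIONS_ASSIGNED }

    State state = State.PARTITIONS_ASSIGNED;
    int restoreStepsLeft;                              // iterations until tasks are running
    int pollCalls = 0;
    final Deque<String> pending = new ArrayDeque<>();  // records waiting in the partition
    final List<String> processed = new ArrayList<>();

    ReorderedRunOnceSketch(int restoreSteps, List<String> records) {
        this.restoreStepsLeft = restoreSteps;
        this.pending.addAll(records);
    }

    // Stand-in for task initialization: true once all tasks are running.
    boolean updateTasks() {
        if (restoreStepsLeft > 0) {
            restoreStepsLeft--;
        }
        return restoreStepsLeft == 0;
    }

    List<String> pollRequests() {
        pollCalls++;
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    void runOnce() {
        // The PARTITIONS_ASSIGNED block now runs before polling.
        if (state == State.PARTITIONS_ASSIGNED && updateTasks()) {
            state = State.RUNNING;
        }
        // No running task yet: return early instead of fetching records
        // that the hasActiveRunningTasks() check would skip.
        if (state != State.RUNNING) {
            return;
        }
        processed.addAll(pollRequests());
    }

    public static void main(String[] args) {
        ReorderedRunOnceSketch t = new ReorderedRunOnceSketch(2, List.of("r0", "r1"));
        t.runOnce();  // still initializing: no poll
        t.runOnce();  // initialization completes, then poll
        System.out.println(t.pollCalls + " " + t.processed); // 1 [r0, r1]
    }
}
```

As modelled here, the early return only covers iterations that start in PARTITIONS_ASSIGNED. If the rebalance completes inside the poll itself, as with call A in the description, records can still be returned before any check runs; that is the case the PR targets by pausing all partitions on rebalance.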
