[jira] [Created] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
Luke Chen created KAFKA-12495: - Summary: Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor Key: KAFKA-12495 URL: https://issues.apache.org/jira/browse/KAFKA-12495 Project: Kafka Issue Type: Bug Reporter: Luke Chen Assignee: Luke Chen Attachments: image-2021-03-18-15-04-57-854.png, image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, we have a bad assumption in the algorithm implementation: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance. Let's take a look at the example in KIP-415: !image-2021-03-18-15-07-27-103.png|width=441,height=556! It works well for most cases. But what if W3 left after the 1st rebalance completed and before the 2nd rebalance started?
Let's see what happens (we'll use 10 tasks here): {code:java} Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5 W1 is current leader W2 joins with assignment: [] Rebalance is triggered W3 joins while rebalance is still active with assignment: [] W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5] W1 becomes leader W1 computes and sends assignments: W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) W2(delay: 0, assigned: [], revoked: []) W3(delay: 0, assigned: [], revoked: []) W1 stops revoked resources W1 rejoins with assignment: [AC0, AT1, AT2, AT3] Rebalance is triggered W2 joins with assignment: [] // W3 is down W3 doesn't join W1 becomes leader W1 computes and sends assignments: // We assigned all the previous revoked Connectors/Tasks to the new member, which causes an unbalanced distribution W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: []) {code} We cannot assume the member count stays the same right after revocation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
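The flaw in the trace above can be illustrated with a small, self-contained sketch. This is a hypothetical illustration, not the actual `IncrementalCooperativeAssignor` code: it shows a load-aware redistribution that hands previously revoked connectors/tasks to the least-loaded members that are *currently* present, so the result stays balanced even when a worker (W3) left between the two rebalance rounds.

```java
import java.util.*;

// Hypothetical sketch (names are illustrative, not Kafka's actual code):
// redistribute previously revoked connectors/tasks over the members that
// actually rejoined, always picking the least-loaded member, instead of
// assuming the membership is the same as in the revocation round.
public class RedistributeSketch {
    static Map<String, List<String>> redistribute(Map<String, List<String>> current,
                                                  List<String> unassigned) {
        Map<String, List<String>> result = new TreeMap<>();
        current.forEach((worker, tasks) -> result.put(worker, new ArrayList<>(tasks)));
        for (String task : unassigned) {
            // pick the member with the fewest resources at this moment
            String least = result.keySet().stream()
                    .min(Comparator.comparingInt(w -> result.get(w).size()))
                    .orElseThrow();
            result.get(least).add(task);
        }
        return result;
    }

    public static void main(String[] args) {
        // After the revocation round W3 is gone; only W1 and W2 rejoin.
        Map<String, List<String>> current = new TreeMap<>();
        current.put("W1", new ArrayList<>(List.of("AC0", "AT1", "AT2", "AT3")));
        current.put("W2", new ArrayList<>());
        // Resources revoked in the first round (the ticket lists BT4 twice,
        // likely a typo, so BT3 is assumed here).
        List<String> revoked = List.of("AT4", "AT5", "BC0", "BT1", "BT2", "BT3", "BT4", "BT5");
        Map<String, List<String>> assigned = redistribute(current, revoked);
        // 12 resources over 2 remaining workers: 6 and 6, instead of 4 vs 8
        System.out.println(assigned.get("W1").size() + " " + assigned.get("W2").size());
    }
}
```

With this kind of recomputation over the live member set, W2 would receive only enough resources to even out the load, rather than the entire revoked set.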
[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303902#comment-17303902 ] Luke Chen commented on KAFKA-12283: --- The root cause of this flaky test is described in KAFKA-12495. I'll apply a workaround in this flaky test first to make it reliable, and then refactor and add more tests in KAFKA-12495. Thanks. > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Chia-Ping Tsai >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12495: -- Description: In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, we have a bad assumption in the algorithm implementation: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance. Let's take a look at the example in KIP-415: !image-2021-03-18-15-07-27-103.png|width=441,height=556! It works well for most cases. But what if W3 left after the 1st rebalance completed and before the 2nd rebalance started? Let's see what happens (we'll use 10 tasks here): {code:java} Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5 W1 is current leader W2 joins with assignment: [] Rebalance is triggered W3 joins while rebalance is still active with assignment: [] W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5] W1 becomes leader W1 computes and sends assignments: W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) W2(delay: 0, assigned: [], revoked: []) W3(delay: 0, assigned: [], revoked: []) W1 stops revoked resources W1 rejoins with assignment: [AC0, AT1, AT2, AT3] Rebalance is triggered W2 joins with assignment: [] // W3 is down W3 doesn't join W1 becomes leader W1 computes and sends assignments: // We assigned all the previous revoked Connectors/Tasks to the new member, which causes an unbalanced distribution W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: []) {code} We cannot assume the member count stays the same right after revocation.
> Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on > [KIP-415|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]]. > However, we have a bad assumption in the algorithm implementation, which is: > after revoking rebalance completed, the member(worker) count will be the same > as the previous round of rebla
[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12495: -- Description: In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]). However, we have a bad assumption in the algorithm implementation: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance. Let's take a look at the example in KIP-415: !image-2021-03-18-15-07-27-103.png|width=441,height=556! It works well for most cases. But what if W3 left after the 1st rebalance completed and before the 2nd rebalance started? Let's see what happens in this example (we'll use 10 tasks here): {code:java} Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5 W1 is current leader W2 joins with assignment: [] Rebalance is triggered W3 joins while rebalance is still active with assignment: [] W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5] W1 becomes leader W1 computes and sends assignments: W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) W2(delay: 0, assigned: [], revoked: []) W3(delay: 0, assigned: [], revoked: []) W1 stops revoked resources W1 rejoins with assignment: [AC0, AT1, AT2, AT3] Rebalance is triggered W2 joins with assignment: [] // W3 is down W3 doesn't join W1 becomes leader W1 computes and sends assignments: // We assigned all the previous revoked Connectors/Tasks to the new member, which causes an unbalanced distribution W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: []) {code} We cannot assume the member count stays the same right after revocation. Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15
[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12495: -- Description: In Kafka Connect, we implement the incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]). However, we have a bad assumption in the algorithm implementation: after the revoking rebalance completes, the member (worker) count will be the same as in the previous round of rebalance. Let's take a look at the example in KIP-415: !image-2021-03-18-15-07-27-103.png|width=441,height=556! It works well for most cases. But what if W3 left after the 1st rebalance completed and before the 2nd rebalance started? Let's see what happens (we'll use 10 tasks here): {code:java} Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5 W1 is current leader W2 joins with assignment: [] Rebalance is triggered W3 joins while rebalance is still active with assignment: [] W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5] W1 becomes leader W1 computes and sends assignments: W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) W2(delay: 0, assigned: [], revoked: []) W3(delay: 0, assigned: [], revoked: []) W1 stops revoked resources W1 rejoins with assignment: [AC0, AT1, AT2, AT3] Rebalance is triggered W2 joins with assignment: [] // W3 is down W3 doesn't join W1 becomes leader W1 computes and sends assignments: // We assigned all the previous revoked Connectors/Tasks to the new member, which causes an unbalanced distribution W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: []) {code} We cannot assume the member count stays the same right after revocation. Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png > > > In Kafka Connect, we implement incremental cooperative rebalance algorithm > based on KIP-415 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incrementa
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303903#comment-17303903 ] rameshkrishnan muthusamy commented on KAFKA-12495: -- [~showuon] this is being addressed in https://issues.apache.org/jira/browse/KAFKA-10413 , let me know if you still see any issue with this patch as well > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303906#comment-17303906 ] Luke Chen commented on KAFKA-12495: --- [~ramkrish1489], thanks for your comments. However, this issue is still there after that patch. As I described, the algorithm didn't consider that the member count may differ from the previous revocation round. It's not related to the issue in KAFKA-10413. Or we can say, you found 2 issues in KAFKA-10413, and I found one more issue in this ticket, all of which will cause uneven distribution. Please help review my PR after I complete it. Thank you. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303906#comment-17303906 ] Luke Chen edited comment on KAFKA-12495 at 3/18/21, 7:36 AM: - [~ramkrish1489], thanks for your comments. However, this issue is still there after that patch. As I described, the algorithm didn't consider that the member count may differ from the previous revocation round. We just evenly distributed the revoked connectors/tasks to all the new members; we should consider the current member count before doing this. It's not related to the issue in KAFKA-10413. Or we can say, you found 2 issues in KAFKA-10413, and I found one more issue in this ticket, all of which will cause uneven distribution. Please help review my PR after I complete it. Thank you. was (Author: showuon): [~ramkrish1489], thanks for your comments. However, this issue is still there after this patch. As I described, it's that the algorithm didn't consider the member count not the same as previous revocation round. It's not related to the issue in KAFKA-10413. Or we can say, you found 2 issues in KAFKA-10413, and I found one more issue in this ticket, which all will cause uneven distribution. Please help review my PR after I completed it. Thank you. 
> Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL: https://issues.apache.org/jira/browse/KAFKA-12495 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > Attachments: image-2021-03-18-15-04-57-854.png, > image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10304: KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
chia7712 commented on pull request #10304: URL: https://github.com/apache/kafka/pull/10304#issuecomment-801702252 > LGTM. @chia7712 If you are fine with the PR, I let you merge it. sure. will merge it after I take a final review :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10304: KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
chia7712 merged pull request #10304: URL: https://github.com/apache/kafka/pull/10304 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12454) Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster
[ https://issues.apache.org/jira/browse/KAFKA-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12454. Resolution: Fixed > Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in > current kafka cluster > --- > > Key: KAFKA-12454 > URL: https://issues.apache.org/jira/browse/KAFKA-12454 > Project: Kafka > Issue Type: Improvement >Reporter: Wenbing Shen >Assignee: Wenbing Shen >Priority: Minor > Fix For: 3.0.0 > > > When non-existent brokerIds are given, the kafka-log-dirs tool fails with a > timeout error: > Exception in thread "main" java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node > assignment. Call: describeLogDirs > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50) > at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36) > at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala) > Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting > for a node assignment. Call: describeLogDirs > > When a brokerId entered by the user does not exist, an error message > indicating that the node is not present should be printed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
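The check the ticket asks for can be sketched without a live cluster: compare the user-supplied broker ids against the ids the cluster actually reports (in the real tool those would come from the admin client's cluster description) and report the unknown ones up front instead of letting describeLogDirs time out. The helper below is a hypothetical illustration of that validation, not the actual kafka.admin.LogDirsCommand code.

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical sketch: validate requested broker ids against the ids the
// cluster reports, so unknown brokers yield an immediate ERROR instead of a
// describeLogDirs timeout. Not the actual LogDirsCommand implementation.
public class BrokerIdCheck {
    /** Returns the requested ids that do not exist in the cluster. */
    static Set<Integer> unknownBrokers(Collection<Integer> requested,
                                       Collection<Integer> existing) {
        Set<Integer> unknown = new TreeSet<>(requested);
        unknown.removeAll(new HashSet<>(existing));
        return unknown;
    }

    public static void main(String[] args) {
        // Suppose the cluster has brokers 0, 1, 2 and the user asked for 1, 2, 5.
        Set<Integer> unknown = unknownBrokers(List.of(1, 2, 5), List.of(0, 1, 2));
        if (!unknown.isEmpty()) {
            // in the real tool this would be an ERROR log line before exiting
            System.err.println("ERROR: broker ids not found in the cluster: "
                    + unknown.stream().map(String::valueOf).collect(Collectors.joining(", ")));
        }
    }
}
```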
[GitHub] [kafka] unlizhao opened a new pull request #10348: KafkaAdminClient check null for group.groupState()
unlizhao opened a new pull request #10348: URL: https://github.com/apache/kafka/pull/10348 1. When `group.groupState()` is null, the original code throws a `java.lang.NullPointerException`, so `group.groupState()` is now null-checked. 2. The caller of the `equals` method is flipped so that the non-null string literal is the receiver. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
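The flipped-equals idiom the PR describes can be sketched as follows. The `Listing` class here is a hypothetical stand-in for the admin client's group listing, not the actual KafkaAdminClient code:

```java
// Minimal sketch of the null-safe comparison: making the string literal the
// receiver of equals() avoids the NPE when the state field is null.
// The Listing class is illustrative only, not Kafka's.
public class GroupStateCheck {
    static class Listing {
        private final String groupState;
        Listing(String groupState) { this.groupState = groupState; }
        String groupState() { return groupState; } // may be null
    }

    static boolean isStable(Listing group) {
        // group.groupState().equals("Stable") would throw a
        // NullPointerException when groupState() returns null;
        // flipping the receiver is safe (Objects.equals would work too):
        return "Stable".equals(group.groupState());
    }

    public static void main(String[] args) {
        System.out.println(isStable(new Listing("Stable"))); // true
        System.out.println(isStable(new Listing(null)));     // false, no NPE
    }
}
```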
[GitHub] [kafka] dajac commented on pull request #10348: MINOR: KafkaAdminClient check null for group.groupState()
dajac commented on pull request #10348: URL: https://github.com/apache/kafka/pull/10348#issuecomment-801792638 @unlizhao Thanks for the PR. I wonder if `group.groupState()` could really be `null`, as it seems to default to an empty string. Have you encountered a case where it happened? If so, it might be worth filing a bug for it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup
[ https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey updated KAFKA-12390: --- Attachment: (was: negative lag kafka.png) > error when storing group assignment during SyncGroup > - > > Key: KAFKA-12390 > URL: https://issues.apache.org/jira/browse/KAFKA-12390 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 2.4.0 >Reporter: Andrey >Priority: Major > > There is a cluster consisting of 3 nodes > Apache Kafka Cluster 2.4.0 > messages were observed in the logs of one of the nodes: > Preparing to rebalance group corecft-adapter2 in state PreparingRebalance > with old generation 5043 (__consumer_offsets-22) (reason: error when storing > group assignment during SyncGroup (member: > consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) (kafka. > coordinator.group.GroupCoordinator) > Messages from the topic were read, but no commit occurred. then there was > balancing and the data in the topics was "doubled". There was also a > "negative" lag > there are no error messages in the zookeeper logs > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup
[ https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey updated KAFKA-12390: --- Attachment: negative lag kafka.png > error when storing group assignment during SyncGroup > - > > Key: KAFKA-12390 > URL: https://issues.apache.org/jira/browse/KAFKA-12390 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 2.4.0 >Reporter: Andrey >Priority: Major > Attachments: negative lag kafka.png > > > There is a cluster consisting of 3 nodes > Apache Kafka Cluster 2.4.0 > messages were observed in the logs of one of the nodes: > Preparing to rebalance group corecft-adapter2 in state PreparingRebalance > with old generation 5043 (__consumer_offsets-22) (reason: error when storing > group assignment during SyncGroup (member: > consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) (kafka. > coordinator.group.GroupCoordinator) > Messages from the topic were read, but no commit occurred. then there was > balancing and the data in the topics was "doubled". There was also a > "negative" lag > there are no error messages in the zookeeper logs > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup
[ https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey updated KAFKA-12390: --- Attachment: (was: negative lag kafka.png) > error when storing group assignment during SyncGroup > - > > Key: KAFKA-12390 > URL: https://issues.apache.org/jira/browse/KAFKA-12390 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 2.4.0 >Reporter: Andrey >Priority: Major > Attachments: negative lag kafka.png > > > There is a cluster consisting of 3 nodes > Apache Kafka Cluster 2.4.0 > messages were observed in the logs of one of the nodes: > Preparing to rebalance group corecft-adapter2 in state PreparingRebalance > with old generation 5043 (__consumer_offsets-22) (reason: error when storing > group assignment during SyncGroup (member: > consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) (kafka. > coordinator.group.GroupCoordinator) > Messages from the topic were read, but no commit occurred. then there was > balancing and the data in the topics was "doubled". There was also a > "negative" lag > there are no error messages in the zookeeper logs > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
cadonna commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596720028 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -288,50 +277,31 @@ private String logPrefix() { * Get the lock for the {@link TaskId}s directory if it is available * @param taskId task id * @return true if successful - * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal */ -synchronized boolean lock(final TaskId taskId) throws IOException { +synchronized boolean lock(final TaskId taskId) { if (!hasPersistentStores) { return true; } -final File lockFile; -// we already have the lock so bail out here -final LockAndOwner lockAndOwner = locks.get(taskId); -if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { -log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); -return true; -} else if (lockAndOwner != null) { -// another thread owns the lock -return false; -} - -try { -lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); -} catch (final ProcessorStateException e) { -// directoryForTask could be throwing an exception if another thread -// has concurrently deleted the directory -return false; -} - -final FileChannel channel; - -try { -channel = getOrCreateFileChannel(taskId, lockFile.toPath()); -} catch (final NoSuchFileException e) { -// FileChannel.open(..) could throw NoSuchFileException when there is another thread -// concurrently deleting the parent directory (i.e. the directory of the taskId) of the lock -// file, in this case we will return immediately indicating locking failed. 
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId); +if (lockOwner != null) { +if (lockOwner.equals(Thread.currentThread().getName())) { +log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +// we already own the lock +return true; +} else { +// another thread owns the lock +return false; +} +} else if (!stateDir.exists()) { Review comment: I am wondering if we should throw an `IllegalStateException` here, because it seems illegal to me to request a lock of a task directory in a state directory that does not exist. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { exception ); } finally { -try { -unlock(id); -} catch (final IOException exception) { -log.warn( -String.format("%s Swallowed the following exception during unlocking after deletion of obsolete " + -"state directory %s for task %s:", logPrefix(), dirName, id), -exception -); -} +unlock(id); Review comment: I guess you could make the same change on line 474 in `cleanRemovedTasksCalledByUser()`, couldn't you? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java ## @@ -240,32 +192,30 @@ public void testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOEx } @Test -public void testCloseStateManagerThrowsExceptionWhenDirty() throws IOException { +public void testCloseStateManagerThrowsExceptionWhenDirty() { expect(stateManager.taskId()).andReturn(taskId); expect(stateDirectory.lock(taskId)).andReturn(true); stateManager.close(); -expectLastCall(); +expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); stateDirectory.unlock(taskId); -expectLastCall().andThrow(new IOException("Timeout")); +expectLastCall(); Review comment: You can probably remove this. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
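The direction of the change under review, replacing per-task FileChannel lock files with an in-memory map from task id to the owning stream thread, can be sketched roughly as below. Class and member names are illustrative only, not the actual StateDirectory implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Rough sketch of in-memory task locking: a map from task id to the name of
// the owning stream thread replaces file-based locks, so lock() no longer
// needs to declare IOException. Names are illustrative, not Kafka's.
public class InMemoryTaskLocks {
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();

    public synchronized boolean lock(final String taskId) {
        final String owner = lockedTasksToOwner.get(taskId);
        if (owner != null) {
            // Re-entrant for the owning thread; denied for any other thread.
            return owner.equals(Thread.currentThread().getName());
        }
        lockedTasksToOwner.put(taskId, Thread.currentThread().getName());
        return true;
    }

    public synchronized void unlock(final String taskId) {
        // Only the owning thread may release the lock.
        if (Thread.currentThread().getName().equals(lockedTasksToOwner.get(taskId))) {
            lockedTasksToOwner.remove(taskId);
        }
    }
}
```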
[GitHub] [kafka] dengziming commented on pull request #10347: MINOR: the request thread in clientToControllerChannelManager is NOT…
dengziming commented on pull request #10347: URL: https://github.com/apache/kafka/pull/10347#issuecomment-801824522 It seems that this issue has been fixed by #10340. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12496) Javadoc webpages should not be published to mirrors
Sebb created KAFKA-12496: Summary: Javadoc webpages should not be published to mirrors Key: KAFKA-12496 URL: https://issues.apache.org/jira/browse/KAFKA-12496 Project: Kafka Issue Type: Task Reporter: Sebb As per discussion on the dev mailing list, please would someone remove the javadocs from the mirrors, i.e. svn rm https://dist.apache.org/repos/dist/release/kafka/2.6.1/javadoc/ and svn rm https://dist.apache.org/repos/dist/release/kafka/2.7.0/javadoc/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10347: MINOR: the request thread in clientToControllerChannelManager is NOT…
chia7712 commented on pull request #10347: URL: https://github.com/apache/kafka/pull/10347#issuecomment-801827853 > It seems that this issue has been fixed by #10340. You are right :) close this now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 closed pull request #10347: MINOR: the request thread in clientToControllerChannelManager is NOT…
chia7712 closed pull request #10347: URL: https://github.com/apache/kafka/pull/10347 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9779: KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache
cadonna commented on a change in pull request #9779: URL: https://github.com/apache/kafka/pull/9779#discussion_r596741592 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ## @@ -252,20 +262,43 @@ public void shouldGetSameKeyAsPeekNext() { assertEquals(iterator.peekNextKey(), iterator.next().key); } +@Test +public void shouldGetSameKeyAsPeekNextReverseRange() { +final ThreadCache cache = new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); +final Bytes theByte = Bytes.wrap(new byte[]{1}); +cache.put(namespace, theByte, dirtyEntry(theByte.get())); +final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte); +assertEquals(iterator.peekNextKey(), iterator.next().key); Review comment: nit: see my comment about `assertThat()` above. ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ## @@ -252,20 +262,43 @@ public void shouldGetSameKeyAsPeekNext() { assertEquals(iterator.peekNextKey(), iterator.next().key); } +@Test +public void shouldGetSameKeyAsPeekNextReverseRange() { +final ThreadCache cache = new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); +final Bytes theByte = Bytes.wrap(new byte[]{1}); +cache.put(namespace, theByte, dirtyEntry(theByte.get())); +final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte); +assertEquals(iterator.peekNextKey(), iterator.next().key); +} + @Test public void shouldThrowIfNoPeekNextKey() { final ThreadCache cache = new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); assertThrows(NoSuchElementException.class, iterator::peekNextKey); } +@Test +public void shouldThrowIfNoPeekNextKeyReverseRange() { +final ThreadCache cache = 
new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); +final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); +assertThrows(NoSuchElementException.class, iterator::peekNextKey); +} + Review comment: nit: could you try to deduplicate code here and in the other unit tests? Here for example, you could have one method like this: ``` private void shouldThrowIfNoPeekNextKey(final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) { final ThreadCache.MemoryLRUCacheBytesIterator iterator = methodUnderTest.get(); assertThrows(NoSuchElementException.class, iterator::peekNextKey); } ``` and then two public tests ``` @Test public void shouldThrowIfNoPeekNextKeyRange() { final ThreadCache cache = new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); shouldThrowIfNoPeekNextKey(() -> cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}))); } @Test public void shouldThrowIfNoPeekNextKeyReverseRange() { final ThreadCache cache = new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); shouldThrowIfNoPeekNextKey(() -> cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}))); } ``` Admittedly, in this specific case, we would not win much, but for other unit tests in this test class it may be worth it. Try it and then decide whether it is worth it or not. 
## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java ## @@ -243,6 +243,16 @@ public void shouldPeekNextKey() { assertEquals(theByte, iterator.peekNextKey()); } +@Test +public void shouldPeekNextKeyReverseRange() { +final ThreadCache cache = new ThreadCache(logContext, 1L, new MockStreamsMetrics(new Metrics())); +final Bytes theByte = Bytes.wrap(new byte[]{1}); +cache.put(namespace, theByte, dirtyEntry(theByte.get())); +final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte); +assertEquals(theByte, iterator.peekNextKey()); +assertEquals(theByte, iterator.peekNextKey()); Review comment: nit: Could you please use `assertThat(iterator.peekNextKey(), is(theByte))` here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (KAFKA-12384) Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
[ https://issues.apache.org/jira/browse/KAFKA-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304053#comment-17304053 ] Bruno Cadonna commented on KAFKA-12384: --- Failed again https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10317/2/testReport/junit/kafka.server/ListOffsetsRequestTest/Build___JDK_15___testResponseIncludesLeaderEpoch___2/ {code} org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: <(-1,-1)> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172) {code} > Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch > - > > Key: KAFKA-12384 > URL: https://issues.apache.org/jira/browse/KAFKA-12384 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > {quote}org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: > <(-1,-1)> at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at > org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at > kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API
dajac commented on a change in pull request #10275: URL: https://github.com/apache/kafka/pull/10275#discussion_r596869352 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; + +import java.util.Collections; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +public interface AdminApiLookupStrategy<T> { + +/** + * Define the scope of a given key for lookup. Key lookups are complicated + * by the need to accommodate different batching mechanics. For example, + * a `Metadata` request supports arbitrary batching of topic partitions in + * order to discover partition leaders. This can be supported by returning + * a single scope object for all keys. + * + * On the other hand, `FindCoordinator` request only supports lookup of a + * single key. This can be supported by returning a different scope object + * for each lookup key. 
+ * + * @param key the lookup key + * + * @return request scope indicating how lookup requests can be batched together + */ +RequestScope lookupScope(T key); + +/** + * Build the lookup request for a set of keys. The grouping of the keys is controlled + * through {@link #lookupScope(Object)}. In other words, each set of keys that map + * to the same request scope object will be sent to this method. + * + * @param keys the set of keys that require lookup + * + * @return a builder for the lookup request + */ +AbstractRequest.Builder<?> buildRequest(Set<T> keys); + +/** + * Callback that is invoked when a lookup request returns successfully. The handler + * should parse the response, check for errors, and return a result indicating + * which keys were mapped to a brokerId successfully and which keys received + * a fatal error (e.g. a topic authorization failure). + * + * Note that keys which receive a retriable error should be left out of the + * result. They will be retried automatically. For example, if the response of + * `FindCoordinator` request indicates an unavailable coordinator, then the key + * should be left out of the result so that the request will be retried. + * + * @param keys the set of keys from the associated request + * @param response the response received from the broker + * + * @return a result indicating which keys mapped successfully to a brokerId and + * which encountered a fatal error + */ +LookupResult<T> handleResponse(Set<T> keys, AbstractResponse response); + +class LookupResult<K> { +public final Map<K, Integer> mappedKeys; +public final Map<K, Throwable> failedKeys; + +public LookupResult( +Map<K, Throwable> failedKeys, +Map<K, Integer> mappedKeys +) { +this.failedKeys = Collections.unmodifiableMap(failedKeys); +this.mappedKeys = Collections.unmodifiableMap(mappedKeys); +} +} + +interface RequestScope { +default OptionalInt destinationBrokerId() { +return OptionalInt.empty(); +} +} Review comment: Have you decided to put this interface here because it is mainly used by the `AdminApiLookupStrategy`? 
Intuitively, I would have placed it in the driver because it is used by few other classes as well. ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java ## @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unles
[GitHub] [kafka] ijuma commented on a change in pull request #10344: MINOR: Remove use of NoSuchElementException
ijuma commented on a change in pull request #10344: URL: https://github.com/apache/kafka/pull/10344#discussion_r596895503 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -37,8 +37,9 @@ final class KafkaMetadataLog private ( log: Log, scheduler: Scheduler, // This object needs to be thread-safe because it is used by the snapshotting thread to notify the - // polling thread when snapshots are created. - snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch], + // polling thread when snapshots are created. Using a Map instead of a Set so that there is no + // need to handle NoSuchElementException. + snapshotIds: ConcurrentSkipListMap[OffsetAndEpoch, Unit], Review comment: @jsancio Iterators need to provide consistent behavior or they would not satisfy the Iterator contract. What I mean is: if `hasNext` returns `true`, then `next` has to return an element. It would not be ok for `next` to do something different in that case. So, it seems safe to use here. In any case, not a big deal to use `ConcurrentNavigableMap` since that's what `ConcurrentSkipListSet` uses too. But if we do that, I would suggest creating a simple wrapper that exposes the methods you want. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
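The iterator contract ijuma refers to (if `hasNext()` returns true, `next()` must return an element) is what makes iterator-based peeking safe on an empty set, whereas `first()`/`last()` throw `NoSuchElementException`. A standalone sketch of that pattern, not the actual KafkaMetadataLog code:

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;

public class SnapshotIdLookup {
    // Peek the greatest element without risking NoSuchElementException:
    // descendingIterator() plus the hasNext()/next() contract yields an
    // empty Optional on an empty set, where ConcurrentSkipListSet.last()
    // would throw. Class and method names here are illustrative.
    static Optional<Long> latestId(ConcurrentSkipListSet<Long> ids) {
        Iterator<Long> it = ids.descendingIterator();
        return it.hasNext() ? Optional.of(it.next()) : Optional.empty();
    }
}
```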
[GitHub] [kafka] ijuma merged pull request #10341: KAFKA-12491: Make rocksdb an `api` dependency for `streams`
ijuma merged pull request #10341: URL: https://github.com/apache/kafka/pull/10341 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request #10349: MINOR: Move `configurations.all` to be a child of `allprojects`
ijuma opened a new pull request #10349: URL: https://github.com/apache/kafka/pull/10349 It was incorrectly set within `dependencyUpdates` and it still worked. That is, this is a no-op in terms of behavior, but makes it easier to read and understand. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding
ijuma commented on pull request #10340: URL: https://github.com/apache/kafka/pull/10340#issuecomment-801967783 Is this a 2.8 blocker? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] unlizhao commented on pull request #10348: MINOR: KafkaAdminClient check null for group.groupState()
unlizhao commented on pull request #10348: URL: https://github.com/apache/kafka/pull/10348#issuecomment-801972726 @dajac Normally this value (`group.groupState()`) is set by the broker when it builds the response to a ListConsumerGroups request, so the response usually carries it. If it is not set, `group.groupState()` returns null, and writing `group.groupState().equals("")` throws a NullPointerException. You can reproduce this in `org.apache.kafka.clients.admin.KafkaAdminClientTest#testListConsumerGroupsWithStates` by changing `setGroupState("Stable")` to `setGroupState(null)`. That said, by the intended design a response without `groupState` is itself illegal, so arguably it need not be handled; even if an NPE occurs, this method is inside an outer try/catch and will not crash the system. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
ijuma merged pull request #10056: URL: https://github.com/apache/kafka/pull/10056 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] unlizhao edited a comment on pull request #10348: MINOR: KafkaAdminClient check null for group.groupState()
unlizhao edited a comment on pull request #10348: URL: https://github.com/apache/kafka/pull/10348#issuecomment-801972726 @dajac Thanks for the review. Normally this value (`group.groupState()`) is set by the broker when it builds the response to a ListConsumerGroups request, so the response usually carries it. If it is not set, `group.groupState()` returns null, and writing `group.groupState().equals("")` throws a NullPointerException. You can reproduce this in `org.apache.kafka.clients.admin.KafkaAdminClientTest#testListConsumerGroupsWithStates` by changing `setGroupState("Stable")` to `setGroupState(null)`. That said, by the intended design a response without `groupState` is itself illegal, so arguably it need not be handled; even if an NPE occurs, this method is inside an outer try/catch and will not crash the system. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
ijuma commented on pull request #10056: URL: https://github.com/apache/kafka/pull/10056#issuecomment-801976406 I tried backporting to 2.8 and the older plugin versions in that branch were not available in Maven central. We should probably wait a few months and see how things develop before backporting to older branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma edited a comment on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)
ijuma edited a comment on pull request #10056: URL: https://github.com/apache/kafka/pull/10056#issuecomment-801976406 I tried backporting to 2.8 and the older Gradle plugin versions in that branch were not available in Maven central. We should probably wait a few months and see how things develop before backporting to older branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding
mumrah commented on pull request #10340: URL: https://github.com/apache/kafka/pull/10340#issuecomment-801992943 @ijuma no, this only affects trunk. It looks like the bug was introduced only a few days ago.
[GitHub] [kafka] ijuma commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding
ijuma commented on pull request #10340: URL: https://github.com/apache/kafka/pull/10340#issuecomment-801997350 Thanks @mumrah.
[GitHub] [kafka] jsancio commented on a change in pull request #10344: MINOR: Remove use of NoSuchElementException
jsancio commented on a change in pull request #10344: URL: https://github.com/apache/kafka/pull/10344#discussion_r596980629 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -37,8 +37,9 @@ final class KafkaMetadataLog private ( log: Log, scheduler: Scheduler, // This object needs to be thread-safe because it is used by the snapshotting thread to notify the - // polling thread when snapshots are created. - snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch], + // polling thread when snapshots are created. Using a Map instead of a Set so that there is no + // need to handle NoSuchElementException. + snapshotIds: ConcurrentSkipListMap[OffsetAndEpoch, Unit], Review comment: Sounds good @ijuma. I changed the code to use a `ConcurrentSkipListSet`, ascending iterator and descending iterator.
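The iterator-based access pattern the comment settles on can be sketched as follows. This is a simplified model (using `Long` in place of `OffsetAndEpoch`; class and method names are hypothetical): `ConcurrentSkipListSet` keeps elements sorted, so checking `hasNext()` on the ascending or descending iterator finds the smallest or largest element without ever risking a `NoSuchElementException`.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentSkipListSet;

public class SnapshotIdsSketch {
    // Oldest element via the ascending iterator; null if the set is empty.
    static Long oldest(ConcurrentSkipListSet<Long> ids) {
        Iterator<Long> it = ids.iterator();
        return it.hasNext() ? it.next() : null;
    }

    // Latest element via the descending iterator; null if the set is empty.
    static Long latest(ConcurrentSkipListSet<Long> ids) {
        Iterator<Long> it = ids.descendingIterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        ConcurrentSkipListSet<Long> snapshotIds = new ConcurrentSkipListSet<>();
        snapshotIds.add(100L);
        snapshotIds.add(50L);
        snapshotIds.add(200L);
        // The set is sorted, so the extremes are the first ascending/descending elements.
        System.out.println(oldest(snapshotIds) + " " + latest(snapshotIds)); // prints "50 200"
    }
}
```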
[jira] [Resolved] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-3813. -- Resolution: Done > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #10314: MINOR: remove redundant null check when testing specified type
chia7712 merged pull request #10314: URL: https://github.com/apache/kafka/pull/10314
[GitHub] [kafka] jsancio commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
jsancio commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r596994002 ## File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java ## @@ -16,40 +16,56 @@ */ package org.apache.kafka.raft; +import java.util.Objects; + public final class ValidOffsetAndEpoch { -final private Type type; +final private Kind kind; Review comment: Yeah. What makes the suggested design pattern nice in Scala is that the companion objects for `case class` have an `unapply` method for pattern matching. For what it is worth, I thought about this design when implementing `ValidOffsetAndEpoch` but found the current design more user friendly in Java. I am all for changing the name of `ValidOffsetAndEpoch`. I was never happy with the name.
[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API
jsancio commented on a change in pull request #10085: URL: https://github.com/apache/kafka/pull/10085#discussion_r596996121 ## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java ## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.raft.RecordSerde; + +public final class SnapshotReader implements Closeable, Iterable> { Review comment: Fair enough. Let me revisit this and see what code we can reuse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12497) Source task offset commits continue even after task has failed
Chris Egerton created KAFKA-12497: - Summary: Source task offset commits continue even after task has failed Key: KAFKA-12497 URL: https://issues.apache.org/jira/browse/KAFKA-12497 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.0.0, 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 Reporter: Chris Egerton Assignee: Chris Egerton Source task offset commits take place on a dedicated thread, which periodically triggers offset commits for all of the source tasks on the worker at a user-configurable interval and with a user-configurable timeout for each offset commit. When a task fails, offset commits continue to take place. In the common case where there is no longer any chance of another successful offset commit for the task, this has two negative side effects: First, confusing log messages are emitted that some users reasonably interpret as a sign that the source task is still alive: {noformat} [2021-03-06 04:30:53,739] INFO WorkerSourceTask{id=Salesforce_PC_Connector_Agency-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask) [2021-03-06 04:30:53,739] INFO WorkerSourceTask{id=Salesforce_PC_Connector_Agency-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask){noformat} Second, if the task has any source records pending, it will block the shared offset commit thread until the offset commit timeout expires. This will take place repeatedly until either the task is restarted/deleted or all of these records are flushed. In some other cases, it's actually somewhat sensible to continue to try to commit offsets. Even if a source task has died, data from it may still be in flight to the broker, and there's no reason not to commit the offsets for that data once it has been ack'd.
However, if there is no in-flight data from a source task that is pending an ack from the Kafka cluster, and the task has failed, there is no reason to continue to try to commit offsets. Additionally, if the producer has failed to send a record to Kafka with a non-retriable exception, there is also no reason to continue to try to commit offsets, as the current batch will never complete. We can address one or both of these cases to try to reduce the number of confusing logging messages, and if necessary, alter existing log messages to make it clear to the user that the task may not be alive.
[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses
tang7526 commented on a change in pull request #10332: URL: https://github.com/apache/kafka/pull/10332#discussion_r597008060 ## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ## @@ -40,42 +42,42 @@ public void produceResponseV5Test() { Map responseData = new HashMap<>(); TopicPartition tp0 = new TopicPartition("test", 0); -responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, -1, RecordBatch.NO_TIMESTAMP, 100)); +responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 1, RecordBatch.NO_TIMESTAMP, 100)); ProduceResponse v5Response = new ProduceResponse(responseData, 10); short version = 5; ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0); ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away. -ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, -buffer, version); - -assertEquals(1, v5FromBytes.responses().size()); -assertTrue(v5FromBytes.responses().containsKey(tp0)); -ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); -assertEquals(100, partitionResponse.logStartOffset); -assertEquals(1, partitionResponse.baseOffset); -assertEquals(10, v5FromBytes.throttleTimeMs()); -assertEquals(responseData, v5Response.responses()); +ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version); + +assertEquals(1, v5FromBytes.data().responses().size()); +ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next(); +assertEquals(1, topicProduceResponse.partitionResponses().size()); +ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next(); +TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index()); +assertEquals(tp0, tp); + 
+assertEquals(100, partitionProduceResponse.logStartOffset()); +assertEquals(1, partitionProduceResponse.baseOffset()); +assertEquals(RecordBatch.NO_TIMESTAMP, partitionProduceResponse.logAppendTimeMs()); +assertEquals(Errors.NONE, Errors.forCode(partitionProduceResponse.errorCode())); +assertEquals(null, partitionProduceResponse.errorMessage()); Review comment: Done. ## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ## @@ -40,42 +42,42 @@ public void produceResponseV5Test() { Map responseData = new HashMap<>(); TopicPartition tp0 = new TopicPartition("test", 0); -responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, -1, RecordBatch.NO_TIMESTAMP, 100)); +responseData.put(tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 1, RecordBatch.NO_TIMESTAMP, 100)); ProduceResponse v5Response = new ProduceResponse(responseData, 10); short version = 5; ByteBuffer buffer = RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0); ResponseHeader.parse(buffer, ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away. 
-ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, -buffer, version); - -assertEquals(1, v5FromBytes.responses().size()); -assertTrue(v5FromBytes.responses().containsKey(tp0)); -ProduceResponse.PartitionResponse partitionResponse = v5FromBytes.responses().get(tp0); -assertEquals(100, partitionResponse.logStartOffset); -assertEquals(1, partitionResponse.baseOffset); -assertEquals(10, v5FromBytes.throttleTimeMs()); -assertEquals(responseData, v5Response.responses()); +ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version); + +assertEquals(1, v5FromBytes.data().responses().size()); +ProduceResponseData.TopicProduceResponse topicProduceResponse = v5FromBytes.data().responses().iterator().next(); +assertEquals(1, topicProduceResponse.partitionResponses().size()); +ProduceResponseData.PartitionProduceResponse partitionProduceResponse = topicProduceResponse.partitionResponses().iterator().next(); +TopicPartition tp = new TopicPartition(topicProduceResponse.name(), partitionProduceResponse.index()); +assertEquals(tp0, tp); + +assertEquals(100, partitionProduceResponse.logStartOffset()); +assertEquals(1, partitionProduceResponse.baseOffset()); +
[GitHub] [kafka] rhauch commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor
rhauch commented on a change in pull request #10315: URL: https://github.com/apache/kafka/pull/10315#discussion_r597007027 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); +consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, +CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName()); Review comment: As discussed on the [Jira issue](https://issues.apache.org/jira/browse/KAFKA-12463), I think you're suggesting that we use `RoundRobinAssignor` instead of `CooperativeStickyAssignor` due to the issue you found with Connect using the latter ([KAFKA-12487](https://issues.apache.org/jira/browse/KAFKA-12487)). Also, it's probably worthwhile to add a comment here about why we're using _two_ assignors rather than _only_ the cooperative (or round robin) assignor. Maybe something like: ``` // Prefer the cooperative assignor, but allow old range assignor during rolling upgrades // from Connect versions that just used the range assignor (the default) ```
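The two-assignor setting under discussion might look like this in plain consumer properties. This is a sketch using the literal property key and the fully qualified assignor class names from the thread, not the actual Worker code; the helper name is hypothetical:

```java
import java.util.Properties;

public class AssignorConfigSketch {
    // Lists the preferred assignor first and the legacy RangeAssignor second,
    // so workers still on the old default can agree on a common protocol
    // during a rolling upgrade.
    static Properties sinkConsumerProps() {
        Properties props = new Properties();
        props.put("partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
            + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(sinkConsumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

The ordering matters: the consumer group coordinator picks the first assignor that every member supports, so listing the cooperative assignor first makes it take over once all workers have been upgraded.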
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304248#comment-17304248 ] Randall Hauch commented on KAFKA-12463: --- Thanks for the response and logging KAFKA-12487, [~ChrisEgerton]. {quote} I'd also just like to point out that the goal here is to improve the out-of-the-box behavior of Connect for users; although workarounds are nice to have, the goal here shouldn't be to focus on documenting them but instead, to make them obsolete. If we decide not to improve the default behavior of Connect then we can document this somewhere else that's a little more visible for users as opposed to developers. {quote} I agree with you that we should fix the behavior. But the fix will appear only in certain releases, and not all users will be able to upgrade to those releases to get the fix. So, documenting the simple workaround will help users in those situations. I suggested documenting the workaround here to help any users that do stumble upon this issue when searching for a potential fix, regardless of whether the workaround is also documented elsewhere. IIUC, given KAFKA-12487 is likely more challenging to fix and has not yet been addressed, the shorter term proposal here is to change the worker to set the following on consumer configs used in sink connectors: {code:java} partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor{code} This works because the round robin will be used only after all workers have been upgraded, and this gives us more balanced consumer assignments. Plus, it is backward compatible since the worker will always override this new value should users have any worker configs that override this property via: {code:java} consumer.partition.assignment.strategy=... 
{code} or have any connector configs that use client overrides via: {code:java} consumer.overrides.partition.assignment.strategy=...{code} If that is the proposal, WDYT about updating the description to make this more clear? Essentially, I suggest this issue's description would state the problem (that section is good), propose a solution using round robin (mostly using your proposed section to use round robin rather than cooperative), document the workaround, and finally address why {{RoundRobinAssignor}} was used instead of {{CooperativeStickyAssignor}}. And if we're on the same page, then I think it's worth updating the PR to implement the proposed fix. I'll state the same on a review for the PR. > Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default.
Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. > This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. > h3. Proposed Change > *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below > will not work as consumers will still perform eager rebalancing as long as at > least one of the partition assignors they are configured with does not > support cooperative rebalancing. KAFKA-12487 s
[GitHub] [kafka] chia7712 commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding
chia7712 commented on a change in pull request #10340: URL: https://github.com/apache/kafka/pull/10340#discussion_r597014831 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -295,6 +295,9 @@ class BrokerToControllerRequestThread( private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]() private val activeController = new AtomicReference[Node](null) + // Used for testing + private[server] var started = false Review comment: Does it need to be thread-safe?
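For context on the thread-safety question: if `started` is written by one thread and read by another, it needs a visibility guarantee. One common answer, sketched here in Java with a hypothetical class (Scala's equivalent would be a `@volatile var`; this is not the actual fix from the PR):

```java
public class StartedFlagSketch {
    // volatile guarantees that a write by one thread is visible to reads
    // from any other thread, without requiring locks for a single flag.
    private volatile boolean started = false;

    void start() { started = true; }

    boolean isStarted() { return started; }

    public static void main(String[] args) throws InterruptedException {
        StartedFlagSketch flag = new StartedFlagSketch();
        Thread t = new Thread(flag::start);
        t.start();
        t.join(); // join() also establishes a happens-before edge
        System.out.println(flag.isStarted()); // prints "true"
    }
}
```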
[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304254#comment-17304254 ] Randall Hauch commented on KAFKA-10340: --- {quote} I wonder if we should actually resolve this JIRA as the fix has been made to trunk, 2.6 and 2.7. If John decides to pull it in 2.8, then we can just add 2.8.0 to Fix Versions. That would enable us to run the 2.6.2 and 2.7.1 releases without having to tweak Fix Versions. {quote} That sounds fine to me. I'll add the 2.6.2 fix and mark this as resolved. We can then add the 2.8 version (either `2.8.0` or `2.8.1`) if/when [https://github.com/apache/kafka/pull/10238] is merged to 2.8. > Source connectors should report error when trying to produce records to > non-existent topics instead of hanging forever > -- > > Key: KAFKA-10340 > URL: https://issues.apache.org/jira/browse/KAFKA-10340 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0 >Reporter: Arjun Satish >Assignee: Chris Egerton >Priority: Major > Fix For: 3.0.0, 2.7.1 > > > Currently, a source connector will blindly attempt to write a record to a > Kafka topic. When the topic does not exist, its creation is controlled by the > {{auto.create.topics.enable}} config on the brokers. When auto.create is > disabled, the producer.send() call on the Connect worker will hang > indefinitely (due to the "infinite retries" configuration for said producer). > In setups where this config is usually disabled, the source connector simply > appears to hang and not produce any output. > It is desirable to either log an info or an error message (or inform the user > somehow) that the connector is simply stuck waiting for the destination topic > to be created.
When the worker has permissions to inspect the broker > settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in > AdminClient to check whether the topic exists and whether the broker can > auto-create topics ({{auto.create.topics.enable}}), and if neither is the > case, throw an error. > With the recently merged > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], > this becomes even more specific a corner case: when topic creation settings > are enabled, the worker should handle the corner case where topic creation is > disabled, {{auto.create.topics.enable}} is set to false and the topic does not > exist.
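The pre-flight check suggested in the issue reduces to a small piece of decision logic. A sketch under stated assumptions (the class and method names are hypothetical; in the real worker the inputs would come from AdminClient's listTopics and describeConfigs calls against a live cluster, which are factored out here so the logic is testable):

```java
import java.util.Set;

public class TopicPrecheckSketch {
    // Producing can succeed only if the topic already exists or the broker
    // is allowed to auto-create it; otherwise the worker should fail fast
    // instead of letting producer.send() retry forever.
    static boolean canProduceTo(String topic, Set<String> existingTopics, boolean autoCreateEnabled) {
        return existingTopics.contains(topic) || autoCreateEnabled;
    }

    public static void main(String[] args) {
        Set<String> topics = Set.of("orders", "payments");
        System.out.println(canProduceTo("orders", topics, false));  // true: topic exists
        System.out.println(canProduceTo("missing", topics, false)); // false: report an error
        System.out.println(canProduceTo("missing", topics, true));  // true: broker will auto-create
    }
}
```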
[GitHub] [kafka] vanhoale commented on pull request #10239: KAFKA-12372: Enhance TimestampCoverter Connect transformation to handle multiple timestamp or date fields
vanhoale commented on pull request #10239: URL: https://github.com/apache/kafka/pull/10239#issuecomment-802059907 Hi @kkonstantine, could one of your team members review the PR? We want to use this feature ASAP.
[GitHub] [kafka] chia7712 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses
chia7712 commented on a change in pull request #10332: URL: https://github.com/apache/kafka/pull/10332#discussion_r597031230 ## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ## @@ -78,6 +80,20 @@ public void produceResponseVersionTest() { assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero"); assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10"); assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10"); + +List arrResponse = Arrays.asList(v0Response, v1Response, v2Response); +for(ProduceResponse produceResponse:arrResponse) { Review comment: code style: `produceResponse : arrResponse`
[GitHub] [kafka] C0urante commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor
C0urante commented on a change in pull request #10315: URL: https://github.com/apache/kafka/pull/10315#discussion_r597034374 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ## @@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState configState, consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); +consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, +CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName()); Review comment: Yeah, a comment is definitely warranted, good call. 👌 We might use the `RoundRobinAssignor`, but I don't think we have to settle for that just yet. There's a couple of bugs/improvements in the way, but at least one of them is being actively worked on (https://github.com/apache/kafka/pull/10345) and I'm happy to take care of the other (https://issues.apache.org/jira/browse/KAFKA-12487). I converted this to a draft since it's definitely not ready for merge yet and it's unclear which path forward we might want to take. We might want to reason through this carefully since we only get one shot to do this kind of automated upgrade before the next one gets more complicated (the list of assignors will grow from one to two this time around; it'll either have to grow from two to three the next, or we'll have to risk breaking changes for users who skip an upgrade step). I'll try to do an analysis of the pros/cons of the `CooperativeStickyAssignor` vs the `RoundRobinAssignor` with regards to sink connectors on the Jira ticket; I'm actually leaning a tiny bit toward the `RoundRobinAssignor` but I might be suffering from tunnel vision due to recent misadventures with other assignors. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
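The diff under review lists two assignors so that a consumer group can roll through a protocol upgrade while every member still shares a common strategy. A minimal sketch of the equivalent consumer configuration, written with plain string keys and fully-qualified class names so the snippet has no Kafka dependency (the broker-side behavior is that the group picks the first assignor all members advertise):

```java
import java.util.Properties;

public class AssignorUpgradeSketch {
    // Sketch of the change under review: list the cooperative assignor first and
    // keep RangeAssignor (the old default) as a fallback, so a rolling upgrade
    // never leaves the group without a protocol every member supports.
    static Properties sinkTaskConsumerProps() {
        Properties props = new Properties();
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Mirrors PARTITION_ASSIGNMENT_STRATEGY_CONFIG in the diff; order matters,
        // the preferred assignor goes first.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                        + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(sinkTaskConsumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

The second upgrade step the comment alludes to would later drop `RangeAssignor` from the list once every worker runs the new version.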
[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10340: -- Fix Version/s: 2.6.2 > Source connectors should report error when trying to produce records to > non-existent topics instead of hanging forever > -- > > Key: KAFKA-10340 > URL: https://issues.apache.org/jira/browse/KAFKA-10340 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0 >Reporter: Arjun Satish >Assignee: Chris Egerton >Priority: Major > Fix For: 3.0.0, 2.7.1, 2.6.2 > > > Currently, a source connector will blindly attempt to write a record to a > Kafka topic. When the topic does not exist, its creation is controlled by the > {{auto.create.topics.enable}} config on the brokers. When auto.create is > disabled, the producer.send() call on the Connect worker will hang > indefinitely (due to the "infinite retries" configuration for said producer). > In setups where this config is usually disabled, the source connector simply > appears to hang and not produce any output. > It is desirable to either log an info or an error message (or inform the user > somehow) that the connector is simply stuck waiting for the destination topic > to be created. When the worker has permissions to inspect the broker > settings, it can use the {{listTopics}} and {{describeConfigs}} APIs in > AdminClient to check whether the topic exists and whether the broker can > auto-create topics ({{auto.create.topics.enable}}); if neither is the case, it > should throw an error. > With the recently merged > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics], > this becomes an even more specific corner case: when topic creation settings > are enabled, the worker should handle the corner case where topic creation is > disabled, {{auto.create.topics.enable}} is set to false, and the topic does not > exist.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
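The check the ticket proposes reduces to a small piece of decision logic. A dependency-free sketch follows; in a real worker the two booleans would come from `AdminClient#listTopics` (does the topic exist?) and `AdminClient#describeConfigs` on a broker resource (is `auto.create.topics.enable` true?), which are omitted here so the snippet compiles without the Kafka client jar:

```java
public class TopicPrecheck {
    // Only when the topic is missing AND brokers will not auto-create it is
    // producer.send() guaranteed to retry forever; that is the one case where
    // the worker should fail fast (or at least log an error) instead of hanging.
    static boolean shouldFailFast(boolean topicExists, boolean brokersAutoCreateTopics) {
        return !topicExists && !brokersAutoCreateTopics;
    }

    public static void main(String[] args) {
        // Worst case from the ticket: topic absent, auto-creation disabled.
        System.out.println(shouldFailFast(false, false));
    }
}
```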
[jira] [Resolved] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever
[ https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-10340. --- Reviewer: Randall Hauch Resolution: Fixed
[GitHub] [kafka] rhauch commented on a change in pull request #10014: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes
rhauch commented on a change in pull request #10014: URL: https://github.com/apache/kafka/pull/10014#discussion_r597039806 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ## @@ -402,7 +402,7 @@ public void tick() { log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ", scheduledRebalance, now, nextRequestTimeoutMs); } -if (internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { +if (isLeader() && internalRequestValidationEnabled() && keyExpiration < Long.MAX_VALUE) { Review comment: Sounds good. Thanks for logging [KAFKA-12474](https://issues.apache.org/jira/browse/KAFKA-12474) and [KAFKA-12476](https://issues.apache.org/jira/browse/KAFKA-12476).
[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses
tang7526 commented on a change in pull request #10332: URL: https://github.com/apache/kafka/pull/10332#discussion_r597042561 ## File path: clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java ## @@ -78,6 +80,20 @@ public void produceResponseVersionTest() { assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be zero"); assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 10"); assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 10"); + +List arrResponse = Arrays.asList(v0Response, v1Response, v2Response); +for(ProduceResponse produceResponse:arrResponse) { Review comment: Done.
[jira] [Commented] (KAFKA-12262) New session keys are never distributed when follower with key becomes leader
[ https://issues.apache.org/jira/browse/KAFKA-12262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304256#comment-17304256 ] Randall Hauch commented on KAFKA-12262: --- [https://github.com/apache/kafka/pull/10014] fixes this issue and KAFKA-12252. > New session keys are never distributed when follower with key becomes leader > > > Key: KAFKA-12262 > URL: https://issues.apache.org/jira/browse/KAFKA-12262 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > The expiration time for session keys [starts at > {{Long.MAX_VALUE}}|https://github.com/apache/kafka/blob/1c00d9dfa50570b4554a123bfe91ccc5e9ad1ca9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L258] > and [is updated when a new session key is read if and only if the worker is > the > leader|https://github.com/apache/kafka/blob/1c00d9dfa50570b4554a123bfe91ccc5e9ad1ca9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1579-L1581]. > If a follower reads a session key from the config topic, the expiration time > will remain {{Long.MAX_VALUE}} and, even if the worker becomes the leader, > will not be updated. > Once this happens, all key rotation will cease until and unless the former > leader of the cluster becomes the leader again without being restarted in the > meantime, or all workers in the cluster are shut down at the same time and > then the cluster is brought back up again.
[GitHub] [kafka] chia7712 commented on pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions
chia7712 commented on pull request #10077: URL: https://github.com/apache/kafka/pull/10077#issuecomment-802076787 @rhauch Could you take a look? This uneven distribution can be reproduced easily, so it would be nice to fix it ASAP.
[GitHub] [kafka] rhauch commented on pull request #10335: KAFKA-12484: Enable Connect's connector log contexts by default (KIP-721)
rhauch commented on pull request #10335: URL: https://github.com/apache/kafka/pull/10335#issuecomment-802077441 Note, if the upgrade of Log4J to Log4J2 (#7898) is merged prior to this, we should update this PR to also change to the new Log4J2 properties file.
[GitHub] [kafka] bsolomon1124 opened a new pull request #10350: [docs] Demo use of *-server-stop commands to stop
bsolomon1124 opened a new pull request #10350: URL: https://github.com/apache/kafka/pull/10350 ...rather than CTRL+C.
[jira] [Created] (KAFKA-12498) Consumer Group is zero when MirrorMaker 2.0 is running
Olumide Ajiboye created KAFKA-12498: --- Summary: Consumer Group is zero when MirrorMaker 2.0 is running Key: KAFKA-12498 URL: https://issues.apache.org/jira/browse/KAFKA-12498 Project: Kafka Issue Type: Bug Components: consumer, replication Affects Versions: 2.7.0 Environment: Kubernetes 1.19.7 Strimzi 0.21.1 Reporter: Olumide Ajiboye I have two Kafka clusters in active/passive replication using MM 2.0. When I produce messages to a topic and try to read from it in the same cluster, the consumer group lag is already set to zero. The replica topic also has the same lag and log-end-offset. My MM2 is using a superuser account, and I am using a separate consumer group with permission just to read and write to this topic. To elaborate further: * Write 10 messages to a topic without any active consumers, but with MM2 replicating the topic to a passive cluster. * Attempt to read 3 messages from the topic; this creates my consumer group and adds an active consumer. The result is that no messages are read. * Describe the consumer group; the result shows Log-End-Offset with the correct number of messages, but Lag shows 0. * Attempt to read 3 messages from the passive cluster using the same consumer group. Result: no messages consumed, Lag shows 0, Log-End-Offset shows the correct number of messages (i.e. same as the active cluster). * Keep the consumer running. * Write a few more messages. * The consumer now reads the latest messages. * Stop the consumer. * Keep writing new messages. * Lag shows the correct value. In the absence of MM2, Kafka operates as expected.
[GitHub] [kafka] jolshan opened a new pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest
jolshan opened a new pull request #10351: URL: https://github.com/apache/kafka/pull/10351 As a result of https://github.com/apache/kafka/pull/9684, a new field for topic names was created. For versions 6+ `DeleteTopicsRequestData.topicNames` will return an empty list in KafkaApis. This PR uses a new method to efficiently initialize the size of the collection. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] bsolomon1124 opened a new pull request #10352: [doc] Properly use language-java to highlight Java code
bsolomon1124 opened a new pull request #10352: URL: https://github.com/apache/kafka/pull/10352
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write
vamossagar12 commented on a change in pull request #10278: URL: https://github.com/apache/kafka/pull/10278#discussion_r597054403 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1876,7 +1876,7 @@ private long maybeAppendBatches( BatchAccumulator.CompletedBatch batch = iterator.next(); appendBatch(state, batch, currentTimeMs); } -flushLeaderLog(state, currentTimeMs); +//flushLeaderLog(state, currentTimeMs); Review comment: Got it. So, essentially what you are saying is that within onUpdateLeaderHighWatermark I can check whether the HWM moved due to an increase in the LEO (which can be stored to keep track of the last greatest value flushed) or due to other causes. So we don't flush merely because the HWM was crossed; we add another invariant that the LEO should also have moved. It makes sense to me now. I will start making the changes.
[GitHub] [kafka] hachikuji commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API
hachikuji commented on a change in pull request #10275: URL: https://github.com/apache/kafka/pull/10275#discussion_r597061628 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java ## @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.internals.AdminApiHandler.DynamicKeyMapping; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.KeyMappings; +import org.apache.kafka.clients.admin.internals.AdminApiHandler.StaticKeyMapping; +import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.RequestScope; +import org.apache.kafka.common.errors.DisconnectException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The `KafkaAdminClient`'s internal `Call` primitive is not a good fit for multi-stage + * request workflows such as we see with the group coordinator APIs or any request which + * needs to be sent to a partition leader. Typically these APIs have two concrete stages: + * + * 1. Lookup: Find the broker that can fulfill the request (e.g. partition leader or group + *coordinator) + * 2. Fulfillment: Send the request to the broker found in the first step + * + * This is complicated by the fact that `Admin` APIs are typically batched, which + * means the Lookup stage may result in a set of brokers. For example, take a `ListOffsets` + * request for a set of topic partitions. In the Lookup stage, we will find the partition + * leaders for this set of partitions; in the Fulfillment stage, we will group together + * partition according to the IDs of the discovered leaders. + * + * Additionally, the flow between these two stages is bi-directional. 
We may find after + * sending a `ListOffsets` request to an expected leader that there was a leader change. + * This would result in a topic partition being sent back to the Lookup stage. + * + * Managing this complexity by chaining together `Call` implementations is challenging + * and messy, so instead we use this class to do the bookkeeping. It handles both the + * batching aspect as well as the transitions between the Lookup and Fulfillment stages. + * + * Note that the interpretation of the `retries` configuration becomes ambiguous + * for this kind of pipeline. We could treat it as an overall limit on the number + * of requests that can be sent, but that is not very useful because each pipeline + * has a minimum number of requests that need to be sent in order to satisfy the request. + * Instead, we treat this number of retries independently at each stage so that each + * stage has at least one opportunity to complete. So if a user sets `retries=1`, then + * the full pipeline can still complete as long as there are no request failures. + * + * @param The key type, which is also the granularity of the request routing (e.g. + *this could be `TopicPartition` in the case of requests intended for a partition + *leader or the `GroupId` in the case of consumer group requests intended for + *the group coordinator) + * @param The fulfillment type for each key (e.g. this could be consumer group state + *when the key type is a consumer `GroupId`) + */ +public class AdminApiDriver { +private final Logger log; +private final long retryBackoffMs; +private final long deadlineMs; +private final AdminApiHandler handler; +private final Optional> staticMapping; +private final Optional> dynamicMapping; +private final Map> futures; + +private final BiMultimap lookupMap = new BiMultimap<>(); +private final BiMultimap fulfillmentMap = new B
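The Lookup→Fulfillment hand-off described in the javadoc above boils down to grouping keys by the broker id the Lookup stage discovered for them. A dependency-free sketch of that batching step, with `String` keys and plain `int` broker ids standing in for the generic key type:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class FulfillmentBatching {
    // Once the Lookup stage has mapped each key (e.g. a topic partition) to a
    // broker id, the Fulfillment stage batches keys per broker so that a single
    // request serves every key owned by that broker.
    static Map<Integer, Set<String>> groupByBroker(Map<String, Integer> keyToBroker) {
        Map<Integer, Set<String>> batches = new HashMap<>();
        for (Map.Entry<String, Integer> entry : keyToBroker.entrySet()) {
            batches.computeIfAbsent(entry.getValue(), id -> new HashSet<>())
                   .add(entry.getKey());
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("topic-0", 1); // partitions 0 and 1 share leader 1
        leaders.put("topic-1", 1);
        leaders.put("topic-2", 2);
        System.out.println(groupByBroker(leaders));
    }
}
```

A key whose request later fails with a leader-change error would simply be removed from its batch and fed back into the Lookup stage, which is the bi-directional flow the javadoc describes.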
[GitHub] [kafka] cmccabe commented on a change in pull request #10334: MINOR: Fix BaseHashTable sizing
cmccabe commented on a change in pull request #10334: URL: https://github.com/apache/kafka/pull/10334#discussion_r597064603 ## File path: metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java ## @@ -56,12 +61,27 @@ this.elements = new Object[expectedSizeToCapacity(expectedSize)]; } +/** + * Calculate the capacity we should provision, given the expected size. + * + * Our capacity must always be a power of 2, and never less than 2. + */ static int expectedSizeToCapacity(int expectedSize) { -if (expectedSize <= 1) { -return 2; +if (expectedSize >= MAX_CAPACITY / 2) { +return MAX_CAPACITY; +} +return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2)); +} + +private static int roundUpToPowerOfTwo(int i) { +if (i <= 0) { +return 0; +} else if (i > MAX_SIGNED_POWER_OF_TWO) { Review comment: Good catch. Yes, this should be fixed now in the latest logic.
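The sizing rule from the diff above can be restated as a standalone sketch. The `MIN_CAPACITY` and `MAX_CAPACITY` values here are assumptions chosen to satisfy the comment's "power of 2, never less than 2" invariant, not necessarily the constants in Kafka's `BaseHashTable`:

```java
public class HashTableSizing {
    static final int MIN_CAPACITY = 2;       // "never less than 2"
    static final int MAX_CAPACITY = 1 << 30; // assumed: largest power-of-two int

    // Provision roughly 2x the expected size, rounded up to a power of two
    // and clamped to [MIN_CAPACITY, MAX_CAPACITY].
    static int expectedSizeToCapacity(int expectedSize) {
        if (expectedSize >= MAX_CAPACITY / 2) {
            return MAX_CAPACITY; // expectedSize * 2 would overflow or exceed the max
        }
        return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2));
    }

    static int roundUpToPowerOfTwo(int i) {
        if (i <= 1) {
            return i <= 0 ? 0 : 1;
        }
        // highestOneBit(i - 1) << 1 is the smallest power of two >= i, for i >= 2
        return Integer.highestOneBit(i - 1) << 1;
    }

    public static void main(String[] args) {
        System.out.println(expectedSizeToCapacity(0)); // → 2
        System.out.println(expectedSizeToCapacity(3)); // → 8
        System.out.println(expectedSizeToCapacity(8)); // → 16
    }
}
```

The `MAX_CAPACITY / 2` guard is the point of the review exchange: without it, doubling a large `expectedSize` could overflow `int` before the rounding step runs.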
[GitHub] [kafka] hachikuji commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API
hachikuji commented on a change in pull request #10275: URL: https://github.com/apache/kafka/pull/10275#discussion_r597064897 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; + +import java.util.Collections; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +public interface AdminApiLookupStrategy<T> { + +/** + * Define the scope of a given key for lookup. Key lookups are complicated + * by the need to accommodate different batching mechanics. For example, + * a `Metadata` request supports arbitrary batching of topic partitions in + * order to discover partition leaders. This can be supported by returning + * a single scope object for all keys. + * + * On the other hand, a `FindCoordinator` request only supports lookup of a + * single key. This can be supported by returning a different scope object + * for each lookup key. + * + * @param key the lookup key + * + * @return request scope indicating how lookup requests can be batched together + */ +RequestScope lookupScope(T key); + +/** + * Build the lookup request for a set of keys. The grouping of the keys is controlled + * through {@link #lookupScope(Object)}. In other words, each set of keys that map + * to the same request scope object will be sent to this method. + * + * @param keys the set of keys that require lookup + * + * @return a builder for the lookup request + */ +AbstractRequest.Builder<?> buildRequest(Set<T> keys); + +/** + * Callback that is invoked when a lookup request returns successfully. The handler + * should parse the response, check for errors, and return a result indicating + * which keys were mapped to a brokerId successfully and which keys received + * a fatal error (e.g. a topic authorization failure). + * + * Note that keys which receive a retriable error should be left out of the + * result. They will be retried automatically. For example, if the response of + * a `FindCoordinator` request indicates an unavailable coordinator, then the key + * should be left out of the result so that the request will be retried. + * + * @param keys the set of keys from the associated request + * @param response the response received from the broker + * + * @return a result indicating which keys mapped successfully to a brokerId and + * which encountered a fatal error + */ +LookupResult<T> handleResponse(Set<T> keys, AbstractResponse response); + +class LookupResult<K> { +public final Map<K, Integer> mappedKeys; +public final Map<K, Throwable> failedKeys; + +public LookupResult( +Map<K, Throwable> failedKeys, +Map<K, Integer> mappedKeys +) { +this.failedKeys = Collections.unmodifiableMap(failedKeys); +this.mappedKeys = Collections.unmodifiableMap(mappedKeys); +} +} + +interface RequestScope { +default OptionalInt destinationBrokerId() { +return OptionalInt.empty(); +} +} Review comment: Let me think about it. I wasn't entirely happy with this location either.
[GitHub] [kafka] guozhangwang merged pull request #10317: KAFKA-10357: Add setup method to internal topics
guozhangwang merged pull request #10317: URL: https://github.com/apache/kafka/pull/10317
[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304285#comment-17304285 ] hudeqi commented on KAFKA-12478: Thank you very much for your reply! I belong to Kuaishou's message-oriented middleware group, which is mainly responsible for secondary development and personalized customization of Kafka. Kafka is widely used throughout the company, and the case described in this issue was recently discovered by a business partner who is very sensitive to data. This shocked us: the company has a large number of topics, add-partition is a relatively high-frequency operation, and a considerable portion of businesses use the "latest" setting. If the consumer client perceives the expansion later than the producer client does, data will definitely be lost. For a storage middleware, losing data is a serious problem. Although this problem can be avoided by configuring "earliest", that is not elegant, and the company uses clients in many other languages, such as rdkafka, Go, Python, etc. We want the fix to be transparent to the client without losing data; also, if the amount of topic data is large, "earliest" may put some pressure on the Kafka servers, so we want to optimize the server logic to solve this case nearly completely. Looking forward to your reply! > Consumer group may lose data for newly expanded partitions when add > partitions for topic if the group is set to consume from the latest > --- > > Key: KAFKA-12478 > URL: https://issues.apache.org/jira/browse/KAFKA-12478 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.7.0 >Reporter: hudeqi >Priority: Blocker > Labels: patch > Original Estimate: 1,158h > Remaining Estimate: 1,158h > > This problem is exposed in our product environment: a topic is used to > produce monitoring data.
*After expanding partitions, the consumer side of > the business reported that the data is lost.* > After preliminary investigation, the lost data is all concentrated in the > newly expanded partitions. The reason is: when the server expands, the > producer firstly perceives the expansion, and some data is written in the > newly expanded partitions. But the consumer group perceives the expansion > later, after the rebalance is completed, the newly expanded partitions will > be consumed from the latest if it is set to consume from the latest. Within a > period of time, the data of the newly expanded partitions is skipped and lost > by the consumer. > If it is not necessarily set to consume from the earliest for a huge data > flow topic when starts up, this will make the group consume historical data > from the broker crazily, which will affect the performance of brokers to a > certain extent. Therefore, *it is necessary to consume these partitions from > the earliest separately.*
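The mitigation the reporter sketches — reset only the newly expanded partitions to the earliest offset — can be expressed as a small piece of pure logic. This is an illustrative client-side sketch, not the server-side fix the reporter proposes; in a real consumer the assigned partitions would arrive via `ConsumerRebalanceListener#onPartitionsAssigned` and the rewind would be a `seekToBeginning` call:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExpandedPartitionReset {
    // Partitions whose index is at or beyond the partition count the group last
    // observed must have been created by a recent expansion. Only those are
    // reset to earliest, so the group neither skips their data (the reported
    // bug) nor re-reads the whole topic (the cost of a blanket "earliest").
    static List<Integer> partitionsToRewind(List<Integer> assignedPartitions,
                                            int lastKnownPartitionCount) {
        List<Integer> rewind = new ArrayList<>();
        for (int partition : assignedPartitions) {
            if (partition >= lastKnownPartitionCount) {
                rewind.add(partition);
            }
        }
        return rewind;
    }

    public static void main(String[] args) {
        // Topic grew from 4 to 6 partitions; this member was assigned 0, 1, 4, 5.
        System.out.println(partitionsToRewind(Arrays.asList(0, 1, 4, 5), 4)); // → [4, 5]
    }
}
```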
[GitHub] [kafka] cmccabe merged pull request #10334: MINOR: Fix BaseHashTable sizing
cmccabe merged pull request #10334: URL: https://github.com/apache/kafka/pull/10334
[jira] [Issue Comment Deleted] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-12478: --- Comment: was deleted (was: z)
[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304286#comment-17304286 ] hudeqi commented on KAFKA-12478: z
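The race described in the issue admits a client-side workaround: on rebalance, seek to the beginning only for assigned partitions that have no committed offset yet (typically the newly created ones), instead of setting auto.offset.reset=earliest for the whole topic. The sketch below is illustrative only, not the server-side fix the reporter proposes; the class and method names are hypothetical, and partitions and offsets are simplified to plain strings and longs so the selection logic is self-contained. In a real ConsumerRebalanceListener, the returned partitions would be the ones passed to consumer.seekToBeginning(...).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: pick out the assigned partitions that have no
// committed offset. These are the partitions a rebalance listener would
// seek to the beginning of, so that freshly added partitions are not read
// from "latest" and their early records silently skipped.
public class NewPartitionSelector {
    public static List<String> partitionsWithoutCommittedOffset(
            Map<String, Optional<Long>> committedOffsets) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Optional<Long>> e : committedOffsets.entrySet()) {
            if (!e.getValue().isPresent()) { // no committed offset -> likely a new partition
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```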
[GitHub] [kafka] dajac commented on a change in pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest
dajac commented on a change in pull request #10351: URL: https://github.com/apache/kafka/pull/10351#discussion_r597073992 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java ## @@ -101,6 +101,12 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { return data.topics().stream().map(topic -> topic.name()).collect(Collectors.toList()); return data.topicNames(); } + +public int numberOfTopics() { Review comment: Should we add a small unit test or extend an existing unit test to verify this across all versions?
[jira] [Comment Edited] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304285#comment-17304285 ] hudeqi edited comment on KAFKA-12478 at 3/18/21, 5:04 PM: -- Thank you very much for your reply! I belong to the Kuaishou message-oriented middleware group, which is mainly responsible for secondary development and customization of Kafka. Kafka is widely used throughout the company, and the case raised in this issue was recently discovered by a business partner who is very sensitive to data. This shocked us: the company has a large number of topics, adding partitions is a relatively high-frequency operation, and a considerable portion of the business uses the latest setting. If the consumer client perceives the expansion later than the producer client, data will definitely be lost. For a storage middleware, losing data is a serious problem. *Although this problem can be avoided by configuring earliest, that is not elegant, and the company uses clients in many other languages, such as rdkafka, Go, Python, etc. We want the fix to be transparent to the client without losing data; moreover, if the amount of topic data is large, "earliest" may also put pressure on the Kafka servers. So we want to optimize the server-side logic to solve this case almost completely.* Looking forward to your reply!
[jira] [Created] (KAFKA-12499) Adjust transaction timeout according to commit interval on Streams EOS
Boyang Chen created KAFKA-12499: --- Summary: Adjust transaction timeout according to commit interval on Streams EOS Key: KAFKA-12499 URL: https://issues.apache.org/jira/browse/KAFKA-12499 Project: Kafka Issue Type: Improvement Components: streams Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 3.0.0 The transaction timeout defaults to 1 minute on the producer today, while the commit interval could be set to a much larger value, which makes the stream repeatedly hit the transaction timeout and drop into rebalance. We should increase the transaction timeout correspondingly when the commit interval is large. On the other hand, the broker may enforce a limit on the maximum transaction timeout that can be set. If we scale the client transaction timeout past that limit, the stream will fail with INVALID_TRANSACTION_TIMEOUT. To alleviate this, users can define their own transaction timeout to avoid hitting the limit, so we should still respect a user-configured override. The new rule for configuring the transaction timeout should be: 1. If the transaction timeout is set in the Streams config, use it. 2. If not, transaction_timeout = max(default_transaction_timeout, 10 * commit_interval). Additionally, if INVALID_TRANSACTION_TIMEOUT is thrown when Streams calls initTransactions(), we should wrap the exception to inform the user that their commit interval setting may be too high and should be adjusted.
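The proposed rule can be sketched as a small helper. This is an illustrative sketch of the rule stated in the ticket, not Kafka code; the class and method names are hypothetical, and in a real implementation the override detection would come from the Streams configuration.

```java
// Sketch of the transaction-timeout rule proposed in KAFKA-12499
// (hypothetical names; not the actual StreamsConfig implementation).
public class TxnTimeoutRule {
    // The producer's default transaction.timeout.ms is 60000 ms (1 minute).
    static final long DEFAULT_TRANSACTION_TIMEOUT_MS = 60_000L;

    // Rule: 1) if the user set transaction.timeout.ms explicitly, respect it;
    //       2) otherwise use max(default, 10 * commit.interval.ms).
    public static long effectiveTransactionTimeoutMs(Long userOverrideMs, long commitIntervalMs) {
        if (userOverrideMs != null) {
            return userOverrideMs;
        }
        return Math.max(DEFAULT_TRANSACTION_TIMEOUT_MS, 10 * commitIntervalMs);
    }
}
```

For example, a 30-second commit interval would scale the timeout to 5 minutes, while a small commit interval leaves the 1-minute default in place.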
[GitHub] [kafka] hachikuji commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500
hachikuji commented on a change in pull request #10227: URL: https://github.com/apache/kafka/pull/10227#discussion_r597080557 ## File path: KIP-500.md ## @@ -0,0 +1,157 @@ +KIP-500 Early Access Release + + +# Introduction +It is now possible to run Apache Kafka without Apache ZooKeeper! We call this mode [self-managed mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum). It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it is available for testing in the Kafka 2.8 release. + +When the Kafka cluster is in self-managed mode, it does not store its metadata in ZooKeeper. In fact, you do not have to run ZooKeeper at all, because it stores its metadata in a Raft quorum of controller nodes. + +Self-managed mode has many benefits -- some obvious, and some not so obvious. Clearly, it is nice to manage and configure one service rather than two services. In addition, you can now run a single process Kafka cluster. Most important of all, self-managed mode is more scalable. We expect to be able to [support many more topics and partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/) in this mode. + +# Quickstart + +## Warning +Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.0 is released, it may not even be possible to upgrade your self-managed clusters from 2.8 to 3.0. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the early access release of KIP-500. + +## Generate a cluster ID +The first step is to generate an ID for your new cluster, using the kafka-storage tool: + + +$ ./bin/kafka-storage.sh random-uuid +xtzWWN4bTjitpL3kfd9s5g + + +## Format Storage Directories +The next step is to format your storage directories. 
If you are running in single-node mode, you can do this with one command: + + +$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c ./config/nozk-combined.properties +Formatting /tmp/nozk-combined-logs + + +If you are using multiple nodes, then you should run the format command on each node. Be sure to use the same cluster ID for each one. + +## Start the Kafka Server +Finally, you are ready to start the Kafka server on each node. + + +$ ./bin/kafka-server-start.sh ./config/nozk-combined.properties +[2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) +[2021-02-26 15:37:11,294] INFO Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation (org.apache.zookeeper.common.X509Util) +[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, dir=/tmp/nozk-combined-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log) +[2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper) +[2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) (org.apache.kafka.raft.QuorumState) +... + + +Just like with a ZooKeeper based broker, you can connect to port 9092 (or whatever port you configured) to perform administrative operations or produce or consume data. + + +$ ./bin/kafka-topics.sh --create --topic foo --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092 +Created topic foo. + + +# Deployment + +## Controller Servers +Unlike in ZooKeeper-based mode, where any server can become the controller, in self-managed mode, only a small group of specially selected servers can act as controllers. The specially selected controller servers will participate in the metadata quorum. 
Each KIP-500 controller server is either active, or a hot standby for the current active controller server. + +Typically you will select either 3 or 5 servers for this role, depending on the size of your cluster. Just like with ZooKeeper, you must keep a majority of the controllers alive in order to maintain availability. So if you have 3 controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 failures. + +## Process Roles +Each Kafka server now has a new configuration key called `process.roles` which can have the following values: + +* If `process.roles` is set to `broker`, the server acts as a self-managed broker. +* If `process.roles` is set to `controller`, the server acts as a self-managed controller. +* If `process.roles` is set to `broker,controller`, the server acts as both a self-managed broker and a self-managed controller. +* If `process.roles` is not set at all then we are assumed to be in ZooKeeper mode. As mentioned earlier
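The `process.roles` values quoted above can be illustrated with a minimal server configuration. This is a sketch, not a complete working config: the key names follow the KRaft example property files, the values are illustrative, and the exact set of keys required by the 2.8 early-access release may differ.

```properties
# Sketch: a combined-mode (broker + controller) KIP-500 server.
# Illustrative values only.
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@localhost:9093
listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
controller.listener.names=CONTROLLER
```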
[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500
cmccabe commented on a change in pull request #10227: URL: https://github.com/apache/kafka/pull/10227#discussion_r597083837 ## File path: KIP-500.md
[jira] [Commented] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
[ https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304295#comment-17304295 ] Guozhang Wang commented on KAFKA-12472: --- I just added a few more states for Streams, regarding various TaskMigrated error cases. > Add a Consumer / Streams metric to indicate the current rebalance status > > > Key: KAFKA-12472 > URL: https://issues.apache.org/jira/browse/KAFKA-12472 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip > > Today, to troubleshoot a rebalance issue, operators need to perform many manual steps: locating the problematic members, searching the log entries, and looking for related metrics. It would be great to add a single metric that covers all these manual steps, so operators would only need to check this one signal to find the root cause. A concrete idea is to expose two enum gauge metrics, on the consumer and on streams, respectively: > * Consumer level (the order below is by design; see the Streams level for details): > 0. *None* => there is no rebalance ongoing. > 1. *CoordinatorRequested* => any coordinator response contains a RebalanceInProgress error code. > 2. *NewMember* => the join group response has a MemberIdRequired error code. > 3. *UnknownMember* => any coordinator response contains an UnknownMember error code, indicating this member has already been kicked out of the group. > 4. *StaleMember* => any coordinator response contains an IllegalGeneration error code. > 5. *DroppedGroup* => the heartbeat thread decides to call leaveGroup because the heartbeat expired. > 6. *UserRequested* => leaveGroup upon the shutdown / unsubscribeAll APIs, or upon calling the enforceRebalance API. > 7. *MetadataChanged* => requestRejoin triggered because metadata has changed. > 8. *SubscriptionChanged* => requestRejoin triggered because the subscription has changed. > 9. *RetryOnError* => the join/syncGroup response contains a retriable error which causes the consumer to back off and retry. > 10. *RevocationNeeded* => requestRejoin triggered because the set of revoked partitions is not empty. > The transition rule is that a non-zero status code can only transition to zero or to a higher code, not to a lower code (the same holds for streams; see the rationale below). > * Streams level: today a streams client can have multiple consumers. We introduce some new enum states as well as aggregation rules across consumers: if there are no streams-layer events (below) that transition its status (i.e. the streams layer thinks it is 0), then we aggregate across all the embedded consumers and take the largest status code value as the streams metric; if there are streams-layer events that determine its status should be 10+, it ignores all embedded consumer-layer status codes, since the streams layer takes higher precedence. In addition, when creating an aggregated metric across streams instances (a.k.a. at the app level, which is usually what we care about and alert on), we follow the same aggregation rule; e.g., if there are two streams instances where one instance's status code is 1 and the other's is 10, then the app's status is 10. > 10. *RevocationNeeded* => the definition of this is changed to the original 10) defined in the consumer above, OR the leader decides to revoke either active or standby tasks and hence schedules follow-ups. > 11. *AssignmentProbing* => the leader schedules follow-ups because the current assignment is unstable. > 12. *VersionProbing* => the leader schedules follow-ups due to version probing. > 13. *EndpointUpdate* => anyone schedules follow-ups due to endpoint updates. > The main motivations for the proposed precedence order are the following: > 1. When a rebalance is triggered by one member, all other members only know it is due to CoordinatorRequested from coordinator error codes, and hence CoordinatorRequested should be overridden by any other status when aggregating across clients. > 2. DroppedGroup can cause unknown/stale members that fail and retry immediately, and hence should take higher precedence. > 3. The revocation definition is extended in Streams, and hence it needs to take the highest precedence among all consumer-only statuses so that it is not overridden by any of them. > 4. In general, rarer events get higher precedence. > This is proposed on top of KAFKA-12352. Any comments on the precedence rules / categorization are more than welcome!
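The aggregation and transition rules described in the ticket (take the largest status code across members; a non-zero code may only move to zero or to a higher code) can be sketched as follows. The codes mirror the consumer-level states proposed above, but the class is an illustrative sketch, not Kafka code.

```java
import java.util.Collections;
import java.util.List;

// Sketch of the proposed consumer-level rebalance status codes and the
// aggregation/transition rules from KAFKA-12472 (illustrative, not Kafka code).
public class RebalanceStatus {
    public static final int NONE = 0;
    public static final int COORDINATOR_REQUESTED = 1;
    public static final int NEW_MEMBER = 2;
    public static final int UNKNOWN_MEMBER = 3;
    public static final int STALE_MEMBER = 4;
    public static final int DROPPED_GROUP = 5;
    public static final int USER_REQUESTED = 6;
    public static final int METADATA_CHANGED = 7;
    public static final int SUBSCRIPTION_CHANGED = 8;
    public static final int RETRY_ON_ERROR = 9;
    public static final int REVOCATION_NEEDED = 10;

    // App-level (or streams-level) aggregation: the largest code wins.
    public static int aggregate(List<Integer> memberCodes) {
        return memberCodes.isEmpty() ? NONE : Collections.max(memberCodes);
    }

    // Transition rule: a non-zero code may only move to NONE or to a
    // higher (or equal) code, never down to a lower non-zero code.
    public static boolean isValidTransition(int from, int to) {
        return from == NONE || to == NONE || to >= from;
    }
}
```

For example, an instance at CoordinatorRequested (1) aggregated with one at RevocationNeeded (10) reports 10 at the app level, matching the example in the ticket.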
[GitHub] [kafka] jolshan commented on a change in pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest
jolshan commented on a change in pull request #10351: URL: https://github.com/apache/kafka/pull/10351#discussion_r597088531 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java ## @@ -101,6 +101,12 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { return data.topics().stream().map(topic -> topic.name()).collect(Collectors.toList()); return data.topicNames(); } + +public int numberOfTopics() { Review comment: Can do!
[GitHub] [kafka] cmccabe merged pull request #10227: KAFKA-12382: add a README for KIP-500
cmccabe merged pull request #10227: URL: https://github.com/apache/kafka/pull/10227
[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
[ https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12472: -- Description: Today to trouble shoot a rebalance issue operators need to do a lot of manual steps: locating the problematic members, search in the log entries, and look for related metrics. It would be great to add a single metric that covers all these manual steps and operators would only need to check this single signal to check what is the root cause. A concrete idea is to expose two enum gauge metrics on consumer and streams, respectively: * Consumer level (the order below is by-design, see Streams level for details): 0. *None* => there is no rebalance on going. 1. *CoordinatorRequested* => any of the coordinator response contains a RebalanceInProgress error code. 2. *NewMember* => when the join group response has a MemberIdRequired error code. 3. *UnknownMember* => when any of the coordinator response contains an UnknownMember error code, indicating this member is already kicked out of the group. 4. *StaleMember* => when any of the coordinator response contains an IllegalGeneration error code. 5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb expired. 6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll API, as well as upon calling the enforceRebalance API. 7. *MetadataChanged* => requestRejoin triggered since metadata has changed. 8. *SubscriptionChanged* => requestRejoin triggered since subscription has changed. 9. *RetryOnError* => when join/syncGroup response contains a retriable error which would cause the consumer to backoff and retry. 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions is not empty. The transition rule is that a non-zero status code can only transit to zero or to a higher code, but not to a lower code (same for streams, see rationales below). * Streams level: today a streams client can have multiple consumers. 
We introduced some new enum states as well as aggregation rules across consumers: if there's no streams-layer events as below that transits its status (i.e. streams layer think it is 0), then we aggregate across all the embedded consumers and take the largest status code value as the streams metric; if there are streams-layer events that determines its status should be in 10+, then it ignores all embedded consumer layer status code since it should has a higher precedence. In addition, when create aggregated metric across streams instance (a.k.a at the app-level, which is usually what we would care and alert on), we also follow the same aggregation rule, e.g. if there are two streams instance where one instance's status code is 1), and the other is 10), then the app's status is 10). 10. *RevocationNeeded* => the definition of this is changed to the original 10) defined in consumer above, OR leader decides to revoke either active/standby tasks and hence schedule follow-ups. 11. *AssignmentProbing* => leader decides to schedule follow-ups since the current assignment is unstable. 12. *VersionProbing* => leader decides to schedule follow-ups due to version probing. 13. *EndpointUpdate* => anyone decides to schedule follow-ups due to endpoint updates. 14. *EOSViolated* => when OutOfOrderSequenceException is thrown, causing TaskMigratedException 15. *EOSProducerFenced* => when ProducerFencedException / InvalidProducerEpochException / UnknownProducerIdException are thrown, causing TaskMigratedException 15. *ConsumerDropped* => when CommitFailedException are thrown, causing TaskMigratedException The main motivations of the above proposed precedence order are the following: 1. When a rebalance is triggered by one member, all other members would only know it is due to CoordinatorRequested from coordinator error codes, and hence CoordinatorRequested should be overridden by any other status when aggregating across clients. 2. 
DroppedGroup could cause unknown/stale members that would fail and retry immediately, and hence should take higher precedence. 3. Revocation definition is extended in Streams, and hence it needs to take the highest precedence among all consumer-only status so that it would not be overridden by any of the consumer-only status. 4. In general, more rare events get higher precedence. This is proposed on top of KAFKA-12352. Any comments on the precedence rules / categorization are more than welcomed! was: Today to trouble shoot a rebalance issue operators need to do a lot of manual steps: locating the problematic members, search in the log entries, and look for related metrics. It would be great to add a single metric that covers all these manual steps and operators would only need to check this single signal to check what is the root cause. A concrete idea is to expose two enum gauge metrics on consumer and streams, r
[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status
[ https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-12472: -- Description: Today to trouble shoot a rebalance issue operators need to do a lot of manual steps: locating the problematic members, search in the log entries, and look for related metrics. It would be great to add a single metric that covers all these manual steps and operators would only need to check this single signal to check what is the root cause. A concrete idea is to expose two enum gauge metrics on consumer and streams, respectively: * Consumer level (the order below is by-design, see Streams level for details): 0. *None* => there is no rebalance on going. 1. *CoordinatorRequested* => any of the coordinator response contains a RebalanceInProgress error code. 2. *NewMember* => when the join group response has a MemberIdRequired error code. 3. *UnknownMember* => when any of the coordinator response contains an UnknownMember error code, indicating this member is already kicked out of the group. 4. *StaleMember* => when any of the coordinator response contains an IllegalGeneration error code. 5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb expired. 6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll API, as well as upon calling the enforceRebalance API. 7. *MetadataChanged* => requestRejoin triggered since metadata has changed. 8. *SubscriptionChanged* => requestRejoin triggered since subscription has changed. 9. *RetryOnError* => when join/syncGroup response contains a retriable error which would cause the consumer to backoff and retry. 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions is not empty. The transition rule is that a non-zero status code can only transit to zero or to a higher code, but not to a lower code (same for streams, see rationales below). * Streams level: today a streams client can have multiple consumers. 
We introduced some new enum states as well as aggregation rules across consumers: if there's no streams-layer events as below that transits its status (i.e. streams layer think it is 0), then we aggregate across all the embedded consumers and take the largest status code value as the streams metric; if there are streams-layer events that determines its status should be in 10+, then it ignores all embedded consumer layer status code since it should has a higher precedence. In addition, when create aggregated metric across streams instance (a.k.a at the app-level, which is usually what we would care and alert on), we also follow the same aggregation rule, e.g. if there are two streams instance where one instance's status code is 1), and the other is 10), then the app's status is 10). 10. *RevocationNeeded* => the definition of this is changed to the original 10) defined in consumer above, OR leader decides to revoke either active/standby tasks and hence schedule follow-ups. 11. *AssignmentProbing* => leader decides to schedule follow-ups since the current assignment is unstable. 12. *VersionProbing* => leader decides to schedule follow-ups due to version probing. 13. *EndpointUpdate* => anyone decides to schedule follow-ups due to endpoint updates. 14. *EOSViolated* => when OutOfOrderSequenceException is thrown, causing TaskMigratedException 15. *EOSProducerFenced* => when ProducerFencedException / InvalidProducerEpochException / UnknownProducerIdException are thrown, causing TaskMigratedException 16. *ConsumerDropped* => when CommitFailedException are thrown, causing TaskMigratedException The main motivations of the above proposed precedence order are the following: 1. When a rebalance is triggered by one member, all other members would only know it is due to CoordinatorRequested from coordinator error codes, and hence CoordinatorRequested should be overridden by any other status when aggregating across clients. 2. 
DroppedGroup could cause unknown/stale members that would fail and retry immediately, and hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take the highest precedence among all consumer-only statuses so that it is not overridden by any of them.
4. In general, rarer events get higher precedence.
This is proposed on top of KAFKA-12352. Any comments on the precedence rules / categorization are more than welcome!

was: Today to trouble shoot a rebalance issue operators need to do a lot of manual steps: locating the problematic members, search in the log entries, and look for related metrics. It would be great to add a single metric that covers all these manual steps and operators would only need to check this single signal to check what is the root cause. A concrete idea is to expose two enum gauge metrics on consumer and streams, r
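The aggregation rule described above can be sketched in a few lines of code. This is an illustrative sketch only, with hypothetical class and method names, not the actual metric implementation: a streams-layer status in the 10+ range wins outright; otherwise the largest status code across the embedded consumers (and, at the app level, across instances) is reported.

```java
import java.util.Arrays;

// Illustrative sketch of the proposed status-code aggregation rule;
// the class and method names are hypothetical, not Kafka code.
public class RebalanceStatusAggregation {

    // Per streams client: a streams-layer status in the 10+ range takes
    // precedence over the embedded consumers; otherwise take the largest
    // consumer status code.
    static int aggregateClient(int streamsLayerStatus, int[] consumerStatuses) {
        if (streamsLayerStatus >= 10) {
            return streamsLayerStatus;
        }
        return Arrays.stream(consumerStatuses).max().orElse(0);
    }

    // App level across streams instances: same rule, the largest code wins.
    static int aggregateApp(int[] instanceStatuses) {
        return Arrays.stream(instanceStatuses).max().orElse(0);
    }

    public static void main(String[] args) {
        // Streams layer idle (0): the largest consumer code, 5 (DroppedGroup), wins.
        System.out.println(aggregateClient(0, new int[]{1, 5}));
        // Streams layer at 11 (AssignmentProbing): overrides the consumer codes.
        System.out.println(aggregateClient(11, new int[]{1, 5}));
        // Two instances with codes 1 and 10: the app-level status is 10.
        System.out.println(aggregateApp(new int[]{1, 10}));
    }
}
```

Under this rule the numeric ordering of the enum codes carries the precedence semantics, which is why the description insists that rarer events get higher codes.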
[jira] [Resolved] (KAFKA-12376) Use scheduleAtomicAppend for records that need to be atomic
[ https://issues.apache.org/jira/browse/KAFKA-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12376. -- Fix Version/s: 2.8.0 Resolution: Fixed > Use scheduleAtomicAppend for records that need to be atomic > --- > > Key: KAFKA-12376 > URL: https://issues.apache.org/jira/browse/KAFKA-12376 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Blocker > Labels: kip-500 > Fix For: 2.8.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12383) Get RaftClusterTest.java and other KIP-500 junit tests working
[ https://issues.apache.org/jira/browse/KAFKA-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12383: - Affects Version/s: (was: 2.8.0) 3.0.0 2.9 > Get RaftClusterTest.java and other KIP-500 junit tests working > -- > > Key: KAFKA-12383 > URL: https://issues.apache.org/jira/browse/KAFKA-12383 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0, 2.9 >Reporter: Colin McCabe >Assignee: David Arthur >Priority: Blocker > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12382) Create KIP-500 README for the 2.8 release
[ https://issues.apache.org/jira/browse/KAFKA-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12382. -- Fix Version/s: 2.8.0 Resolution: Fixed > Create KIP-500 README for the 2.8 release > - > > Key: KAFKA-12382 > URL: https://issues.apache.org/jira/browse/KAFKA-12382 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Blocker > Fix For: 2.8.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding
hachikuji commented on a change in pull request #10340: URL: https://github.com/apache/kafka/pull/10340#discussion_r597106362

## File path: core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
## @@ -319,12 +325,40 @@ class BrokerToControllerRequestThreadTest {
     val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
+    testRequestThread.started = true
     testRequestThread.enqueue(queueItem)
     pollUntil(testRequestThread, () => callbackResponse.get != null)
     assertNotNull(callbackResponse.get.authenticationException)
   }
+
+  @Test
+  def testThreadNotStarted(): Unit = {
+    // Make sure we throw if we enqueue anything while the thread is not running
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+
+    val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))

Review comment: I don't think we need this either since the request never gets sent.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
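The behavior exercised by testThreadNotStarted, rejecting enqueues while the sender thread is not running, can be sketched with a minimal guard. The class below is a hypothetical stand-in for illustration, not the actual BrokerToControllerRequestThread:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the started-flag guard that the test exercises;
// this is not the actual Kafka implementation.
public class RequestQueueSketch<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    public volatile boolean started = false;

    public void enqueue(T item) {
        // Reject requests until the sender thread has been started, mirroring
        // the "throw if we enqueue anything while the thread is not running" check.
        if (!started) {
            throw new IllegalStateException("Cannot enqueue a request while the thread is not running");
        }
        queue.add(item);
    }

    public int size() {
        return queue.size();
    }
}
```

Setting `started = true` before calling `enqueue`, as the diff does in the existing test, is what keeps the other test cases passing once the guard exists.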
[GitHub] [kafka] kamalcph commented on pull request #10202: KAFKA-12392: Deprecate the batch-size option in console producer
kamalcph commented on pull request #10202: URL: https://github.com/apache/kafka/pull/10202#issuecomment-802157373 Can we deprecate this config in 2.8 if allowed and remove it in the 3.0 release? WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12498) Consumer Group Lag is zero when MirrorMaker 2.0 is running
[ https://issues.apache.org/jira/browse/KAFKA-12498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Olumide Ajiboye updated KAFKA-12498: Summary: Consumer Group Lag is zero when MirrorMaker 2.0 is running (was: Consumer Group is zero when MirrorMaker 2.0 is running)

> Consumer Group Lag is zero when MirrorMaker 2.0 is running
> --
>
> Key: KAFKA-12498
> URL: https://issues.apache.org/jira/browse/KAFKA-12498
> Project: Kafka
> Issue Type: Bug
> Components: consumer, replication
> Affects Versions: 2.7.0
> Environment: Kubernetes 1.19.7
> Strimzi 0.21.1
> Reporter: Olumide Ajiboye
> Priority: Major
>
> I have two Kafka clusters in Active/Passive replication using MM 2.0. When I produce messages to a topic and try to read from it in the same cluster, the consumer group lag is already set to zero. The replica topic also has the same lag and log-end-offset. My MM2 is using a superuser account and I am using a separate consumer group with permission just to read and write to this topic.
> Just to elaborate further,
> * Write 10 messages to a topic without any active consumers but with MM2 replicating the topic to the passive cluster.
> * Attempt to read 3 messages from the topic; this creates my consumer group and adds an active consumer. The result is that no messages are read.
> * Describe the consumer group; the result shows Log-End-Offset with the correct number of messages, but Lag shows 0.
> * Attempt to read 3 messages from the passive cluster using the same consumer group. Result: no messages consumed, Lag shows 0, Log-End-Offset shows the correct number of messages (i.e. the same as the active cluster).
> * Keep the consumer running.
> * Write a few more messages.
> * The consumer is now reading the latest messages.
> * Stop the consumer.
> * Keep writing new messages.
> * Lag shows the correct value.
> In the absence of MM2, Kafka operation is as expected.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12498) Consumer Group Lag is zero when MirrorMaker 2.0 is running
[ https://issues.apache.org/jira/browse/KAFKA-12498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Olumide Ajiboye updated KAFKA-12498:
Description:
I have two Kafka clusters in Active/Passive replication using MM 2.0. When I produce messages to a topic and try to read from it in the same cluster, the consumer group lag is already set to zero. The replica topic also has the same lag and log-end-offset. MM2 is using a superuser client, and I am using a separate consumer group with separate credentials and with permission just to read and write to this topic.
Steps to reproduce:
* Set up a cluster and a DR cluster with MM2 in uni-directional replication.
* Create a topic.
* Write 10 messages to the topic without any active consumers but with MM2 replicating the topic to the passive cluster.
* Attempt to read 3 messages from the topic; this creates my consumer group and adds an active consumer. The result is that no messages are read.
* Describe the consumer group; the result shows Log-End-Offset with the correct number of messages, but Lag shows 0.
* Attempt to read 3 messages from the passive cluster using the same consumer group. Result: no messages consumed, Lag shows 0, Log-End-Offset shows the correct number of messages (i.e. the same as the active cluster).
* Keep the consumer running.
* Write a few more messages.
* The consumer is now reading the latest messages.
* Stop the consumer.
* Keep writing new messages.
* Lag shows the correct value.
In the absence of MM2, Kafka operation is as expected.

was: I have two Kafka clusters in Active\Passive replication using MM 2.0. When I produce messages to a topic and try to read from it in the same cluster, the consumer group lag is already set to zero. The replica topic also has the same lag and log-end-offset. My MM2 is using a superuser account and I am using a separate consumer-group with permission just to read and write to this topic.
Just to elaborate further,
* Write 10 messages to a topic without any active consumers but with MM2 replicating the topic to a passive Cluster.
* Attempt to read 3 messages from the topic, this creates my consumer group and adds an active consumer. The result is no messages are read
* Describe the consumer group, the result shows Log-End-Offset with correct number of messages, but Lag shows 0
* Attempt to read 3 messages from passive cluster using same consumer-group. Result: no messages consumed, Lag shows 0, Log-End-Offset shows correct number of messages (i.e. same as active cluster)
* Keep consumer running
* Write a few more messages.
* Consumer is now reading latest messages
* Stop consumer
* Keep writing new messages
* Lag shows correct value.
In the absence of MM2, Kafka operation is as expected.

> Consumer Group Lag is zero when MirrorMaker 2.0 is running
> --
>
> Key: KAFKA-12498
> URL: https://issues.apache.org/jira/browse/KAFKA-12498
> Project: Kafka
> Issue Type: Bug
> Components: consumer, replication
> Affects Versions: 2.7.0
> Environment: Kubernetes 1.19.7
> Strimzi 0.21.1
> Reporter: Olumide Ajiboye
> Priority: Major
>
> I have two Kafka clusters in Active/Passive replication using MM 2.0. When I produce messages to a topic and try to read from it in the same cluster, the consumer group lag is already set to zero. The replica topic also has the same lag and log-end-offset. MM2 is using a superuser client and I am using a separate consumer group with separate credentials and with permission just to read and write to this topic.
> Steps to reproduce:
> * Setup cluster and DR cluster with MM2 in uni-directional replication.
> * Create a topic.
> * Write 10 messages to the topic without any active consumers but with MM2 replicating the topic to the passive cluster.
> * Attempt to read 3 messages from the topic; this creates my consumer group and adds an active consumer.
> The result is no messages are read.
> * Describe the consumer group; the result shows Log-End-Offset with correct number of messages, but Lag shows 0.
> * Attempt to read 3 messages from the passive cluster using the same consumer group. Result: no messages consumed, Lag shows 0, Log-End-Offset shows correct number of messages (i.e. same as active cluster).
> * Keep the consumer running.
> * Write a few more messages.
> * The consumer is now reading the latest messages.
> * Stop the consumer.
> * Keep writing new messages.
> * Lag shows the correct value.
> In the absence of MM2, Kafka operation is as expected.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
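For context on the symptom above: the Lag column in a consumer-group describe is simply log-end-offset minus the group's committed offset per partition, so Lag 0 alongside a correct Log-End-Offset and no messages consumed implies the committed offset already equals the log end. A minimal sketch of that arithmetic, with illustrative numbers:

```java
// Illustrative sketch of how consumer-group lag is computed per partition;
// the numbers below mirror the report's scenario and are not real output.
public class LagSketch {

    // Lag is the distance between the log end and the group's committed offset.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Expected after writing 10 messages with nothing consumed: lag 10.
        System.out.println(lag(10, 0));
        // Observed symptom: Lag 0 despite Log-End-Offset 10, which can only
        // happen if the committed offset already equals the log end.
        System.out.println(lag(10, 10));
    }
}
```

In other words, the report suggests something is advancing the group's committed offsets to the log end while MM2 is running, rather than the lag computation itself being wrong.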
[GitHub] [kafka] mumrah merged pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding
mumrah merged pull request #10340: URL: https://github.com/apache/kafka/pull/10340 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #10344: MINOR: Remove use of NoSuchElementException
hachikuji merged pull request #10344: URL: https://github.com/apache/kafka/pull/10344 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304330#comment-17304330 ] Randall Hauch commented on KAFKA-3813: -- [KIP-131|https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector] was implemented with KAFKA-4794. > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reopened KAFKA-3813: -- > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-3813. -- Resolution: Duplicate > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah opened a new pull request #10353: DO NOT MERGE: Add a failing unit test to check Gradle and Github behavior
mumrah opened a new pull request #10353: URL: https://github.com/apache/kafka/pull/10353 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
rohitrmd commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r597144327

## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
## @@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }

+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment: @hachikuji can you explain what is meant by using snapshotId? You mean changing `assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())` to `assertEquals(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch), resultOffsetAndEpoch.offsetAndEpoch())` and not use variables?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
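The two cases the new tests cover can be summarized as a small decision routine: a fetch epoch greater than the last known epoch yields DIVERGING, while one older than the oldest snapshot's epoch yields SNAPSHOT. The sketch below is reconstructed from the tests for illustration only and is not the actual ReplicatedLog implementation:

```java
// Illustrative reconstruction of the two validateOffsetAndEpoch cases the
// tests above exercise; not the actual Kafka ReplicatedLog code.
public class ValidateSketch {
    enum Kind { VALID, DIVERGING, SNAPSHOT }

    static Kind validate(int fetchEpoch, int lastKnownEpoch, int oldestSnapshotEpoch) {
        if (fetchEpoch > lastKnownEpoch) {
            // The fetcher claims an epoch newer than anything in this log:
            // its log has diverged from ours.
            return Kind.DIVERGING;
        }
        if (fetchEpoch < oldestSnapshotEpoch) {
            // The fetch position predates our oldest snapshot: the fetcher
            // must fetch the snapshot instead.
            return Kind.SNAPSHOT;
        }
        return Kind.VALID;
    }
}
```

In the SNAPSHOT case the tests assert that the returned OffsetAndEpoch matches the snapshot boundary, which is what the review discussion about using `snapshotId` directly in the assertion is getting at.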
[GitHub] [kafka] jolshan opened a new pull request #10354: MINOR: Exclude KIP-500.md from rat check
jolshan opened a new pull request #10354: URL: https://github.com/apache/kafka/pull/10354

Builds are failing since this file does not have a license. Similar .md files do not seem to have licenses, so I've added this file to the exclude list.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10349: MINOR: Move `configurations.all` to be a child of `allprojects`
ijuma merged pull request #10349: URL: https://github.com/apache/kafka/pull/10349 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
hachikuji commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802206799 Thanks for fixing this so quickly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org