[jira] [Created] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)
Luke Chen created KAFKA-12495:
-

 Summary: Unbalanced connectors/tasks distribution will happen in 
Connect's incremental cooperative assignor
 Key: KAFKA-12495
 URL: https://issues.apache.org/jira/browse/KAFKA-12495
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen
Assignee: Luke Chen
 Attachments: image-2021-03-18-15-04-57-854.png, 
image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png

In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens (we'll 
use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2021-03-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303902#comment-17303902
 ] 

Luke Chen commented on KAFKA-12283:
---

The root cause of this flaky test is described in KAFKA-12495. I'll apply a 
workaround to this flaky test first to make it reliable, and then refactor and 
add more tests under KAFKA-12495. Thanks.

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Chia-Ping Tsai
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12495:
--
Description: 
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens (we'll 
use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.

  was:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens (we'll 
use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on 
> [KIP-415|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]].
>  However, we have a bad assumption in the algorithm implementation, which is: 
> after revoking rebalance completed, the member(worker) count will be the same 
> as the previous round of rebla

[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12495:
--
Description: 
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens in this 
example (we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.

  was:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens (we'll 
use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15

[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12495:
--
Description: 
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens (we'll 
use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.

  was:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
However, the implementation makes a bad assumption: after a revoking rebalance 
completes, the member (worker) count will be the same as in the previous round 
of rebalance.

 

Let's take a look at the example in the KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well in most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's see what happens (we'll 
use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT4, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new member, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after revocation.


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incrementa

[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread rameshkrishnan muthusamy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303903#comment-17303903
 ] 

rameshkrishnan muthusamy commented on KAFKA-12495:
--

[~showuon] this is being addressed in 
https://issues.apache.org/jira/browse/KAFKA-10413 , let me know if you still 
see any issue with this patch as well

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
> based on 
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
> However, the implementation makes a bad assumption: after a revoking rebalance 
> completes, the member (worker) count will be the same as in the previous round 
> of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well in most cases. But what if W3 leaves after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens in this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> // W3 is down
> W3 doesn't join
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> // which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
> {code}
> We cannot assume the member count stays the same right after revocation.
>  
> Note: The consumer's cooperative sticky assignor won't have this issue since 
> we re-compute the assignment in each round.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303906#comment-17303906
 ] 

Luke Chen commented on KAFKA-12495:
---

[~ramkrish1489], thanks for your comments. However, this issue is still present 
after that patch. As I described, the algorithm doesn't consider that the member 
count may differ from the previous revocation round, so it is not related to the 
issue in KAFKA-10413.

Put another way: you found 2 issues in KAFKA-10413, and I found one more in this 
ticket, and all of them cause uneven distribution.

Please help review my PR once I've completed it. Thank you.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
> based on 
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
> However, the implementation makes a bad assumption: after a revoking rebalance 
> completes, the member (worker) count will be the same as in the previous round 
> of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well in most cases. But what if W3 leaves after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens in this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> // W3 is down
> W3 doesn't join
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> // which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
> {code}
> We cannot assume the member count stays the same right after revocation.
>  
> Note: The consumer's cooperative sticky assignor won't have this issue since 
> we re-compute the assignment in each round.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303906#comment-17303906
 ] 

Luke Chen edited comment on KAFKA-12495 at 3/18/21, 7:36 AM:
-

[~ramkrish1489], thanks for your comments. However, this issue is still present 
after that patch. As I described, the algorithm doesn't consider that the member 
count may differ from the previous revocation round: we simply distribute the 
revoked connectors/tasks evenly across the new members, without taking the 
current member count into account. It is not related to the issue in KAFKA-10413.

Put another way: you found 2 issues in KAFKA-10413, and I found one more in this 
ticket, and all of them cause uneven distribution.

Please help review my PR once I've completed it. Thank you.


was (Author: showuon):
[~ramkrish1489], thanks for your comments. However, this issue is still present 
after that patch. As I described, the algorithm doesn't consider that the member 
count may differ from the previous revocation round, so it is not related to the 
issue in KAFKA-10413.

Put another way: you found 2 issues in KAFKA-10413, and I found one more in this 
ticket, and all of them cause uneven distribution.

Please help review my PR once I've completed it. Thank you.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
> based on 
> [KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
> However, the implementation makes a bad assumption: after a revoking rebalance 
> completes, the member (worker) count will be the same as in the previous round 
> of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well in most cases. But what if W3 leaves after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens in this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> // W3 is down
> W3 doesn't join
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> // which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: [])
> {code}
> We cannot assume the member count stays the same right after revocation.
>  
> Note: The consumer's cooperative sticky assignor won't have this issue since 
> we re-compute the assignment in each round.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10304: KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-18 Thread GitBox


chia7712 commented on pull request #10304:
URL: https://github.com/apache/kafka/pull/10304#issuecomment-801702252


   > LGTM. @chia7712 If you are fine with the PR, I let you merge it.
   
   sure. will merge it after I take a final review :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #10304: KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-18 Thread GitBox


chia7712 merged pull request #10304:
URL: https://github.com/apache/kafka/pull/10304


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12454) Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster

2021-03-18 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12454.

Resolution: Fixed

> Add ERROR logging on kafka-log-dirs when given brokerIds do not  exist in 
> current kafka cluster
> ---
>
> Key: KAFKA-12454
> URL: https://issues.apache.org/jira/browse/KAFKA-12454
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Minor
> Fix For: 3.0.0
>
>
> When non-existent brokerId values are given, the kafka-log-dirs tool fails 
> with a timeout error:
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment. Call: describeLogDirs
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>  at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>  at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50)
>  at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36)
>  at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment. Call: describeLogDirs
>  
> When the brokerId entered by the user does not exist, an error message 
> indicating that the node is not present should be printed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] unlizhao opened a new pull request #10348: KafkaAdminClient check null for group.groupState()

2021-03-18 Thread GitBox


unlizhao opened a new pull request #10348:
URL: https://github.com/apache/kafka/pull/10348


   1. When `group.groupState()` is null, the original code throws a 
java.lang.NullPointerException, so `group.groupState()` is now null-checked.
   
   2. The caller of the equals method is flipped so the constant string is the receiver.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #10348: MINOR: KafkaAdminClient check null for group.groupState()

2021-03-18 Thread GitBox


dajac commented on pull request #10348:
URL: https://github.com/apache/kafka/pull/10348#issuecomment-801792638


   @unlizhao Thanks for the PR. I wonder if `group.groupState()` could really 
be `null`, as it seems to be an empty string by default. Have you encountered a 
case where it happened? If so, it might be worth filing a bug for it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup

2021-03-18 Thread Andrey (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey updated KAFKA-12390:
---
Attachment: (was: negative lag kafka.png)

> error when storing group assignment during SyncGroup 
> -
>
> Key: KAFKA-12390
> URL: https://issues.apache.org/jira/browse/KAFKA-12390
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 2.4.0
>Reporter: Andrey
>Priority: Major
>
> There is a cluster consisting of 3 nodes running Apache Kafka 2.4.0.
> The following messages were observed in the logs of one of the nodes:
> Preparing to rebalance group corecft-adapter2 in state PreparingRebalance 
> with old generation 5043 (__consumer_offsets-22) (reason: error when storing 
> group assignment during SyncGroup (member: 
> consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) 
> (kafka.coordinator.group.GroupCoordinator)
> Messages from the topic were read, but no commit occurred. Then rebalancing 
> occurred and the data in the topics was "doubled". There was also a 
> "negative" lag.
> There are no error messages in the ZooKeeper logs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup

2021-03-18 Thread Andrey (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey updated KAFKA-12390:
---
Attachment: negative lag kafka.png

> error when storing group assignment during SyncGroup 
> -
>
> Key: KAFKA-12390
> URL: https://issues.apache.org/jira/browse/KAFKA-12390
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 2.4.0
>Reporter: Andrey
>Priority: Major
> Attachments: negative lag kafka.png
>
>
> There is a cluster consisting of 3 nodes running Apache Kafka 2.4.0.
> The following messages were observed in the logs of one of the nodes:
> Preparing to rebalance group corecft-adapter2 in state PreparingRebalance 
> with old generation 5043 (__consumer_offsets-22) (reason: error when storing 
> group assignment during SyncGroup (member: 
> consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) 
> (kafka.coordinator.group.GroupCoordinator)
> Messages from the topic were read, but no commit occurred. Then rebalancing 
> occurred and the data in the topics was "doubled". There was also a 
> "negative" lag.
> There are no error messages in the ZooKeeper logs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup

2021-03-18 Thread Andrey (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey updated KAFKA-12390:
---
Attachment: negative lag kafka.png

> error when storing group assignment during SyncGroup 
> -
>
> Key: KAFKA-12390
> URL: https://issues.apache.org/jira/browse/KAFKA-12390
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 2.4.0
>Reporter: Andrey
>Priority: Major
> Attachments: negative lag kafka.png
>
>
> There is a cluster consisting of 3 nodes running Apache Kafka 2.4.0.
> The following messages were observed in the logs of one of the nodes:
> Preparing to rebalance group corecft-adapter2 in state PreparingRebalance 
> with old generation 5043 (__consumer_offsets-22) (reason: error when storing 
> group assignment during SyncGroup (member: 
> consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) 
> (kafka.coordinator.group.GroupCoordinator)
> Messages from the topic were read, but no commit occurred. Then rebalancing 
> occurred and the data in the topics was "doubled". There was also a 
> "negative" lag.
> There are no error messages in the ZooKeeper logs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12390) error when storing group assignment during SyncGroup

2021-03-18 Thread Andrey (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey updated KAFKA-12390:
---
Attachment: (was: negative lag kafka.png)

> error when storing group assignment during SyncGroup 
> -
>
> Key: KAFKA-12390
> URL: https://issues.apache.org/jira/browse/KAFKA-12390
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 2.4.0
>Reporter: Andrey
>Priority: Major
> Attachments: negative lag kafka.png
>
>
> There is a cluster consisting of 3 nodes running Apache Kafka 2.4.0.
> The following messages were observed in the logs of one of the nodes:
> Preparing to rebalance group corecft-adapter2 in state PreparingRebalance 
> with old generation 5043 (__consumer_offsets-22) (reason: error when storing 
> group assignment during SyncGroup (member: 
> consumer-5-5f3f994d-7c7c-43d6-8e0b-e011bdc2f9ba)) 
> (kafka.coordinator.group.GroupCoordinator)
> Messages from the topic were read, but no commit occurred. Then rebalancing 
> occurred and the data in the topics was "doubled". There was also a 
> "negative" lag.
> There are no error messages in the ZooKeeper logs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


cadonna commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596720028



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
-return true;
-} else if (lockAndOwner != null) {
-// another thread owns the lock
-return false;
-}
-
-try {
-lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-} catch (final ProcessorStateException e) {
-// directoryForTask could be throwing an exception if another 
thread
-// has concurrently deleted the directory
-return false;
-}
-
-final FileChannel channel;
-
-try {
-channel = getOrCreateFileChannel(taskId, lockFile.toPath());
-} catch (final NoSuchFileException e) {
-// FileChannel.open(..) could throw NoSuchFileException when there 
is another thread
-// concurrently deleting the parent directory (i.e. the directory 
of the taskId) of the lock
-// file, in this case we will return immediately indicating 
locking failed.
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {
+log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+// we already own the lock
+return true;
+} else {
+// another thread owns the lock
+return false;
+}
+} else if (!stateDir.exists()) {

Review comment:
   I am wondering if we should throw an `IllegalStateException` here, 
because it seems illegal to me to request a lock of a task directory in a state 
directory that does not exist. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
 exception
 );
 } finally {
-try {
-unlock(id);
-} catch (final IOException exception) {
-log.warn(
-String.format("%s Swallowed the following 
exception during unlocking after deletion of obsolete " +
-"state directory %s for task %s:", 
logPrefix(), dirName, id),
-exception
-);
-}
+unlock(id);

Review comment:
   I guess you could make the same change on line 474 in 
`cleanRemovedTasksCalledByUser()`, couldn't you?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##
@@ -240,32 +192,30 @@ public void 
testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOEx
 }
 
 @Test
-public void testCloseStateManagerThrowsExceptionWhenDirty() throws 
IOException {
+public void testCloseStateManagerThrowsExceptionWhenDirty() {
 expect(stateManager.taskId()).andReturn(taskId);
 
 expect(stateDirectory.lock(taskId)).andReturn(true);
 
 stateManager.close();
-expectLastCall();
+expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
 
 stateDirectory.unlock(taskId);
-expectLastCall().andThrow(new IOException("Timeout"));
+expectLastCall();

Review comment:
   You can probably remove this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10347: MINOR: the request thread in clientToControllerChannelManager is NOT…

2021-03-18 Thread GitBox


dengziming commented on pull request #10347:
URL: https://github.com/apache/kafka/pull/10347#issuecomment-801824522


   It seems that this issue has been fixed by #10340.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12496) Javadoc webpages should not be published to mirrors

2021-03-18 Thread Sebb (Jira)
Sebb created KAFKA-12496:


 Summary: Javadoc webpages should not be published to mirrors
 Key: KAFKA-12496
 URL: https://issues.apache.org/jira/browse/KAFKA-12496
 Project: Kafka
  Issue Type: Task
Reporter: Sebb


As per discussion on the dev mailing list, please would someone remove the 
javadocs from the mirrors, i.e.

svn rm https://dist.apache.org/repos/dist/release/kafka/2.6.1/javadoc/
and
svn rm https://dist.apache.org/repos/dist/release/kafka/2.7.0/javadoc/




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10347: MINOR: the request thread in clientToControllerChannelManager is NOT…

2021-03-18 Thread GitBox


chia7712 commented on pull request #10347:
URL: https://github.com/apache/kafka/pull/10347#issuecomment-801827853


   > It seems that this issue has been fixed by #10340.
   
   You are right :)
   
   close this now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #10347: MINOR: the request thread in clientToControllerChannelManager is NOT…

2021-03-18 Thread GitBox


chia7712 closed pull request #10347:
URL: https://github.com/apache/kafka/pull/10347


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #9779: KAFKA-10767: Adding test cases for all, reverseAll and reverseRange for ThreadCache

2021-03-18 Thread GitBox


cadonna commented on a change in pull request #9779:
URL: https://github.com/apache/kafka/pull/9779#discussion_r596741592



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
##
@@ -252,20 +262,43 @@ public void shouldGetSameKeyAsPeekNext() {
 assertEquals(iterator.peekNextKey(), iterator.next().key);
 }
 
+@Test
+public void shouldGetSameKeyAsPeekNextReverseRange() {
+final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
+final Bytes theByte = Bytes.wrap(new byte[]{1});
+cache.put(namespace, theByte, dirtyEntry(theByte.get()));
+final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+assertEquals(iterator.peekNextKey(), iterator.next().key);

Review comment:
   nit: see my comment about `assertThat()` above. 

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
##
@@ -252,20 +262,43 @@ public void shouldGetSameKeyAsPeekNext() {
 assertEquals(iterator.peekNextKey(), iterator.next().key);
 }
 
+@Test
+public void shouldGetSameKeyAsPeekNextReverseRange() {
+final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
+final Bytes theByte = Bytes.wrap(new byte[]{1});
+cache.put(namespace, theByte, dirtyEntry(theByte.get()));
+final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+assertEquals(iterator.peekNextKey(), iterator.next().key);
+}
+
 @Test
 public void shouldThrowIfNoPeekNextKey() {
 final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
 final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
 assertThrows(NoSuchElementException.class, iterator::peekNextKey);
 }
 
+@Test
+public void shouldThrowIfNoPeekNextKeyReverseRange() {
+final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
+final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new 
byte[]{1}));
+assertThrows(NoSuchElementException.class, iterator::peekNextKey);
+}
+

Review comment:
   nit: could you try to deduplicate code here and in the other unit tests? 
Here for example, you could have one method like this:
   
   ```
   private void shouldThrowIfNoPeekNextKey(final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) {
       final ThreadCache.MemoryLRUCacheBytesIterator iterator = methodUnderTest.get();
   assertThrows(NoSuchElementException.class, iterator::peekNextKey);
   }
   ```
   
   and then two public tests
   
   ```
   @Test
   public void shouldThrowIfNoPeekNextKeyRange() {
   final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
   shouldThrowIfNoPeekNextKey(() -> cache.range(namespace, 
Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
   }
   
   @Test
   public void shouldThrowIfNoPeekNextKeyReverseRange() {
   final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
   shouldThrowIfNoPeekNextKey(() -> cache.reverseRange(namespace, 
Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})));
   }
   ```
   
   Admittedly, in this specific case we would not win much, but for other unit 
tests in this test class it may be worth it. Try it and then decide whether it 
is worth it or not.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
##
@@ -243,6 +243,16 @@ public void shouldPeekNextKey() {
 assertEquals(theByte, iterator.peekNextKey());
 }
 
+@Test
+public void shouldPeekNextKeyReverseRange() {
+final ThreadCache cache = new ThreadCache(logContext, 1L, new 
MockStreamsMetrics(new Metrics()));
+final Bytes theByte = Bytes.wrap(new byte[]{1});
+cache.put(namespace, theByte, dirtyEntry(theByte.get()));
+final ThreadCache.MemoryLRUCacheBytesIterator iterator = 
cache.reverseRange(namespace, Bytes.wrap(new byte[]{0}), theByte);
+assertEquals(theByte, iterator.peekNextKey());
+assertEquals(theByte, iterator.peekNextKey());

Review comment:
   nit: Could you please use `assertThat(iterator.peekNextKey(), is(the 
Byte))` here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


[jira] [Commented] (KAFKA-12384) Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch

2021-03-18 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304053#comment-17304053
 ] 

Bruno Cadonna commented on KAFKA-12384:
---

Failed again 

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10317/2/testReport/junit/kafka.server/ListOffsetsRequestTest/Build___JDK_15___testResponseIncludesLeaderEpoch___2/

{code}
org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: <(-1,-1)>
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at 
org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124)
at 
kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172)
{code}

> Flaky Test ListOffsetsRequestTest.testResponseIncludesLeaderEpoch
> -
>
> Key: KAFKA-12384
> URL: https://issues.apache.org/jira/browse/KAFKA-12384
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> {quote}org.opentest4j.AssertionFailedError: expected: <(0,0)> but was: 
> <(-1,-1)> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at 
> org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at 
> org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at 
> kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch(ListOffsetsRequestTest.scala:172){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API

2021-03-18 Thread GitBox


dajac commented on a change in pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#discussion_r596869352



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+public interface AdminApiLookupStrategy<T> {
+
+/**
+ * Define the scope of a given key for lookup. Key lookups are complicated
+ * by the need to accommodate different batching mechanics. For example,
+ * a `Metadata` request supports arbitrary batching of topic partitions in
+ * order to discover partitions leaders. This can be supported by returning
+ * a single scope object for all keys.
+ *
+ * On the other hand, `FindCoordinator` request only supports lookup of a
+ * single key. This can be supported by returning a different scope object
+ * for each lookup key.
+ *
+ * @param key the lookup key
+ *
+ * @return request scope indicating how lookup requests can be batched 
together
+ */
+RequestScope lookupScope(T key);
+
+/**
+ * Build the lookup request for a set of keys. The grouping of the keys is 
controlled
+ * through {@link #lookupScope(Object)}. In other words, each set of keys 
that map
+ * to the same request scope object will be sent to this method.
+ *
+ * @param keys the set of keys that require lookup
+ *
+ * @return a builder for the lookup request
+ */
+AbstractRequest.Builder<?> buildRequest(Set<T> keys);
+
+/**
+ * Callback that is invoked when a lookup request returns successfully. 
The handler
+ * should parse the response, check for errors, and return a result 
indicating
+ * which keys were mapped to a brokerId successfully and which keys 
received
+ * a fatal error (e.g. a topic authorization failure).
+ *
+ * Note that keys which receive a retriable error should be left out of the
+ * result. They will be retried automatically. For example, if the 
response of
+ * `FindCoordinator` request indicates an unavailable coordinator, then 
the key
+ * should be left out of the result so that the request will be retried.
+ *
+ * @param keys the set of keys from the associated request
+ * @param response the response received from the broker
+ *
+ * @return a result indicating which keys mapped successfully to a 
brokerId and
+ * which encountered a fatal error
+ */
+LookupResult<T> handleResponse(Set<T> keys, AbstractResponse response);
+
+class LookupResult<K> {
+public final Map<K, Integer> mappedKeys;
+public final Map<K, Throwable> failedKeys;
+
+public LookupResult(
+Map<K, Throwable> failedKeys,
+Map<K, Integer> mappedKeys
+) {
+this.failedKeys = Collections.unmodifiableMap(failedKeys);
+this.mappedKeys = Collections.unmodifiableMap(mappedKeys);
+}
+}
+
+interface RequestScope {
+default OptionalInt destinationBrokerId() {
+return OptionalInt.empty();
+}
+}

Review comment:
   Have you decided to put this interface here because it is mainly used by 
the `AdminApiLookupStrategy`? Intuitively, I would have placed it in the driver 
because it is used by a few other classes as well.
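   For illustration, a minimal, hypothetical sketch of the grouping behaviour the 
javadoc above describes (names below are not from this PR; a single shared scope 
batches all keys into one request, a per-key scope yields one request per key):
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Function;

   // Hypothetical sketch: group lookup keys by their scope; each map entry
   // then becomes one lookup request.
   public final class ScopeGroupingSketch {
       interface RequestScope { }

       static <T> Map<RequestScope, Set<T>> groupByScope(Set<T> keys,
                                                         Function<T, RequestScope> lookupScope) {
           Map<RequestScope, Set<T>> grouped = new HashMap<>();
           for (T key : keys) {
               grouped.computeIfAbsent(lookupScope.apply(key), s -> new HashSet<>()).add(key);
           }
           return grouped;
       }
   }
   ```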

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unles

[GitHub] [kafka] ijuma commented on a change in pull request #10344: MINOR: Remove use of NoSuchElementException

2021-03-18 Thread GitBox


ijuma commented on a change in pull request #10344:
URL: https://github.com/apache/kafka/pull/10344#discussion_r596895503



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -37,8 +37,9 @@ final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the 
snapshotting thread to notify the
-  // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  // polling thread when snapshots are created. Using a Map instead of a Set 
so that there is no
+  // need to handle NoSuchElementException.
+  snapshotIds: ConcurrentSkipListMap[OffsetAndEpoch, Unit],

Review comment:
   @jsancio Iterators need to provide consistent behavior or they would not 
satisfy the Iterator contract. What I mean is: if `hasNext` returns `true`, 
then `next` has to return an element. It would not be ok for `next` to do 
something different in that case. So, it seems safe to use here. In any case, 
not a big deal to use `ConcurrentNavigableMap` since that's what 
`ConcurrentSkipListSet` uses too. But if we do that, I would suggest creating a 
simple wrapper that exposes the methods you want.
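   As a small illustration of that contract (not from this PR), assuming a plain 
`ConcurrentSkipListSet`:
   ```java
   import java.util.Iterator;
   import java.util.concurrent.ConcurrentSkipListSet;

   // If hasNext() returns true, the following next() must return an element,
   // even while other threads modify the set; ConcurrentSkipListSet's weakly
   // consistent iterators honor this contract.
   public class IteratorContractExample {
       public static void main(String[] args) {
           ConcurrentSkipListSet<Long> offsets = new ConcurrentSkipListSet<>();
           offsets.add(10L);
           offsets.add(20L);

           Iterator<Long> asc = offsets.iterator();            // ascending order
           while (asc.hasNext()) {                             // hasNext() == true ...
               System.out.println(asc.next());                 // ... so next() yields an element
           }

           Iterator<Long> desc = offsets.descendingIterator(); // descending order, same contract
           while (desc.hasNext()) {
               System.out.println(desc.next());
           }
       }
   }
   ```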





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10341: KAFKA-12491: Make rocksdb an `api` dependency for `streams`

2021-03-18 Thread GitBox


ijuma merged pull request #10341:
URL: https://github.com/apache/kafka/pull/10341


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #10349: MINOR: Move `configurations.all` to be a child of `allprojects`

2021-03-18 Thread GitBox


ijuma opened a new pull request #10349:
URL: https://github.com/apache/kafka/pull/10349


   It was incorrectly set within `dependencyUpdates` and it still worked.
   That is, this is a no-op in terms of behavior, but makes it easier to
   read and understand.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

2021-03-18 Thread GitBox


ijuma commented on pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#issuecomment-801967783


   Is this a 2.8 blocker?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] unlizhao commented on pull request #10348: MINOR: KafkaAdminClient check null for group.groupState()

2021-03-18 Thread GitBox


unlizhao commented on pull request #10348:
URL: https://github.com/apache/kafka/pull/10348#issuecomment-801972726


   @dajac Normally, `group.groupState()` is populated by the cluster when it 
processes the ListConsumerGroups request; the response normally carries this 
value.
   
   If the value is not set, `group.groupState()` will be null, and 
`group.groupState().equals("")` will throw a NullPointerException.
   
   You can reproduce this in 
`org.apache.kafka.clients.admin.KafkaAdminClientTest#testListConsumerGroupsWithStates` 
by changing `setGroupState("stable")` to `setGroupState(null)`.
   
   However, by design a response without a `groupState` is illegal, so it is 
reasonable not to handle such a response.
   
   Even if an NPE occurs, this method is wrapped in an outer try/catch and will 
not crash the client.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-03-18 Thread GitBox


ijuma merged pull request #10056:
URL: https://github.com/apache/kafka/pull/10056


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] unlizhao edited a comment on pull request #10348: MINOR: KafkaAdminClient check null for group.groupState()

2021-03-18 Thread GitBox


unlizhao edited a comment on pull request #10348:
URL: https://github.com/apache/kafka/pull/10348#issuecomment-801972726


   @dajac Thanks for your review.
   
   Normally, `group.groupState()` is populated by the cluster when it processes 
the ListConsumerGroups request; the response normally carries this value.
   
   If the value is not set, `group.groupState()` will be null, and 
`group.groupState().equals("")` will throw a NullPointerException.
   
   You can reproduce this in 
`org.apache.kafka.clients.admin.KafkaAdminClientTest#testListConsumerGroupsWithStates` 
by changing `setGroupState("stable")` to `setGroupState(null)`.
   
   However, by design a response without a `groupState` is illegal, so it is 
reasonable not to handle such a response.
   
   Even if an NPE occurs, this method is wrapped in an outer try/catch and will 
not crash the client.
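   A minimal sketch of the null-safe check being discussed (illustrative only, 
not the actual KafkaAdminClient code):
   ```java
   import java.util.Objects;

   // Comparing with Objects.equals, or checking for null first, avoids the
   // NullPointerException when the response carries no group state.
   public class GroupStateCheckSketch {
       static boolean isEmptyState(String groupState) {
           // groupState may be null if the broker response omitted it
           return groupState == null || groupState.isEmpty();
       }

       public static void main(String[] args) {
           System.out.println(isEmptyState(null));              // true, no NPE
           System.out.println(isEmptyState(""));                // true
           System.out.println(Objects.equals("Stable", null));  // false, also NPE-safe
       }
   }
   ```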



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-03-18 Thread GitBox


ijuma commented on pull request #10056:
URL: https://github.com/apache/kafka/pull/10056#issuecomment-801976406


   I tried backporting to 2.8 and the older plugin versions in that branch were 
not available in Maven central. We should probably wait a few months and see 
how things develop before backporting to older branches.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #10056: KAFKA-12293: remove JCenter and Bintray repositories mentions out of build gradle scripts (sunset is announced for those repositories)

2021-03-18 Thread GitBox


ijuma edited a comment on pull request #10056:
URL: https://github.com/apache/kafka/pull/10056#issuecomment-801976406


   I tried backporting to 2.8 and the older Gradle plugin versions in that 
branch were not available in Maven central. We should probably wait a few 
months and see how things develop before backporting to older branches.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

2021-03-18 Thread GitBox


mumrah commented on pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#issuecomment-801992943


   @ijuma no this only affects trunk. It looks like the bug was introduced only 
a few days ago


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

2021-03-18 Thread GitBox


ijuma commented on pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#issuecomment-801997350


   Thanks @mumrah.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10344: MINOR: Remove use of NoSuchElementException

2021-03-18 Thread GitBox


jsancio commented on a change in pull request #10344:
URL: https://github.com/apache/kafka/pull/10344#discussion_r596980629



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -37,8 +37,9 @@ final class KafkaMetadataLog private (
   log: Log,
   scheduler: Scheduler,
   // This object needs to be thread-safe because it is used by the 
snapshotting thread to notify the
-  // polling thread when snapshots are created.
-  snapshotIds: ConcurrentSkipListSet[OffsetAndEpoch],
+  // polling thread when snapshots are created. Using a Map instead of a Set 
so that there is no
+  // need to handle NoSuchElementException.
+  snapshotIds: ConcurrentSkipListMap[OffsetAndEpoch, Unit],

Review comment:
   Sounds good @ijuma . I changed the code to use a 
`ConcurrentSkipListSet`, ascending iterator and descending iterator.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-3813.
--
Resolution: Done

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.
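For reference, a short sketch of how a {{SourceTask}} can already read stored 
offsets through its context today; the connector-level access requested here 
would look similar (the partition key below is hypothetical):
{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.OffsetStorageReader;

// Sketch for illustration: a task resuming from its stored offset. The "table"
// partition key is hypothetical; real connectors choose their own partition maps.
public abstract class OffsetAwareTask extends SourceTask {
    @Override
    public void start(Map<String, String> props) {
        OffsetStorageReader reader = context.offsetStorageReader();
        Map<String, Object> sourcePartition = Collections.singletonMap("table", "my_table");
        Map<String, Object> offset = reader.offset(sourcePartition);
        if (offset != null) {
            // resume from the stored position instead of starting from scratch
        }
    }
}
{code}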



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10314: MINOR: remove redundant null check when testing specified type

2021-03-18 Thread GitBox


chia7712 merged pull request #10314:
URL: https://github.com/apache/kafka/pull/10314


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


jsancio commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r596994002



##
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-final private Type type;
+final private Kind kind;

Review comment:
   Yeah. What makes the suggested design pattern nice in Scala is that the 
companion objects for `case class` have an `unapply` method for pattern 
matching.
   
   For what it is worth, I thought about this design when implementing 
`ValidOffsetAndEpoch` but found the current design more user-friendly in Java.
   
   I am all for changing the name of `ValidOffsetAndEpoch`. I was never happy 
with the name.
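   For readers following along, a minimal Java sketch of the kind-plus-accessor 
style referred to here (the names and fields below are illustrative, not the 
actual `ValidOffsetAndEpoch` code):
   ```java
   // Callers branch on kind() with a switch, which is the closest Java gets to
   // the Scala case-class pattern matching mentioned above.
   public final class ResultWithKind {
       public enum Kind { VALID, DIVERGING, SNAPSHOT }

       private final Kind kind;
       private final long offset;

       private ResultWithKind(Kind kind, long offset) {
           this.kind = kind;
           this.offset = offset;
       }

       public static ResultWithKind valid(long offset) {
           return new ResultWithKind(Kind.VALID, offset);
       }

       public Kind kind() {
           return kind;
       }

       public long offset() {
           return offset;
       }
   }
   ```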




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #10085: KAFKA-12154: Snapshot Loading API

2021-03-18 Thread GitBox


jsancio commented on a change in pull request #10085:
URL: https://github.com/apache/kafka/pull/10085#discussion_r596996121



##
File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotReader.java
##
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.snapshot;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RecordSerde;
+
+public final class SnapshotReader<T> implements Closeable, Iterable<List<T>> {

Review comment:
   Fair enough. Let me revisit this and see what code we can reuse.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12497) Source task offset commits continue even after task has failed

2021-03-18 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-12497:
-

 Summary: Source task offset commits continue even after task has 
failed
 Key: KAFKA-12497
 URL: https://issues.apache.org/jira/browse/KAFKA-12497
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.0.0, 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.2, 2.5.2, 2.8.0, 
2.7.1, 2.6.2
Reporter: Chris Egerton
Assignee: Chris Egerton


Source task offset commits take place on a dedicated thread, which periodically 
triggers offset commits for all of the source tasks on the worker on a 
user-configurable interval and with a user-configurable timeout for each offset 
commit.

 

When a task fails, offset commits continue to take place. In the common case 
where there is no longer any chance for another successful offset commit for 
the task, this has two negative side-effects:

First, confusing log messages are emitted that some users reasonably interpret 
as a sign that the source task is still alive:
{noformat}
[2021-03-06 04:30:53,739] INFO 
WorkerSourceTask{id=Salesforce_PC_Connector_Agency-0} Committing offsets 
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-03-06 04:30:53,739] INFO 
WorkerSourceTask{id=Salesforce_PC_Connector_Agency-0} flushing 0 outstanding 
messages for offset commit 
(org.apache.kafka.connect.runtime.WorkerSourceTask){noformat}
Second, if the task has any source records pending, it will block the shared 
offset commit thread until the offset commit timeout expires. This will take 
place repeatedly until either the task is restarted/deleted, or all of 
these records are flushed.

 

In some other cases, it's actually somewhat sensible to continue to try to 
commit offsets. Even if a source task has died, data from it may still be in 
flight to the broker, and there's no reason not to commit the offsets for that 
data once it has been ack'd.

 

However, if there is no in-flight data from a source task that is pending an 
ack from the Kafka cluster, and the task has failed, there is no reason to 
continue to try to commit offsets. Additionally, if the producer has failed to 
send a record to Kafka with a non-retriable exception, there is also no reason 
to continue to try to commit offsets, as the current batch will never complete.

 

We can address one or both of these cases to try to reduce the number of 
confusing logging messages, and if necessary, alter existing log messages to 
make it clear to the user that the task may not be alive.
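A rough sketch of the guard described above (illustrative only; the flags are 
hypothetical, not the real WorkerSourceTask API):
{code:java}
// Skip the commit attempt when it can no longer succeed.
public final class OffsetCommitGuardSketch {
    static boolean shouldCommitOffsets(boolean taskFailed,
                                       boolean hasPendingRecords,
                                       boolean producerHitFatalError) {
        if (taskFailed && !hasPendingRecords) {
            return false; // nothing left that could still be ack'd
        }
        if (producerHitFatalError) {
            return false; // the current batch will never complete
        }
        return true;      // otherwise in-flight data may still be committed
    }
}
{code}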



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

2021-03-18 Thread GitBox


tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r597008060



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -40,42 +42,42 @@
 public void produceResponseV5Test() {
 Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = 
new HashMap<>();
 TopicPartition tp0 = new TopicPartition("test", 0);
-responseData.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE,
-1, RecordBatch.NO_TIMESTAMP, 100));
+responseData.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE, 1, RecordBatch.NO_TIMESTAMP, 
100));
 
 ProduceResponse v5Response = new ProduceResponse(responseData, 10);
 short version = 5;
 
 ByteBuffer buffer = 
RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
 ResponseHeader.parse(buffer, 
ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-ProduceResponse v5FromBytes = (ProduceResponse) 
AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-buffer, version);
-
-assertEquals(1, v5FromBytes.responses().size());
-assertTrue(v5FromBytes.responses().containsKey(tp0));
-ProduceResponse.PartitionResponse partitionResponse = 
v5FromBytes.responses().get(tp0);
-assertEquals(100, partitionResponse.logStartOffset);
-assertEquals(1, partitionResponse.baseOffset);
-assertEquals(10, v5FromBytes.throttleTimeMs());
-assertEquals(responseData, v5Response.responses());
+ProduceResponse v5FromBytes = (ProduceResponse) 
AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+assertEquals(1, v5FromBytes.data().responses().size());
+ProduceResponseData.TopicProduceResponse topicProduceResponse = 
v5FromBytes.data().responses().iterator().next();
+assertEquals(1, topicProduceResponse.partitionResponses().size());  
+ProduceResponseData.PartitionProduceResponse partitionProduceResponse 
= topicProduceResponse.partitionResponses().iterator().next();
+TopicPartition tp = new TopicPartition(topicProduceResponse.name(), 
partitionProduceResponse.index());
+assertEquals(tp0, tp);
+
+assertEquals(100, partitionProduceResponse.logStartOffset());
+assertEquals(1, partitionProduceResponse.baseOffset());
+assertEquals(RecordBatch.NO_TIMESTAMP, 
partitionProduceResponse.logAppendTimeMs());
+assertEquals(Errors.NONE, 
Errors.forCode(partitionProduceResponse.errorCode()));
+assertEquals(null, partitionProduceResponse.errorMessage());

Review comment:
   Done.

##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -40,42 +42,42 @@
 public void produceResponseV5Test() {
 Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = 
new HashMap<>();
 TopicPartition tp0 = new TopicPartition("test", 0);
-responseData.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE,
-1, RecordBatch.NO_TIMESTAMP, 100));
+responseData.put(tp0, new 
ProduceResponse.PartitionResponse(Errors.NONE, 1, RecordBatch.NO_TIMESTAMP, 
100));
 
 ProduceResponse v5Response = new ProduceResponse(responseData, 10);
 short version = 5;
 
 ByteBuffer buffer = 
RequestTestUtils.serializeResponseWithHeader(v5Response, version, 0);
 
 ResponseHeader.parse(buffer, 
ApiKeys.PRODUCE.responseHeaderVersion(version)); // throw away.
-ProduceResponse v5FromBytes = (ProduceResponse) 
AbstractResponse.parseResponse(ApiKeys.PRODUCE,
-buffer, version);
-
-assertEquals(1, v5FromBytes.responses().size());
-assertTrue(v5FromBytes.responses().containsKey(tp0));
-ProduceResponse.PartitionResponse partitionResponse = 
v5FromBytes.responses().get(tp0);
-assertEquals(100, partitionResponse.logStartOffset);
-assertEquals(1, partitionResponse.baseOffset);
-assertEquals(10, v5FromBytes.throttleTimeMs());
-assertEquals(responseData, v5Response.responses());
+ProduceResponse v5FromBytes = (ProduceResponse) 
AbstractResponse.parseResponse(ApiKeys.PRODUCE, buffer, version);
+
+assertEquals(1, v5FromBytes.data().responses().size());
+ProduceResponseData.TopicProduceResponse topicProduceResponse = 
v5FromBytes.data().responses().iterator().next();
+assertEquals(1, topicProduceResponse.partitionResponses().size());  
+ProduceResponseData.PartitionProduceResponse partitionProduceResponse 
= topicProduceResponse.partitionResponses().iterator().next();
+TopicPartition tp = new TopicPartition(topicProduceResponse.name(), 
partitionProduceResponse.index());
+assertEquals(tp0, tp);
+
+assertEquals(100, partitionProduceResponse.logStartOffset());
+assertEquals(1, partitionProduceResponse.baseOffset());
+

[GitHub] [kafka] rhauch commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-03-18 Thread GitBox


rhauch commented on a change in pull request #10315:
URL: https://github.com/apache/kafka/pull/10315#discussion_r597007027



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+CooperativeStickyAssignor.class.getName() + "," + 
RangeAssignor.class.getName());

Review comment:
   As discussed on the [Jira 
issue](https://issues.apache.org/jira/browse/KAFKA-12463), I think you're 
suggesting that we use `RoundRobinAssignor` instead of 
`CooperativeStickyAssignor` due to the issue you found with Connect using the 
latter ([KAFKA-12487](https://issues.apache.org/jira/browse/KAFKA-12487)).
   
   Also, it's probably worthwhile to add a comment here about why we're using 
_two_ assignors rather than _only_ the cooperative (or round robin) assignor. 
Maybe something like:
   ```
   // Prefer the cooperative assignor, but allow old range assignor during 
rolling upgrades
   // from Connect versions that just used the range assignor (the default)
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304248#comment-17304248
 ] 

Randall Hauch commented on KAFKA-12463:
---

Thanks for the response and logging KAFKA-12487, [~ChrisEgerton]. 

{quote}

I'd also just like to point out that the goal here is to improve the 
out-of-the-box behavior of Connect for users; although workarounds are nice to 
have, the goal here shouldn't be to focus on documenting them but instead, to 
make them obsolete. If we decide not to improve the default behavior of Connect 
then we can document this somewhere else that's a little more visible for users 
as opposed to developers. 

{quote}

I agree with you that we should fix the behavior. But the fix will appear only 
in certain releases, and not all users will be able to upgrade to those 
releases to get the fix. So, documenting the simple workaround will help users 
in those situations. I suggested documenting the workaround here to help any 
users that do stumble upon this issue when searching for a potential fix, 
regardless of whether the workaround is also documented elsewhere.

IIUC, given KAFKA-12487 is likely more challenging to fix and has not yet been 
addressed, the shorter term proposal here is to change the worker to set the 
following on consumer configs used in sink connectors:
{code:java}
 
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor{code}
This works because the round robin will be used only after all workers have 
been upgraded, and this gives us more balanced consumer assignments. Plus, it 
is backward compatible since the worker will always override this new value 
should users have any worker configs that override this property via:
{code:java}
consumer.partition.assignment.strategy=... {code}
or have any connector configs that use client overrides via:
{code:java}
consumer.overrides.partition.assignment.strategy=...{code}
 

If that is the proposal, WDYT about updating the description to make this more 
clear? Essentially, I suggest this issue's description would state the problem 
(that section is good), propose a solution using round robin (mostly using your 
proposed section to use round robin rather than cooperative), document the 
workaround, and finally address why {{RoundRobinAssignor}} was used instead of 
{{CooperativeStickyAssignor}}. 

 

And if we're on the same page, then I think it's worth updating the PR to 
implement the proposed fix. I'll state the same on a review for the PR.

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 s

[GitHub] [kafka] chia7712 commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

2021-03-18 Thread GitBox


chia7712 commented on a change in pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#discussion_r597014831



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -295,6 +295,9 @@ class BrokerToControllerRequestThread(
   private val requestQueue = new 
LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
 
+  // Used for testing
+  private[server] var started = false

Review comment:
   Does it need to be thread-safe?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304254#comment-17304254
 ] 

Randall Hauch commented on KAFKA-10340:
---

{quote}

I wonder if this we should actually resolve this JIRA as the fix has been made 
to trunk, 2.6 and 2.7. If John decides to pull it in 2.8, then we can just add 
2.8.0 to Fix Versions. That would enable us to run the 2.6.2 and 2.7.1 releases 
without having to tweak Fix Versions.

{quote}

That sounds fine with me. I will add the 2.6.2 fix and mark this as resolved. 
We can then add the 2.8 version (either `2.8.0` or `2.8.1`) if/when 
[https://github.com/apache/kafka/pull/10238] is merged to 2.8.

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} API in 
> AdminClient to check whether the topic exists or whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and if neither is the case, 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes even more specific a corner case: when topic creation settings 
> are enabled, the worker should handle the corner case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false and topic does not 
> exist.
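A minimal sketch of such a pre-flight check using only the standard {{Admin}} 
client APIs (illustrative, not actual Connect worker code):
{code:java}
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Check whether the topic already exists, and if not, whether the broker
// would auto-create it.
public final class TopicPreflightCheck {
    static boolean topicExistsOrAutoCreated(Admin admin, String topic, String brokerId) throws Exception {
        Set<String> topics = admin.listTopics().names().get();
        if (topics.contains(topic)) {
            return true;
        }
        ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, brokerId);
        Config config = admin.describeConfigs(Collections.singleton(broker)).all().get().get(broker);
        ConfigEntry entry = config.get("auto.create.topics.enable");
        return entry != null && "true".equals(entry.value());
    }
}
{code}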



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vanhoale commented on pull request #10239: KAFKA-12372: Enhance TimestampCoverter Connect transformation to handle multiple timestamp or date fields

2021-03-18 Thread GitBox


vanhoale commented on pull request #10239:
URL: https://github.com/apache/kafka/pull/10239#issuecomment-802059907


   Hi @kkonstantine, could one of your team members review the PR? We would 
like to use this feature ASAP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

2021-03-18 Thread GitBox


chia7712 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r597031230



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -78,6 +80,20 @@ public void produceResponseVersionTest() {
 assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be 
zero");
 assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 
10");
 assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 
10");
+
+List<ProduceResponse> arrResponse = Arrays.asList(v0Response, 
v1Response, v2Response);
+for(ProduceResponse produceResponse:arrResponse) {

Review comment:
   code style: `produceResponse : arrResponse`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-03-18 Thread GitBox


C0urante commented on a change in pull request #10315:
URL: https://github.com/apache/kafka/pull/10315#discussion_r597034374



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+CooperativeStickyAssignor.class.getName() + "," + 
RangeAssignor.class.getName());

Review comment:
   Yeah, a comment is definitely warranted, good call.
   
   We might use the `RoundRobinAssignor` but don't think we have to settle for 
that just yet. There's a couple of bugs/improvements in the way, but at least 
one of them is being actively worked on 
(https://github.com/apache/kafka/pull/10345) and I'm happy to take care of the 
other.
   
   I converted this to a draft since it's definitely not ready for merge yet 
and it's unclear which path forward we might want to take. We might want to 
reason through this carefully since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; it'll either have to grow 
from two to three the next, or we'll have to risk breaking changes for users 
who skip an upgrade step).
   
   I'll try to do an analysis of the pros/cons of the 
`CooperativeStickyAssignor` vs the `RoundRobinAssignor` with regards to sink 
connectors on the Jira ticket; I'm actually leaning a tiny bit toward the 
`RoundRobinAssignor` but I might be suffering from tunnel vision due to recent 
misadventures with other assignors.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-03-18 Thread GitBox


C0urante commented on a change in pull request #10315:
URL: https://github.com/apache/kafka/pull/10315#discussion_r597034374



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+CooperativeStickyAssignor.class.getName() + "," + 
RangeAssignor.class.getName());

Review comment:
   Yeah, a comment is definitely warranted, good call. 👌 
   
   We might use the `RoundRobinAssignor` but don't think we have to settle for 
that just yet. There's a couple of bugs/improvements in the way, but at least 
one of them is being actively worked on 
(https://github.com/apache/kafka/pull/10345) and I'm happy to take care of the 
other.
   
   I converted this to a draft since it's definitely not ready for merge yet 
and it's unclear which path forward we might want to take. We might want to 
reason through this carefully since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; it'll either have to grow 
from two to three the next, or we'll have to risk breaking changes for users 
who skip an upgrade step).
   
   I'll try to do an analysis of the pros/cons of the 
`CooperativeStickyAssignor` vs the `RoundRobinAssignor` with regards to sink 
connectors on the Jira ticket; I'm actually leaning a tiny bit toward the 
`RoundRobinAssignor` but I might be suffering from tunnel vision due to recent 
misadventures with other assignors.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-03-18 Thread GitBox


C0urante commented on a change in pull request #10315:
URL: https://github.com/apache/kafka/pull/10315#discussion_r597034374



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+CooperativeStickyAssignor.class.getName() + "," + 
RangeAssignor.class.getName());

Review comment:
   Yeah, a comment is definitely warranted, good call. 👌 
   
   We might use the `RoundRobinAssignor`, but I don't think we have to settle 
for that just yet. There's a couple of bugs/improvements in the way, but at 
least one of them is being actively worked on 
(https://github.com/apache/kafka/pull/10345) and I'm happy to take care of the 
other (https://issues.apache.org/jira/browse/KAFKA-12487).
   
   I converted this to a draft since it's definitely not ready for merge yet 
and it's unclear which path forward we might want to take. We might want to 
reason through this carefully since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; it'll either have to grow 
from two to three the next, or we'll have to risk breaking changes for users 
who skip an upgrade step).
   
   I'll try to do an analysis of the pros/cons of the 
`CooperativeStickyAssignor` vs the `RoundRobinAssignor` with regards to sink 
connectors on the Jira ticket; I'm actually leaning a tiny bit toward the 
`RoundRobinAssignor` but I might be suffering from tunnel vision due to recent 
misadventures with other assignors.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10315: KAFKA-12463: Update default sink task partition assignor to cooperative sticky assignor

2021-03-18 Thread GitBox


C0urante commented on a change in pull request #10315:
URL: https://github.com/apache/kafka/pull/10315#discussion_r597034374



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -698,6 +700,8 @@ private WorkerTask buildWorkerTask(ClusterConfigState 
configState,
 consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+CooperativeStickyAssignor.class.getName() + "," + 
RangeAssignor.class.getName());

Review comment:
   Yeah, a comment is definitely warranted, good call. 👌 
   
   We might use the `RoundRobinAssignor`, but I don't think we have to settle 
for that just yet. There's a couple of bugs/improvements in the way, but at 
least one of them is being actively worked on 
(https://github.com/apache/kafka/pull/10345) and I'm happy to take care of the 
other ([KAFKA-12487](https://issues.apache.org/jira/browse/KAFKA-12487)).
   
   I converted this to a draft since it's definitely not ready for merge yet 
and it's unclear which path forward we might want to take. We might want to 
reason through this carefully since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; it'll either have to grow 
from two to three the next, or we'll have to risk breaking changes for users 
who skip an upgrade step).
   
   I'll try to do an analysis of the pros/cons of the 
`CooperativeStickyAssignor` vs the `RoundRobinAssignor` with regards to sink 
connectors on the Jira ticket; I'm actually leaning a tiny bit toward the 
`RoundRobinAssignor` but I might be suffering from tunnel vision due to recent 
misadventures with other assignors.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-18 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10340:
--
Fix Version/s: 2.6.2

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1, 2.6.2
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} API in 
> AdminClient to check whether the topic exists or whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and if neither is the case, 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes even more specific a corner case: when topic creation settings 
> are enabled, the worker should handle the corner case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false and topic does not 
> exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10340) Source connectors should report error when trying to produce records to non-existent topics instead of hanging forever

2021-03-18 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-10340.
---
  Reviewer: Randall Hauch
Resolution: Fixed

> Source connectors should report error when trying to produce records to 
> non-existent topics instead of hanging forever
> --
>
> Key: KAFKA-10340
> URL: https://issues.apache.org/jira/browse/KAFKA-10340
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 2.7.0, 2.6.1, 2.8.0
>Reporter: Arjun Satish
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.0.0, 2.7.1, 2.6.2
>
>
> Currently, a source connector will blindly attempt to write a record to a 
> Kafka topic. When the topic does not exist, its creation is controlled by the 
> {{auto.create.topics.enable}} config on the brokers. When auto.create is 
> disabled, the producer.send() call on the Connect worker will hang 
> indefinitely (due to the "infinite retries" configuration for said producer). 
> In setups where this config is usually disabled, the source connector simply 
> appears to hang and not produce any output.
> It is desirable to either log an info or an error message (or inform the user 
> somehow) that the connector is simply stuck waiting for the destination topic 
> to be created. When the worker has permissions to inspect the broker 
> settings, it can use the {{listTopics}} and {{describeConfigs}} API in 
> AdminClient to check whether the topic exists or whether the broker can 
> auto-create topics ({{auto.create.topics.enable}}), and if neither is the case, 
> throw an error.
> With the recently merged 
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics],
>  this becomes even more specific a corner case: when topic creation settings 
> are enabled, the worker should handle the corner case where topic creation is 
> disabled, {{auto.create.topics.enable}} is set to false and topic does not 
> exist.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch commented on a change in pull request #10014: KAFKA-12252 and KAFKA-12262: Fix session key rotation when leadership changes

2021-03-18 Thread GitBox


rhauch commented on a change in pull request #10014:
URL: https://github.com/apache/kafka/pull/10014#discussion_r597039806



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -402,7 +402,7 @@ public void tick() {
 log.debug("Scheduled rebalance at: {} (now: {} 
nextRequestTimeoutMs: {}) ",
 scheduledRebalance, now, nextRequestTimeoutMs);
 }
-if (internalRequestValidationEnabled() && keyExpiration < 
Long.MAX_VALUE) {
+if (isLeader() && internalRequestValidationEnabled() && keyExpiration 
< Long.MAX_VALUE) {

Review comment:
   Sounds good. Thanks for logging 
[KAFKA-12474](https://issues.apache.org/jira/browse/KAFKA-12474) and 
[KAFKA-12476](https://issues.apache.org/jira/browse/KAFKA-12476).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tang7526 commented on a change in pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

2021-03-18 Thread GitBox


tang7526 commented on a change in pull request #10332:
URL: https://github.com/apache/kafka/pull/10332#discussion_r597042561



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/ProduceResponseTest.java
##
@@ -78,6 +80,20 @@ public void produceResponseVersionTest() {
 assertEquals(0, v0Response.throttleTimeMs(), "Throttle time must be 
zero");
 assertEquals(10, v1Response.throttleTimeMs(), "Throttle time must be 
10");
 assertEquals(10, v2Response.throttleTimeMs(), "Throttle time must be 
10");
+
+List<ProduceResponse> arrResponse = Arrays.asList(v0Response, 
v1Response, v2Response);
+for(ProduceResponse produceResponse:arrResponse) {

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12262) New session keys are never distributed when follower with key becomes leader

2021-03-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304256#comment-17304256
 ] 

Randall Hauch commented on KAFKA-12262:
---

[https://github.com/apache/kafka/pull/10014] fixes this issue and KAFKA-12252.

> New session keys are never distributed when follower with key becomes leader
> 
>
> Key: KAFKA-12262
> URL: https://issues.apache.org/jira/browse/KAFKA-12262
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> The expiration time for session keys [starts at 
> {{Long.MAX_VALUE}}|https://github.com/apache/kafka/blob/1c00d9dfa50570b4554a123bfe91ccc5e9ad1ca9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L258]
>  and [is updated when a new session key is read if and only if the worker is 
> the 
> leader|https://github.com/apache/kafka/blob/1c00d9dfa50570b4554a123bfe91ccc5e9ad1ca9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1579-L1581].
>  If a follower reads a session key from the config topic, the expiration time 
> will remain {{Long.MAX_VALUE}} and, even if the worker becomes the leader, 
> will not be updated.
> Once this happens, all key rotation will cease until and unless the former 
> leader of the cluster becomes the leader again without being restarted in the 
> meantime, or all workers in the cluster are shut down at the same time and 
> then the cluster is brought back up again.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10077: KAFKA-12309 The revocation algorithm produces uneven distributions

2021-03-18 Thread GitBox


chia7712 commented on pull request #10077:
URL: https://github.com/apache/kafka/pull/10077#issuecomment-802076787


   @rhauch Could you take a look? This uneven distribution can be reproduced 
easily, so it would be nice to fix it as soon as possible.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #10335: KAFKA-12484: Enable Connect's connector log contexts by default (KIP-721)

2021-03-18 Thread GitBox


rhauch commented on pull request #10335:
URL: https://github.com/apache/kafka/pull/10335#issuecomment-802077441


   Note, if the upgrade of Log4J to Log4J2 (#7898) is merged prior to this, we 
should update this PR to also change to the new Log4J2 properties file.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bsolomon1124 opened a new pull request #10350: [docs] Demo use of *-server-stop commands to stop

2021-03-18 Thread GitBox


bsolomon1124 opened a new pull request #10350:
URL: https://github.com/apache/kafka/pull/10350


   ...rather than CTRL+C.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12498) Consumer Group is zero when MirrorMaker 2.0 is running

2021-03-18 Thread Olumide Ajiboye (Jira)
Olumide Ajiboye created KAFKA-12498:
---

 Summary: Consumer Group is zero when MirrorMaker 2.0 is running
 Key: KAFKA-12498
 URL: https://issues.apache.org/jira/browse/KAFKA-12498
 Project: Kafka
  Issue Type: Bug
  Components: consumer, replication
Affects Versions: 2.7.0
 Environment: Kubernetes 1.19.7
Strimzi 0.21.1

Reporter: Olumide Ajiboye


I have two Kafka clusters in active/passive replication using MirrorMaker 2.0.
When I produce messages to a topic and then try to read from it in the same 
cluster, the consumer group lag is already zero. The replica topic shows the 
same lag and log-end-offset.
MM2 runs under a superuser account, and I use a separate consumer group whose 
permissions only allow reading from and writing to this topic.
To elaborate, the steps to reproduce are:
 * Write 10 messages to a topic that has no active consumers but is being 
replicated by MM2 to the passive cluster.
 * Attempt to read 3 messages from the topic; this creates my consumer group 
and adds an active consumer. Result: no messages are read.
 * Describe the consumer group: Log-End-Offset shows the correct number of 
messages, but Lag shows 0.
 * Attempt to read 3 messages from the passive cluster using the same consumer 
group. Result: no messages consumed, Lag shows 0, and Log-End-Offset shows the 
correct number of messages (the same as on the active cluster).
 * Keep the consumer running.
 * Write a few more messages.
 * The consumer now reads the latest messages.
 * Stop the consumer.
 * Keep writing new messages.
 * Lag now shows the correct value.

In the absence of MM2, Kafka behaves as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan opened a new pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest

2021-03-18 Thread GitBox


jolshan opened a new pull request #10351:
URL: https://github.com/apache/kafka/pull/10351


   As a result of https://github.com/apache/kafka/pull/9684, a new field for 
topic names was created. For versions 6+ `DeleteTopicsRequestData.topicNames` 
will return an empty list in KafkaApis. This PR uses a new method to 
efficiently initialize the size of the collection.
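
   For reference, a hedged sketch of the kind of helper described above (the 
actual implementation is not shown here; the accessors and the version-6 cutoff 
are taken from this PR's diff and description, so treat the details as 
assumptions):
```java
import org.apache.kafka.common.message.DeleteTopicsRequestData;

final class DeleteTopicsCountSketch {
    // Sketch only: count topics without materializing the combined name list.
    // Versions 6+ carry their topics in topics(); older versions carry plain
    // names in topicNames().
    static int numberOfTopics(DeleteTopicsRequestData data, short version) {
        return version >= 6 ? data.topics().size() : data.topicNames().size();
    }
}
```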
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bsolomon1124 opened a new pull request #10352: [doc] Properly use language-java to highlight Java code

2021-03-18 Thread GitBox


bsolomon1124 opened a new pull request #10352:
URL: https://github.com/apache/kafka/pull/10352


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #10278: KAFKA-10526: leader fsync deferral on write

2021-03-18 Thread GitBox


vamossagar12 commented on a change in pull request #10278:
URL: https://github.com/apache/kafka/pull/10278#discussion_r597054403



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1876,7 +1876,7 @@ private long maybeAppendBatches(
 BatchAccumulator.CompletedBatch batch = iterator.next();
 appendBatch(state, batch, currentTimeMs);
 }
-flushLeaderLog(state, currentTimeMs);
+//flushLeaderLog(state, currentTimeMs);

Review comment:
   Got it. So essentially, within onUpdateLeaderHighWatermark I can check 
whether the HWM moved because the LEO increased (the LEO can be stored to track 
the largest value flushed so far) or for some other reason. In other words, we 
don't flush every time the HWM advances; we add the extra invariant that the 
LEO must also have moved.
   
   I think it makes sense to me now. I will start making the changes.
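
   To make that invariant concrete, here is a minimal, self-contained sketch of 
the deferral rule under discussion; the class and field names are made up and 
this is not the PR's code.
```java
// Tracks the largest log end offset (LEO) that has been flushed. On a high
// watermark update the leader only fsyncs when the LEO has moved past the last
// flushed offset, i.e. not merely because followers caught up.
final class FlushDeferralTracker {
    private long lastFlushedEndOffset = 0L;

    boolean shouldFlushOnHighWatermarkUpdate(long logEndOffset) {
        if (logEndOffset > lastFlushedEndOffset) {
            lastFlushedEndOffset = logEndOffset;
            return true;
        }
        return false;
    }
}
```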




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#discussion_r597061628



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import 
org.apache.kafka.clients.admin.internals.AdminApiHandler.DynamicKeyMapping;
+import org.apache.kafka.clients.admin.internals.AdminApiHandler.KeyMappings;
+import 
org.apache.kafka.clients.admin.internals.AdminApiHandler.StaticKeyMapping;
+import 
org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy.RequestScope;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * The `KafkaAdminClient`'s internal `Call` primitive is not a good fit for 
multi-stage
+ * request workflows such as we see with the group coordinator APIs or any 
request which
+ * needs to be sent to a partition leader. Typically these APIs have two 
concrete stages:
+ *
+ * 1. Lookup: Find the broker that can fulfill the request (e.g. partition 
leader or group
+ *coordinator)
+ * 2. Fulfillment: Send the request to the broker found in the first step
+ *
+ * This is complicated by the fact that `Admin` APIs are typically batched, 
which
+ * means the Lookup stage may result in a set of brokers. For example, take a 
`ListOffsets`
+ * request for a set of topic partitions. In the Lookup stage, we will find 
the partition
+ * leaders for this set of partitions; in the Fulfillment stage, we will group 
together
+ * partition according to the IDs of the discovered leaders.
+ *
+ * Additionally, the flow between these two stages is bi-directional. We may 
find after
+ * sending a `ListOffsets` request to an expected leader that there was a 
leader change.
+ * This would result in a topic partition being sent back to the Lookup stage.
+ *
+ * Managing this complexity by chaining together `Call` implementations is 
challenging
+ * and messy, so instead we use this class to do the bookkeeping. It handles 
both the
+ * batching aspect as well as the transitions between the Lookup and 
Fulfillment stages.
+ *
+ * Note that the interpretation of the `retries` configuration becomes 
ambiguous
+ * for this kind of pipeline. We could treat it as an overall limit on the 
number
+ * of requests that can be sent, but that is not very useful because each 
pipeline
+ * has a minimum number of requests that need to be sent in order to satisfy 
the request.
+ * Instead, we treat this number of retries independently at each stage so 
that each
+ * stage has at least one opportunity to complete. So if a user sets 
`retries=1`, then
+ * the full pipeline can still complete as long as there are no request 
failures.
+ *
+ * @param <K> The key type, which is also the granularity of the request routing (e.g.
+ *this could be `TopicPartition` in the case of requests intended for a partition
+ *leader or the `GroupId` in the case of consumer group requests intended for
+ *the group coordinator)
+ * @param <V> The fulfillment type for each key (e.g. this could be consumer group state
+ *when the key type is a consumer `GroupId`)
+ */
+public class AdminApiDriver<K, V> {
+private final Logger log;
+private final long retryBackoffMs;
+private final long deadlineMs;
+private final AdminApiHandler<K, V> handler;
+private final Optional<StaticKeyMapping<K>> staticMapping;
+private final Optional<DynamicKeyMapping<K>> dynamicMapping;
+private final Map<K, KafkaFutureImpl<V>> futures;
+
+private final BiMultimap<RequestScope, K> lookupMap = new BiMultimap<>();
+private final BiMultimap fulfillmentMap = new 
B

[GitHub] [kafka] cmccabe commented on a change in pull request #10334: MINOR: Fix BaseHashTable sizing

2021-03-18 Thread GitBox


cmccabe commented on a change in pull request #10334:
URL: https://github.com/apache/kafka/pull/10334#discussion_r597064603



##
File path: metadata/src/main/java/org/apache/kafka/timeline/BaseHashTable.java
##
@@ -56,12 +61,27 @@
 this.elements = new Object[expectedSizeToCapacity(expectedSize)];
 }
 
+/**
+ * Calculate the capacity we should provision, given the expected size.
+ *
+ * Our capacity must always be a power of 2, and never less than 2.
+ */
 static int expectedSizeToCapacity(int expectedSize) {
-if (expectedSize <= 1) {
-return 2;
+if (expectedSize >= MAX_CAPACITY / 2) {
+return MAX_CAPACITY;
+}
+return Math.max(MIN_CAPACITY, roundUpToPowerOfTwo(expectedSize * 2));
+}
+
+private static int roundUpToPowerOfTwo(int i) {
+if (i <= 0) {
+return 0;
+} else if (i > MAX_SIGNED_POWER_OF_TWO) {

Review comment:
   Good catch.  Yes, this should be fixed now in the latest logic.
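
   For readers following the thread: the diff above is truncated, so here is a 
sketch of one standard way such a rounding helper can be written. It is 
illustrative and not necessarily the exact logic that was merged.
```java
// Round an int up to the next power of two, clamping at the largest signed
// power of two (1 << 30) so the shift below never overflows.
final class CapacitySketch {
    static final int MAX_SIGNED_POWER_OF_TWO = 1 << 30;

    static int roundUpToPowerOfTwo(int i) {
        if (i <= 0) {
            return 0;
        } else if (i >= MAX_SIGNED_POWER_OF_TWO) {
            return MAX_SIGNED_POWER_OF_TWO;
        }
        // numberOfLeadingZeros(i - 1) locates the highest set bit of i - 1, so
        // the shift yields the smallest power of two that is >= i.
        return 1 << (32 - Integer.numberOfLeadingZeros(i - 1));
    }
}
```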




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#discussion_r597064897



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+public interface AdminApiLookupStrategy<T> {
+
+/**
+ * Define the scope of a given key for lookup. Key lookups are complicated
+ * by the need to accommodate different batching mechanics. For example,
+ * a `Metadata` request supports arbitrary batching of topic partitions in
+ * order to discover partitions leaders. This can be supported by returning
+ * a single scope object for all keys.
+ *
+ * On the other hand, `FindCoordinator` request only supports lookup of a
+ * single key. This can be supported by returning a different scope object
+ * for each lookup key.
+ *
+ * @param key the lookup key
+ *
+ * @return request scope indicating how lookup requests can be batched 
together
+ */
+RequestScope lookupScope(T key);
+
+/**
+ * Build the lookup request for a set of keys. The grouping of the keys is 
controlled
+ * through {@link #lookupScope(Object)}. In other words, each set of keys 
that map
+ * to the same request scope object will be sent to this method.
+ *
+ * @param keys the set of keys that require lookup
+ *
+ * @return a builder for the lookup request
+ */
+AbstractRequest.Builder<?> buildRequest(Set<T> keys);
+
+/**
+ * Callback that is invoked when a lookup request returns successfully. 
The handler
+ * should parse the response, check for errors, and return a result 
indicating
+ * which keys were mapped to a brokerId successfully and which keys 
received
+ * a fatal error (e.g. a topic authorization failure).
+ *
+ * Note that keys which receive a retriable error should be left out of the
+ * result. They will be retried automatically. For example, if the 
response of
+ * `FindCoordinator` request indicates an unavailable coordinator, then 
the key
+ * should be left out of the result so that the request will be retried.
+ *
+ * @param keys the set of keys from the associated request
+ * @param response the response received from the broker
+ *
+ * @return a result indicating which keys mapped successfully to a 
brokerId and
+ * which encountered a fatal error
+ */
+LookupResult<T> handleResponse(Set<T> keys, AbstractResponse response);
+
+class LookupResult<K> {
+public final Map<K, Integer> mappedKeys;
+public final Map<K, Throwable> failedKeys;
+
+public LookupResult(
+Map<K, Throwable> failedKeys,
+Map<K, Integer> mappedKeys
+) {
+this.failedKeys = Collections.unmodifiableMap(failedKeys);
+this.mappedKeys = Collections.unmodifiableMap(mappedKeys);
+}
+}
+
+interface RequestScope {
+default OptionalInt destinationBrokerId() {
+return OptionalInt.empty();
+}
+}

Review comment:
   Let me think about it. I wasn't entirely happy with this location either.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10317: KAFKA-10357: Add setup method to internal topics

2021-03-18 Thread GitBox


guozhangwang merged pull request #10317:
URL: https://github.com/apache/kafka/pull/10317


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304285#comment-17304285
 ] 

hudeqi commented on KAFKA-12478:


  Thank you very much for your reply! I am on the Kuaishou message-oriented 
middleware team, which is mainly responsible for secondary development and 
customization of Kafka.

  Kafka is used widely throughout the company, and the case described in this 
issue was recently discovered by a business partner who is very sensitive to 
data loss. This alarmed us: the company has a large number of topics, adding 
partitions is a relatively frequent operation, and a considerable share of the 
workloads use the "latest" reset policy. If the consumer client perceives the 
expansion later than the producer client does, data will definitely be lost, 
and for a storage middleware, losing data is a serious problem. Although the 
problem can be avoided by configuring "earliest", that is not an elegant fix, 
and the company uses clients in many other languages (rdkafka, Go, Python, 
etc.). We want the fix to be transparent to clients and to not lose data; also, 
for a topic with a large amount of data, "earliest" may put extra pressure on 
the Kafka brokers. We therefore want to optimize the server-side logic to solve 
this case almost completely.

  Looking forward to your reply!
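
For what it's worth, a purely client-side approximation of "consume the newly 
expanded partitions from the earliest separately" could look like the sketch 
below. It is illustrative only, not the server-side change this issue asks for, 
and it assumes committed() reports null for partitions the group has never 
committed to.
{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// After a rebalance, seek partitions this group has never committed to (which
// includes freshly added partitions) to the beginning, instead of relying on
// auto.offset.reset=latest for them.
public class SeekNewPartitionsToEarliest implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    public SeekNewPartitionsToEarliest(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to do
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        List<TopicPartition> neverCommitted = new ArrayList<>();
        consumer.committed(new HashSet<>(partitions)).forEach((tp, offset) -> {
            if (offset == null) {   // no committed offset: likely a newly added partition
                neverCommitted.add(tp);
            }
        });
        if (!neverCommitted.isEmpty()) {
            consumer.seekToBeginning(neverCommitted);
        }
    }
}
{code}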

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem is exposed in our product environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that the data is lost.*
>   After preliminary investigation, the lost data is all concentrated in the 
> newly expanded partitions. The reason is: when the server expands, the 
> producer firstly perceives the expansion, and some data is written in the 
> newly expanded partitions. But the consumer group perceives the expansion 
> later, after the rebalance is completed, the newly expanded partitions will 
> be consumed from the latest if it is set to consume from the latest. Within a 
> period of time, the data of the newly expanded partitions is skipped and lost 
> by the consumer.
>   If it is not necessarily set to consume from the earliest for a huge data 
> flow topic when starts up, this will make the group consume historical data 
> from the broker crazily, which will affect the performance of brokers to a 
> certain extent. Therefore, *it is necessary to consume these partitions from 
> the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe merged pull request #10334: MINOR: Fix BaseHashTable sizing

2021-03-18 Thread GitBox


cmccabe merged pull request #10334:
URL: https://github.com/apache/kafka/pull/10334


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Issue Comment Deleted] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-12478:
---
Comment: was deleted

(was: z)

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem is exposed in our product environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that the data is lost.*
>   After preliminary investigation, the lost data is all concentrated in the 
> newly expanded partitions. The reason is: when the server expands, the 
> producer firstly perceives the expansion, and some data is written in the 
> newly expanded partitions. But the consumer group perceives the expansion 
> later, after the rebalance is completed, the newly expanded partitions will 
> be consumed from the latest if it is set to consume from the latest. Within a 
> period of time, the data of the newly expanded partitions is skipped and lost 
> by the consumer.
>   If it is not necessarily set to consume from the earliest for a huge data 
> flow topic when starts up, this will make the group consume historical data 
> from the broker crazily, which will affect the performance of brokers to a 
> certain extent. Therefore, *it is necessary to consume these partitions from 
> the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304286#comment-17304286
 ] 

hudeqi commented on KAFKA-12478:


z

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem is exposed in our product environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that the data is lost.*
>   After preliminary investigation, the lost data is all concentrated in the 
> newly expanded partitions. The reason is: when the server expands, the 
> producer firstly perceives the expansion, and some data is written in the 
> newly expanded partitions. But the consumer group perceives the expansion 
> later, after the rebalance is completed, the newly expanded partitions will 
> be consumed from the latest if it is set to consume from the latest. Within a 
> period of time, the data of the newly expanded partitions is skipped and lost 
> by the consumer.
>   If it is not necessarily set to consume from the earliest for a huge data 
> flow topic when starts up, this will make the group consume historical data 
> from the broker crazily, which will affect the performance of brokers to a 
> certain extent. Therefore, *it is necessary to consume these partitions from 
> the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest

2021-03-18 Thread GitBox


dajac commented on a change in pull request #10351:
URL: https://github.com/apache/kafka/pull/10351#discussion_r597073992



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
##
@@ -101,6 +101,12 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 return data.topics().stream().map(topic -> 
topic.name()).collect(Collectors.toList());
 return data.topicNames(); 
 }
+
+public int numberOfTopics() {

Review comment:
   Should we add a small unit test or extend an existing unit test to 
verify this across all versions?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304285#comment-17304285
 ] 

hudeqi edited comment on KAFKA-12478 at 3/18/21, 5:04 PM:
--

  Thank you very much for your reply! I am on the Kuaishou message-oriented 
middleware team, which is mainly responsible for secondary development and 
customization of Kafka.

  Kafka is used widely throughout the company, and the case described in this 
issue was recently discovered by a business partner who is very sensitive to 
data loss. This alarmed us: the company has a large number of topics, adding 
partitions is a relatively frequent operation, and a considerable share of the 
workloads use the "latest" reset policy. If the consumer client perceives the 
expansion later than the producer client does, data will definitely be lost, 
and for a storage middleware, losing data is a serious problem. *Although the 
problem can be avoided by configuring "earliest", that is not an elegant fix, 
and the company uses clients in many other languages (rdkafka, Go, Python, 
etc.). We want the fix to be transparent to clients and to not lose data; also, 
for a topic with a large amount of data, "earliest" may put extra pressure on 
the Kafka brokers. We therefore want to optimize the server-side logic to solve 
this case almost completely.*

  Looking forward to your reply!


was (Author: hudeqi):
  Thank you very much for your reply! I belong to Kuaishou Message-oriented 
middleware group, which is mainly responsible for the secondary development and 
personalized customization of kafka.

  Kafka is widely used throughout the company, and the case proposed by the 
issue was recently discovered by a business partner who is very sensitive to 
data. This shocked us, the company has a large number of topic, and 
add-partition is a relatively high-frequency operation, but a considerable part 
of business uses latest parameters. If the consumer client perceives the 
expansion lagging behind the producer client, data will be definitely lost. As 
a storage middleware, losing data must be a serious problem. Although this 
problem can be avoided by config earliest, but it is not elegant, and the 
company uses clients in many other languages, such as rdkafka,go,python, etc. 
We expect to be transparent to the client without losing data, and if the 
amount of topic data is large. "earliest" may also put some pressure on the 
kafka servers, so we want to optimize the server logic to nearly completely 
solve this case.

  Looking forward to your reply!

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem is exposed in our product environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that the data is lost.*
>   After preliminary investigation, the lost data is all concentrated in the 
> newly expanded partitions. The reason is: when the server expands, the 
> producer firstly perceives the expansion, and some data is written in the 
> newly expanded partitions. But the consumer group perceives the expansion 
> later, after the rebalance is completed, the newly expanded partitions will 
> be consumed from the latest if it is set to consume from the latest. Within a 
> period of time, the data of the newly expanded partitions is skipped and lost 
> by the consumer.
>   If it is not necessarily set to consume from the earliest for a huge data 
> flow topic when starts up, this will make the group consume historical data 
> from the broker crazily, which will affect the performance of brokers to a 
> certain extent. Therefore, *it is necessary to consume these partitions from 
> the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12499) Adjust transaction timeout according to commit interval on Streams EOS

2021-03-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-12499:
---

 Summary: Adjust transaction timeout according to commit interval 
on Streams EOS
 Key: KAFKA-12499
 URL: https://issues.apache.org/jira/browse/KAFKA-12499
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Boyang Chen
Assignee: Boyang Chen
 Fix For: 3.0.0


The transaction timeout defaults to 1 minute on the producer today, while the 
commit interval can be set to a much larger value. In that case the stream keeps 
hitting the transaction timeout and dropping into rebalances. We should increase 
the transaction timeout correspondingly when the commit interval is large.

On the other hand, the broker may enforce a limit on the maximum transaction 
timeout that can be set. If we scale the client transaction timeout beyond that 
limit, the stream will fail with INVALID_TRANSACTION_TIMEOUT. To alleviate this, 
users can define their own transaction timeout to stay under the limit, so we 
should still respect a user-configured override.

The new rule for configuring the transaction timeout should be:
1. If a transaction timeout is set in the Streams config, use it.

2. If not, transaction_timeout = max(default_transaction_timeout, 10 * 
commit_interval), as sketched below.
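
A minimal sketch of the rule above (the constant and method names are 
illustrative, not actual Streams code):
{code:java}
// Sketch only: pick the effective transaction timeout from the Streams configs.
// DEFAULT_TRANSACTION_TIMEOUT_MS mirrors the producer default of one minute.
static long effectiveTransactionTimeoutMs(Long userTransactionTimeoutMs, long commitIntervalMs) {
    final long DEFAULT_TRANSACTION_TIMEOUT_MS = 60_000L;
    if (userTransactionTimeoutMs != null) {
        return userTransactionTimeoutMs;          // rule 1: an explicit override wins
    }
    return Math.max(DEFAULT_TRANSACTION_TIMEOUT_MS, 10 * commitIntervalMs);  // rule 2
}
{code}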

Additionally, if INVALID_TRANSACTION_TIMEOUT is thrown when Streams calls 
initTransaction(), we should wrap the exception to inform the user that their 
commit interval setting may be too high and should be adjusted.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r597080557



##
File path: KIP-500.md
##
@@ -0,0 +1,157 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits -- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0.  There may 
be bugs, including serious ones.  You should *assume that your data could be 
lost at any time* if you try the early access release of KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/nozk-combined.properties
+Formatting /tmp/nozk-combined-logs
+
+
+If you are using multiple nodes, then you should run the format command on 
each node.  Be sure to use the same cluster ID for each one.
+
+## Start the Kafka Server
+Finally, you are ready to start the Kafka server on each node.
+
+
+$ ./bin/kafka-server-start.sh ./config/nozk-combined.properties
+[2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
+[2021-02-26 15:37:11,294] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
+[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, 
dir=/tmp/nozk-combined-logs] Loading producer state till offset 0 with message 
format version 2 (kafka.log.Log)
+[2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
+[2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to 
Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) 
(org.apache.kafka.raft.QuorumState)
+...
+
+
+Just like with a ZooKeeper based broker, you can connect to port 9092 (or 
whatever port you configured) to perform administrative operations or produce 
or consume data.
+
+
+$ ./bin/kafka-topics.sh --create --topic foo --partitions 1 
--replication-factor 1 --bootstrap-server localhost:9092
+Created topic foo.
+
+
+# Deployment
+
+## Controller Servers
+Unlike in ZooKeeper-based mode, where any server can become the controller, in 
self-managed mode, only a small group of specially selected servers can act as 
controllers.  The specially selected controller servers will participate in the 
metadata quorum.  Each KIP-500 controller server is either active, or a hot 
standby for the current active controller server.
+
+Typically you will select either 3 or 5 servers for this role, depending on 
the size of your cluster.  Just like with ZooKeeper, you must keep a majority 
of the controllers alive in order to maintain availability.  So if you have 3 
controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 
failures.
+
+## Process Roles
+Each Kafka server now has a new configuration key called `process.roles` which 
can have the following values:
+
+* If `process.roles` is set to `broker`, the server acts as a self-managed 
broker.
+* If `process.roles` is set to `controller`, the server acts as a self-managed 
controller.
+* If `process.roles` is set to `broker,controller`, the server acts as both a 
self-managed broker and a self-managed controller.
+* If `process.roles` is not set at all then we are assumed to be in ZooKeeper 
mode.  As mentioned earlie

[GitHub] [kafka] cmccabe commented on a change in pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-18 Thread GitBox


cmccabe commented on a change in pull request #10227:
URL: https://github.com/apache/kafka/pull/10227#discussion_r597083837



##
File path: KIP-500.md
##
@@ -0,0 +1,157 @@
+KIP-500 Early Access Release
+
+
+# Introduction
+It is now possible to run Apache Kafka without Apache ZooKeeper!  We call this 
mode [self-managed 
mode](https://cwiki.apache.org/confluence/display/KAFKA/KIP-500%3A+Replace+ZooKeeper+with+a+Self-Managed+Metadata+Quorum).
  It is currently *EARLY ACCESS AND SHOULD NOT BE USED IN PRODUCTION*, but it 
is available for testing in the Kafka 2.8 release.
+
+When the Kafka cluster is in self-managed mode, it does not store its metadata 
in ZooKeeper.  In fact, you do not have to run ZooKeeper at all, because it 
stores its metadata in a Raft quorum of controller nodes.
+
+Self-managed mode has many benefits -- some obvious, and some not so obvious.  
Clearly, it is nice to manage and configure one service rather than two 
services.  In addition, you can now run a single process Kafka cluster.  Most 
important of all, self-managed mode is more scalable.  We expect to be able to 
[support many more topics and 
partitions](https://www.confluent.io/kafka-summit-san-francisco-2019/kafka-needs-no-keeper/)
 in this mode.
+
+# Quickstart
+
+## Warning
+Self-managed mode in Kafka 2.8 is provided for testing only, *NOT* for 
production.  We do not yet support upgrading existing ZooKeeper-based Kafka 
clusters into this mode.  In fact, when Kafka 3.0 is released, it may not even 
be possible to upgrade your self-managed clusters from 2.8 to 3.0.  There may 
be bugs, including serious ones.  You should *assume that your data could be 
lost at any time* if you try the early access release of KIP-500.
+
+## Generate a cluster ID
+The first step is to generate an ID for your new cluster, using the 
kafka-storage tool:
+
+
+$ ./bin/kafka-storage.sh random-uuid
+xtzWWN4bTjitpL3kfd9s5g
+
+
+## Format Storage Directories
+The next step is to format your storage directories.  If you are running in 
single-node mode, you can do this with one command:
+
+
+$ ./bin/kafka-storage.sh format -t xtzWWN4bTjitpL3kfd9s5g -c 
./config/nozk-combined.properties
+Formatting /tmp/nozk-combined-logs
+
+
+If you are using multiple nodes, then you should run the format command on 
each node.  Be sure to use the same cluster ID for each one.
+
+## Start the Kafka Server
+Finally, you are ready to start the Kafka server on each node.
+
+
+$ ./bin/kafka-server-start.sh ./config/nozk-combined.properties
+[2021-02-26 15:37:11,071] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
+[2021-02-26 15:37:11,294] INFO Setting -D 
jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS 
renegotiation (org.apache.zookeeper.common.X509Util)
+[2021-02-26 15:37:11,466] INFO [Log partition=@metadata-0, 
dir=/tmp/nozk-combined-logs] Loading producer state till offset 0 with message 
format version 2 (kafka.log.Log)
+[2021-02-26 15:37:11,509] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
+[2021-02-26 15:37:11,640] INFO [RaftManager nodeId=1] Completed transition to 
Unattached(epoch=0, voters=[1], electionTimeoutMs=9037) 
(org.apache.kafka.raft.QuorumState)
+...
+
+
+Just like with a ZooKeeper based broker, you can connect to port 9092 (or 
whatever port you configured) to perform administrative operations or produce 
or consume data.
+
+
+$ ./bin/kafka-topics.sh --create --topic foo --partitions 1 
--replication-factor 1 --bootstrap-server localhost:9092
+Created topic foo.
+
+
+# Deployment
+
+## Controller Servers
+Unlike in ZooKeeper-based mode, where any server can become the controller, in 
self-managed mode, only a small group of specially selected servers can act as 
controllers.  The specially selected controller servers will participate in the 
metadata quorum.  Each KIP-500 controller server is either active, or a hot 
standby for the current active controller server.
+
+Typically you will select either 3 or 5 servers for this role, depending on 
the size of your cluster.  Just like with ZooKeeper, you must keep a majority 
of the controllers alive in order to maintain availability.  So if you have 3 
controllers, you can tolerate 1 failure; with 5 controllers, you can tolerate 2 
failures.
+
+## Process Roles
+Each Kafka server now has a new configuration key called `process.roles` which 
can have the following values:
+
+* If `process.roles` is set to `broker`, the server acts as a self-managed 
broker.
+* If `process.roles` is set to `controller`, the server acts as a self-managed 
controller.
+* If `process.roles` is set to `broker,controller`, the server acts as both a 
self-managed broker and a self-managed controller.
+* If `process.roles` is not set at all then we are assumed to be in ZooKeeper 
mode.  As mentioned earlier,

[jira] [Commented] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304295#comment-17304295
 ] 

Guozhang Wang commented on KAFKA-12472:
---

I just added a few more states for Streams, regarding various TaskMigrated 
error cases.

> Add a Consumer / Streams metric to indicate the current rebalance status
> 
>
> Key: KAFKA-12472
> URL: https://issues.apache.org/jira/browse/KAFKA-12472
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today to trouble shoot a rebalance issue operators need to do a lot of manual 
> steps: locating the problematic members, search in the log entries, and look 
> for related metrics. It would be great to add a single metric that covers all 
> these manual steps and operators would only need to check this single signal 
> to check what is the root cause. A concrete idea is to expose two enum gauge 
> metrics on consumer and streams, respectively:
> * Consumer level (the order below is by-design, see Streams level for 
> details):
>   0. *None* => there is no rebalance on going.
>   1. *CoordinatorRequested* => any of the coordinator response contains a 
> RebalanceInProgress error code.
>   2. *NewMember* => when the join group response has a MemberIdRequired error 
> code.
>   3. *UnknownMember* => when any of the coordinator response contains an 
> UnknownMember error code, indicating this member is already kicked out of the 
> group.
>   4. *StaleMember* => when any of the coordinator response contains an 
> IllegalGeneration error code.
>   5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb 
> expired.
>   6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll 
> API, as well as upon calling the enforceRebalance API.
>   7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
>   8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
> changed.
>   9. *RetryOnError* => when join/syncGroup response contains a retriable 
> error which would cause the consumer to backoff and retry.
>  10. *RevocationNeeded* => requestRejoin triggered since revoked partitions 
> is not empty.
> The transition rule is that a non-zero status code can only transit to zero 
> or to a higher code, but not to a lower code (same for streams, see 
> rationales below).
> * Streams level: today a streams client can have multiple consumers. We 
> introduced some new enum states as well as aggregation rules across 
> consumers: if there's no streams-layer events as below that transits its 
> status (i.e. streams layer think it is 0), then we aggregate across all the 
> embedded consumers and take the largest status code value as the streams 
> metric; if there are streams-layer events that determines its status should 
> be in 10+, then it ignores all embedded consumer layer status code since it 
> should has a higher precedence. In addition, when create aggregated metric 
> across streams instance (a.k.a at the app-level, which is usually what we 
> would care and alert on), we also follow the same aggregation rule, e.g. if 
> there are two streams instance where one instance's status code is 1), and 
> the other is 10), then the app's status is 10).
>  10. *RevocationNeeded* => the definition of this is changed to the original 
> 10) defined in consumer above, OR leader decides to revoke either 
> active/standby tasks and hence schedule follow-ups.
>  11. *AssignmentProbing* => leader decides to schedule follow-ups since the 
> current assignment is unstable.
>  12. *VersionProbing* => leader decides to schedule follow-ups due to version 
> probing.
>  13. *EndpointUpdate* => anyone decides to schedule follow-ups due to 
> endpoint updates.
> The main motivations of the above proposed precedence order are the following:
> 1. When a rebalance is triggered by one member, all other members would only 
> know it is due to CoordinatorRequested from coordinator error codes, and 
> hence CoordinatorRequested should be overridden by any other status when 
> aggregating across clients.
> 2. DroppedGroup could cause unknown/stale members that would fail and retry 
> immediately, and hence should take higher precedence.
> 3. Revocation definition is extended in Streams, and hence it needs to take 
> the highest precedence among all consumer-only status so that it would not be 
> overridden by any of the consumer-only status.
> 4. In general, more rare events get higher precedence.
> This is proposed on top of KAFKA-12352. Any comments on the precedence rules 
> / categorization are more than welcomed!
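
To restate the aggregation rule from the quoted description in code form, here 
is a minimal sketch (the method and argument names are illustrative only, not a 
proposed API):
{code:java}
// Sketch only: a Streams-level event with code >= 10 takes precedence;
// otherwise report the largest status code seen across the embedded consumers.
// The same max rule is applied again across instances at the app level.
static int aggregateRebalanceStatus(int streamsStatusCode, java.util.List<Integer> consumerStatusCodes) {
    if (streamsStatusCode >= 10) {
        return streamsStatusCode;
    }
    int max = streamsStatusCode;
    for (int code : consumerStatusCodes) {
        max = Math.max(max, code);
    }
    return max;
}
{code}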



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest

2021-03-18 Thread GitBox


jolshan commented on a change in pull request #10351:
URL: https://github.com/apache/kafka/pull/10351#discussion_r597088531



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
##
@@ -101,6 +101,12 @@ public AbstractResponse getErrorResponse(int 
throttleTimeMs, Throwable e) {
 return data.topics().stream().map(topic -> 
topic.name()).collect(Collectors.toList());
 return data.topicNames(); 
 }
+
+public int numberOfTopics() {

Review comment:
   Can do!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #10227: KAFKA-12382: add a README for KIP-500

2021-03-18 Thread GitBox


cmccabe merged pull request #10227:
URL: https://github.com/apache/kafka/pull/10227


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. *None* => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. *StaleMember* => when any of the coordinator response contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb 
expired.
  6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll API, 
as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
  8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
changed.
  9. *RetryOnError* => when join/syncGroup response contains a retriable error 
which would cause the consumer to backoff and retry.
 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions is 
not empty.

The transition rule is that a non-zero status code can only transit to zero or 
to a higher code, but not to a lower code (same for streams, see rationales 
below).

* Streams level: today a Streams client can have multiple consumers. We 
introduce some new enum states as well as aggregation rules across consumers: 
if no streams-layer event (listed below) transitions its status (i.e. the 
streams layer considers it 0), then we aggregate across all the embedded 
consumers and take the largest status code value as the streams metric; if a 
streams-layer event puts its status at 10 or above, then all embedded consumer 
status codes are ignored, since the streams-level status takes precedence. In 
addition, when creating an aggregated metric across Streams instances (i.e. at 
the app level, which is usually what we care about and alert on), we follow the 
same aggregation rule, e.g. if there are two Streams instances where one 
instance's status code is 1) and the other is 10), then the app's status is 10).
 10. *RevocationNeeded* => the definition of this is changed to the original 
10) defined in consumer above, OR leader decides to revoke either 
active/standby tasks and hence schedule follow-ups.
 11. *AssignmentProbing* => leader decides to schedule follow-ups since the 
current assignment is unstable.
 12. *VersionProbing* => leader decides to schedule follow-ups due to version 
probing.
 13. *EndpointUpdate* => anyone decides to schedule follow-ups due to endpoint 
updates.
 14. *EOSViolated* => when OutOfOrderSequenceException is thrown, causing 
TaskMigratedException
 15. *EOSProducerFenced* => when ProducerFencedException / 
InvalidProducerEpochException / UnknownProducerIdException are thrown, causing 
TaskMigratedException
 15. *ConsumerDropped* => when CommitFailedException are thrown, causing 
TaskMigratedException

The main motivations of the above proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members would only 
know it is due to CoordinatorRequested from coordinator error codes, and hence 
CoordinatorRequested should be overridden by any other status when aggregating 
across clients.
2. DroppedGroup could cause unknown/stale members that would fail and retry 
immediately, and hence should take higher precedence.
3. Revocation definition is extended in Streams, and hence it needs to take the 
highest precedence among all consumer-only status so that it would not be 
overridden by any of the consumer-only status.
4. In general, more rare events get higher precedence.

This is proposed on top of KAFKA-12352. Any comments on the precedence rules / 
categorization are more than welcomed!

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, r

[jira] [Updated] (KAFKA-12472) Add a Consumer / Streams metric to indicate the current rebalance status

2021-03-18 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12472:
--
Description: 
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, respectively:

* Consumer level (the order below is by-design, see Streams level for details):
  0. *None* => there is no rebalance on going.
  1. *CoordinatorRequested* => any of the coordinator response contains a 
RebalanceInProgress error code.
  2. *NewMember* => when the join group response has a MemberIdRequired error 
code.
  3. *UnknownMember* => when any of the coordinator response contains an 
UnknownMember error code, indicating this member is already kicked out of the 
group.
  4. *StaleMember* => when any of the coordinator response contains an 
IllegalGeneration error code.
  5. *DroppedGroup* => when hb thread decides to call leaveGroup due to hb 
expired.
  6. *UserRequested* => when leaveGroup upon the shutdown / unsubscribeAll API, 
as well as upon calling the enforceRebalance API.
  7. *MetadataChanged* => requestRejoin triggered since metadata has changed.
  8. *SubscriptionChanged* => requestRejoin triggered since subscription has 
changed.
  9. *RetryOnError* => when join/syncGroup response contains a retriable error 
which would cause the consumer to backoff and retry.
 10. *RevocationNeeded* => requestRejoin triggered since revoked partitions is 
not empty.

The transition rule is that a non-zero status code can only transit to zero or 
to a higher code, but not to a lower code (same for streams, see rationales 
below).

* Streams level: today a Streams client can have multiple consumers. We 
introduce some new enum states as well as aggregation rules across consumers: 
if no streams-layer event (listed below) transitions its status (i.e. the 
streams layer considers it 0), then we aggregate across all the embedded 
consumers and take the largest status code value as the streams metric; if a 
streams-layer event puts its status at 10 or above, then all embedded consumer 
status codes are ignored, since the streams-level status takes precedence. In 
addition, when creating an aggregated metric across Streams instances (i.e. at 
the app level, which is usually what we care about and alert on), we follow the 
same aggregation rule, e.g. if there are two Streams instances where one 
instance's status code is 1) and the other is 10), then the app's status is 10).
 10. *RevocationNeeded* => extended from the consumer-level 10) above: either the consumer condition holds, OR the leader decides to revoke active/standby tasks and hence schedules follow-up rebalances.
 11. *AssignmentProbing* => the leader decides to schedule follow-up rebalances because the current assignment is unstable.
 12. *VersionProbing* => the leader decides to schedule follow-up rebalances due to version probing.
 13. *EndpointUpdate* => any member decides to schedule follow-up rebalances due to endpoint updates.
 14. *EOSViolated* => an OutOfOrderSequenceException is thrown, causing a TaskMigratedException.
 15. *EOSProducerFenced* => a ProducerFencedException / InvalidProducerEpochException / UnknownProducerIdException is thrown, causing a TaskMigratedException.
 16. *ConsumerDropped* => a CommitFailedException is thrown, causing a TaskMigratedException.

The main motivations for the proposed precedence order are the following:
1. When a rebalance is triggered by one member, all other members only see it as 
CoordinatorRequested via the coordinator error codes; hence CoordinatorRequested 
should be overridden by any other status when aggregating across clients.
2. DroppedGroup can cause unknown/stale members that fail and retry immediately, and 
hence should take higher precedence.
3. The revocation definition is extended in Streams, and hence it needs to take the 
highest precedence among all consumer-only statuses so that it is not overridden by 
any of them.
4. In general, rarer events get higher precedence.

This is proposed on top of KAFKA-12352. Any comments on the precedence rules / 
categorization are more than welcome!
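
To make the precedence and aggregation rules concrete, here is a minimal sketch 
(purely illustrative and not part of the proposal; the enum and method names are 
hypothetical) of the status codes, the transition rule, and the max-code aggregation 
across embedded consumers and across instances:

{code:java}
import java.util.Comparator;
import java.util.List;

// Hypothetical status codes mirroring the list above; the values are the proposed codes.
enum RebalanceStatus {
    NONE(0),
    COORDINATOR_REQUESTED(1),
    NEW_MEMBER(2),
    UNKNOWN_MEMBER(3),
    STALE_MEMBER(4),
    DROPPED_GROUP(5),
    USER_REQUESTED(6),
    METADATA_CHANGED(7),
    SUBSCRIPTION_CHANGED(8),
    RETRY_ON_ERROR(9),
    REVOCATION_NEEDED(10),
    ASSIGNMENT_PROBING(11),
    VERSION_PROBING(12),
    ENDPOINT_UPDATE(13),
    EOS_VIOLATED(14),
    EOS_PRODUCER_FENCED(15),
    CONSUMER_DROPPED(16);

    final int code;

    RebalanceStatus(int code) {
        this.code = code;
    }
}

final class StatusAggregation {

    // Transition rule: a non-zero status may only move back to NONE or up to a higher code.
    static RebalanceStatus transition(RebalanceStatus current, RebalanceStatus next) {
        if (next == RebalanceStatus.NONE || next.code >= current.code) {
            return next;
        }
        return current; // a lower code never overrides a higher one
    }

    // Streams-level value: a streams-layer status (code >= 10) wins outright;
    // otherwise take the largest code reported by the embedded consumers.
    static RebalanceStatus streamsStatus(RebalanceStatus streamsLayer, List<RebalanceStatus> consumers) {
        if (streamsLayer.code >= 10) {
            return streamsLayer;
        }
        return consumers.stream()
            .max(Comparator.comparingInt((RebalanceStatus s) -> s.code))
            .orElse(RebalanceStatus.NONE);
    }

    // App-level value across streams instances: again the largest code wins.
    static RebalanceStatus appStatus(List<RebalanceStatus> instanceStatuses) {
        return instanceStatuses.stream()
            .max(Comparator.comparingInt((RebalanceStatus s) -> s.code))
            .orElse(RebalanceStatus.NONE);
    }
}
{code}

The key point of the sketch is that aggregation is simply a max over the integer 
codes, with any streams-layer status of 10 or above taking precedence over whatever 
the embedded consumers report.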

  was:
Today to trouble shoot a rebalance issue operators need to do a lot of manual 
steps: locating the problematic members, search in the log entries, and look 
for related metrics. It would be great to add a single metric that covers all 
these manual steps and operators would only need to check this single signal to 
check what is the root cause. A concrete idea is to expose two enum gauge 
metrics on consumer and streams, r

[jira] [Resolved] (KAFKA-12376) Use scheduleAtomicAppend for records that need to be atomic

2021-03-18 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12376.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

> Use scheduleAtomicAppend for records that need to be atomic
> ---
>
> Key: KAFKA-12376
> URL: https://issues.apache.org/jira/browse/KAFKA-12376
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 2.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12383) Get RaftClusterTest.java and other KIP-500 junit tests working

2021-03-18 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-12383:
-
Affects Version/s: (was: 2.8.0)
   3.0.0
   2.9

> Get RaftClusterTest.java and other KIP-500 junit tests working
> --
>
> Key: KAFKA-12383
> URL: https://issues.apache.org/jira/browse/KAFKA-12383
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0, 2.9
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Blocker
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12382) Create KIP-500 README for the 2.8 release

2021-03-18 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-12382.
--
Fix Version/s: 2.8.0
   Resolution: Fixed

> Create KIP-500 README for the 2.8 release
> -
>
> Key: KAFKA-12382
> URL: https://issues.apache.org/jira/browse/KAFKA-12382
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 2.8.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10340:
URL: https://github.com/apache/kafka/pull/10340#discussion_r597106362



##
File path: core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
##
@@ -319,12 +325,40 @@ class BrokerToControllerRequestThreadTest {
 
     val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
+    testRequestThread.started = true
 
     testRequestThread.enqueue(queueItem)
     pollUntil(testRequestThread, () => callbackResponse.get != null)
     assertNotNull(callbackResponse.get.authenticationException)
   }
 
+  @Test
+  def testThreadNotStarted(): Unit = {
+    // Make sure we throw if we enqueue anything while the thread is not running
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+
+    val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2))

Review comment:
   I don't think we need this either since the request never gets sent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on pull request #10202: KAFKA-12392: Deprecate the batch-size option in console producer

2021-03-18 Thread GitBox


kamalcph commented on pull request #10202:
URL: https://github.com/apache/kafka/pull/10202#issuecomment-802157373


   Can we deprecate this config in 2.8 if allowed and remove it in the 3.0 
release? WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12498) Consumer Group Lag is zero when MirrorMaker 2.0 is running

2021-03-18 Thread Olumide Ajiboye (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Olumide Ajiboye updated KAFKA-12498:

Summary: Consumer Group Lag is zero when MirrorMaker 2.0 is running  (was: 
Consumer Group is zero when MirrorMaker 2.0 is running)

> Consumer Group Lag is zero when MirrorMaker 2.0 is running
> --
>
> Key: KAFKA-12498
> URL: https://issues.apache.org/jira/browse/KAFKA-12498
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, replication
>Affects Versions: 2.7.0
> Environment: Kubernetes 1.19.7
> Strimzi 0.21.1
>Reporter: Olumide Ajiboye
>Priority: Major
>
> I have two Kafka clusters in Active\Passive replication using MM 2.0.
> When I produce messages to a topic and try to read from it in the same 
> cluster, the consumer group lag is already set to zero. The replica topic 
> also has the same lag and log-end-offset.
> My MM2 is using a superuser account and I am using a separate consumer-group 
> with permission just to read and write to this topic. 
> Just to elaborate further, * Write 10 messages to a topic without any active 
> consumers but with MM2 replicating the topic to a passive Cluster.
>  * Attempt to read 3 messages from the topic, this creates my consumer group 
> and adds an active consumer. The result is no messages are read
>  * Describe the consumer group, the result shows Log-End-Offset with correct 
> number of messages, but Lag shows 0
>  * Attempt to read 3 messages from passive cluster using same consumer-group. 
> Result: no messages consumed, Lag shows 0, Log-End-Offset shows correct 
> number of messages (i.e. same as active cluster)
>  * Keep consumer running
>  * Write a few more messages.
>  * Consumer is now reading latest messages
>  * Stop consumer
>  * Keep writing new messages
>  * Lag shows correct value.
> In the absence of MM2, Kafka operation is as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12498) Consumer Group Lag is zero when MirrorMaker 2.0 is running

2021-03-18 Thread Olumide Ajiboye (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Olumide Ajiboye updated KAFKA-12498:

Description: 
I have two Kafka clusters in Active/Passive replication using MM 2.0.
When I produce messages to a topic and then try to read from it in the same cluster, 
the consumer group lag is already set to zero. The replica topic has the same lag 
and log-end-offset.
MM2 is using a superuser client; I am using a separate consumer group with separate 
credentials and with permission only to read from and write to this topic.
Steps to reproduce:
 * Set up a cluster and a DR cluster with MM2 in uni-directional replication.
 * Create a topic.
 * Write 10 messages to the topic without any active consumers, but with MM2 replicating the topic to the passive cluster.
 * Attempt to read 3 messages from the topic; this creates my consumer group and adds an active consumer. Result: no messages are read.
 * Describe the consumer group. The result shows Log-End-Offset with the correct number of messages, but Lag shows 0.
 * Attempt to read 3 messages from the passive cluster using the same consumer group. Result: no messages consumed, Lag shows 0, Log-End-Offset shows the correct number of messages (i.e. same as the active cluster).
 * Keep the consumer running.
 * Write a few more messages.
 * The consumer now reads the latest messages.
 * Stop the consumer.
 * Keep writing new messages.
 * Lag shows the correct value.

In the absence of MM2, Kafka behaves as expected.
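
For reference, a minimal sketch of how the committed offsets and lag can be 
cross-checked with the plain Java clients, independent of the kafka-consumer-groups 
output (not part of the original report; the bootstrap address and group name are 
placeholders):

{code:java}
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LagCheck {
    public static void main(String[] args) throws Exception {
        String bootstrap = "localhost:9092";   // placeholder: the active (or passive) cluster
        String group = "my-consumer-group";    // placeholder: the consumer group from the report

        Properties adminProps = new Properties();
        adminProps.put("bootstrap.servers", bootstrap);

        try (Admin admin = Admin.create(adminProps)) {
            // Offsets the group has actually committed, per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", bootstrap);
            consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
            consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
                // Log-end offsets for the same partitions, fetched independently
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
                committed.forEach((tp, offset) ->
                    System.out.printf("%s committed=%d logEnd=%d lag=%d%n",
                        tp, offset.offset(), endOffsets.get(tp), endOffsets.get(tp) - offset.offset()));
            }
        }
    }
}
{code}

Running this against both the active and the passive cluster should help show 
whether the committed offsets themselves are unexpected or only the lag reported by 
the describe output.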

  was:
I have two Kafka clusters in Active\Passive replication using MM 2.0.
When I produce messages to a topic and try to read from it in the same cluster, 
the consumer group lag is already set to zero. The replica topic also has the 
same lag and log-end-offset.
My MM2 is using a superuser account and I am using a separate consumer-group 
with permission just to read and write to this topic. 
Just to elaborate further, * Write 10 messages to a topic without any active 
consumers but with MM2 replicating the topic to a passive Cluster.
 * Attempt to read 3 messages from the topic, this creates my consumer group 
and adds an active consumer. The result is no messages are read

 * Describe the consumer group, the result shows Log-End-Offset with correct 
number of messages, but Lag shows 0

 * Attempt to read 3 messages from passive cluster using same consumer-group. 
Result: no messages consumed, Lag shows 0, Log-End-Offset shows correct number 
of messages (i.e. same as active cluster)

 * Keep consumer running
 * Write a few more messages.
 * Consumer is now reading latest messages
 * Stop consumer
 * Keep writing new messages
 * Lag shows correct value.

In the absence of MM2, Kafka operation is as expected.


> Consumer Group Lag is zero when MirrorMaker 2.0 is running
> --
>
> Key: KAFKA-12498
> URL: https://issues.apache.org/jira/browse/KAFKA-12498
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, replication
>Affects Versions: 2.7.0
> Environment: Kubernetes 1.19.7
> Strimzi 0.21.1
>Reporter: Olumide Ajiboye
>Priority: Major
>
> I have two Kafka clusters in Active\Passive replication using MM 2.0.
>  When I produce messages to a topic and try to read from it in the same 
> cluster, the consumer group lag is already set to zero. The replica topic 
> also has the same lag and log-end-offset.
> MM2 is using a superuser client and I am using a separate consumer-group with 
> separate credentials and with permission just to read and write to this 
> topic. 
> Steps to reproduce:
>  * Setup cluster and DR cluster with MM2 in uni-directional replication.
>  * Create a topic.
>  * Write 10 messages to the topic without any active consumers but with MM2 
> replicating the topic to a passive Cluster.
>  * Attempt to read 3 messages from the topic, this creates my consumer group 
> and adds an active consumer. The result is no messages are read
>  * Describe the consumer group, the result shows Log-End-Offset with correct 
> number of messages, but Lag shows 0
>  * Attempt to read 3 messages from passive cluster using same consumer-group. 
> Result: no messages consumed, Lag shows 0, Log-End-Offset shows correct 
> number of messages (i.e. same as active cluster)
>  * Keep consumer running
>  * Write a few more messages.
>  * Consumer is now reading latest messages
>  * Stop consumer
>  * Keep writing new messages
>  * Lag shows correct value.
> In the absence of MM2, Kafka operation is as expected.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah merged pull request #10340: MINOR: Start the broker-to-controller channel for request forwarding

2021-03-18 Thread GitBox


mumrah merged pull request #10340:
URL: https://github.com/apache/kafka/pull/10340


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #10344: MINOR: Remove use of NoSuchElementException

2021-03-18 Thread GitBox


hachikuji merged pull request #10344:
URL: https://github.com/apache/kafka/pull/10344


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304330#comment-17304330
 ] 

Randall Hauch commented on KAFKA-3813:
--

[KIP-131|https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector]
 was implemented with KAFKA-4794.

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reopened KAFKA-3813:
--

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-3813.
--
Resolution: Duplicate

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah opened a new pull request #10353: DO NOT MERGE: Add a failing unit test to check Gradle and Github behavior

2021-03-18 Thread GitBox


mumrah opened a new pull request #10353:
URL: https://github.com/apache/kafka/pull/10353


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597144327



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
     batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 1
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+    val log = buildMetadataLog(tempDir, mockTime)
+
+    val numberOfRecords = 10
+    val epoch = 1
+
+    append(log, numberOfRecords, epoch)
+    log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+    val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+    TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+      snapshot.freeze()
+    }
+    assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+    val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+    assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+    assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   @hachikuji can you explain what you mean by using snapshotId? Do you mean 
changing `assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())` 
to `assertEquals(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch), resultOffsetAndEpoch.offsetAndEpoch())`, 
i.e. asserting against the snapshotId fields rather than the raw variables?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


jolshan opened a new pull request #10354:
URL: https://github.com/apache/kafka/pull/10354


   Builds are failing since this file does not have a license. Similar .md 
files do not seem to have licenses, so I've added this file to the exclude list.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10349: MINOR: Move `configurations.all` to be a child of `allprojects`

2021-03-18 Thread GitBox


ijuma merged pull request #10349:
URL: https://github.com/apache/kafka/pull/10349


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


hachikuji commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802206799


   Thanks for fixing this so quickly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



