[GitHub] [kafka] showuon commented on pull request #13100: MINOR: add size check for tagged fields

2023-01-09 Thread GitBox


showuon commented on PR #13100:
URL: https://github.com/apache/kafka/pull/13100#issuecomment-1376856241

   @ijuma @mimaison , please take a look. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #13100: MINOR: add size check for tagged fields

2023-01-09 Thread GitBox


showuon opened a new pull request, #13100:
URL: https://github.com/apache/kafka/pull/13100

   Add size check for taggedFields of a tag, and add tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Resolved] (KAFKA-14279) Add 3.3.1 to broker/client and stream upgrade/compatibility tests

2023-01-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14279.
-
Resolution: Fixed

> Add 3.3.1 to broker/client and stream upgrade/compatibility tests
> -
>
> Key: KAFKA-14279
> URL: https://issues.apache.org/jira/browse/KAFKA-14279
> Project: Kafka
>  Issue Type: Task
>  Components: clients, core, streams, system tests
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Blocker
> Fix For: 3.4.0
>
>
> Per the penultimate bullet on the [release 
> checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
>  Kafka v3.3.0 is released. We should add this version to the system tests.
> Example PRs:
>  * Broker and clients: [https://github.com/apache/kafka/pull/6794]
>  * Streams: [https://github.com/apache/kafka/pull/6597/files]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14279) Add 3.3.1 to broker/client and stream upgrade/compatibility tests

2023-01-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14279:

Fix Version/s: 3.4.0
   (was: 3.5.0)

> Add 3.3.1 to broker/client and stream upgrade/compatibility tests
> -
>
> Key: KAFKA-14279
> URL: https://issues.apache.org/jira/browse/KAFKA-14279
> Project: Kafka
>  Issue Type: Task
>  Components: clients, core, streams, system tests
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Blocker
> Fix For: 3.4.0
>
>
> Per the penultimate bullet on the [release 
> checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
>  Kafka v3.3.0 is released. We should add this version to the system tests.
> Example PRs:
>  * Broker and clients: [https://github.com/apache/kafka/pull/6794]
>  * Streams: [https://github.com/apache/kafka/pull/6597/files]





[GitHub] [kafka] mjsax commented on pull request #13077: KAFKA-14279; Add 3.3.x streams system tests

2023-01-09 Thread GitBox


mjsax commented on PR #13077:
URL: https://github.com/apache/kafka/pull/13077#issuecomment-1376846757

   Thanks for the PR. Merged to `trunk` and cherry-picked to `3.4` branch.





[GitHub] [kafka] mjsax merged pull request #13077: KAFKA-14279; Add 3.3.x streams system tests

2023-01-09 Thread GitBox


mjsax merged PR #13077:
URL: https://github.com/apache/kafka/pull/13077





[jira] [Commented] (KAFKA-14126) Convert remaining DynamicBrokerReconfigurationTest tests to KRaft

2023-01-09 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656475#comment-17656475
 ] 

Matthias J. Sax commented on KAFKA-14126:
-

Saw this failing 2x on this PR: https://github.com/apache/kafka/pull/13077

> Convert remaining DynamicBrokerReconfigurationTest tests to KRaft
> -
>
> Key: KAFKA-14126
> URL: https://issues.apache.org/jira/browse/KAFKA-14126
> Project: Kafka
>  Issue Type: Test
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Major
>
> After the initial conversion in https://github.com/apache/kafka/pull/12455, 
> three tests still need to be converted. 
> * testKeyStoreAlter
> * testTrustStoreAlter
> * testThreadPoolResize





[GitHub] [kafka] kamalcph commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes

2023-01-09 Thread GitBox


kamalcph commented on PR #13060:
URL: https://github.com/apache/kafka/pull/13060#issuecomment-1376779303

   @showuon @mimaison
   ping for the review!





[GitHub] [kafka] kamalcph commented on pull request #13059: MINOR: KafkaConfig should not expose internal config when queried for non-internal values

2023-01-09 Thread GitBox


kamalcph commented on PR #13059:
URL: https://github.com/apache/kafka/pull/13059#issuecomment-1376779075

   @showuon @mimaison 
   ping for the review!





[GitHub] [kafka] showuon commented on pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation

2023-01-09 Thread GitBox


showuon commented on PR #13099:
URL: https://github.com/apache/kafka/pull/13099#issuecomment-1376733175

   @rondagostino , please take a look. Thanks.





[GitHub] [kafka] showuon commented on a diff in pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation

2023-01-09 Thread GitBox


showuon commented on code in PR #13099:
URL: https://github.com/apache/kafka/pull/13099#discussion_r1065331903


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1496,4 +1496,32 @@ public static String replaceSuffix(String str, String 
oldSuffix, String newSuffi
 throw new IllegalArgumentException("Expected string to end with " 
+ oldSuffix + " but string is " + str);
 return str.substring(0, str.length() - oldSuffix.length()) + newSuffix;
 }
+
+public static long zeroIfNegative(long value) {
+return Math.max(0L, value);
+}
+
+// returns the sum of a and b unless it would overflow, in which case Long.MAX_VALUE is returned
+public static long saturatedAdd(long a, long b) {

Review Comment:
   The method name is copied from guava's [LongMath 
class](https://guava.dev/releases/20.0/api/docs/com/google/common/math/LongMath.html#saturatedAdd-long-long-).
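The hunk above elides the method body. For reference, a saturating add with these semantics might look roughly like the following sketch (modeled on Guava's LongMath.saturatedAdd; the class name and demo are illustrative, not the PR's actual code):

```java
public class SaturatedAddSketch {
    // Returns a + b, clamping to Long.MAX_VALUE or Long.MIN_VALUE instead of
    // wrapping around on signed overflow.
    static long saturatedAdd(long a, long b) {
        long naive = a + b;
        // Overflow occurred iff a and b have the same sign and naive's sign differs.
        if (((a ^ naive) & (b ^ naive)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return naive;
    }

    public static void main(String[] args) {
        System.out.println(saturatedAdd(Long.MAX_VALUE, 1_000_000)); // clamps instead of wrapping
        System.out.println(saturatedAdd(1, 2));
    }
}
```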






[GitHub] [kafka] showuon opened a new pull request, #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation

2023-01-09 Thread GitBox


showuon opened a new pull request, #13099:
URL: https://github.com/apache/kafka/pull/13099

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-14604) SASL session expiration time will be overflowed when calculation

2023-01-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14604:
--
Description: 
When the SASL server or client sets a large expiration time, the timeout value 
might overflow and cause the session to time out immediately.

 

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]
 is the SASL server timeout calculation.

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]
 is the SASL client timeout calculation.

 

The calculation looks something like this:
{code:java}
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code}
So if the configured or returned sessionLifetimeMs is a large number, 
`sessionExpirationTimeNanos` will overflow to a negative value and cause the 
session to time out on each check.
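A quick worked example of the wrap-around (the values below are hypothetical, chosen only to trigger the overflow):

```java
public class SaslOverflowDemo {
    public static void main(String[] args) {
        long authenticationEndNanos = 1_000_000_000L;        // hypothetical auth-end timestamp
        long sessionLifetimeMs = Long.MAX_VALUE / 1_000_000; // very large but representable lifetime
        // Same shape as the calculation above: ms -> ns conversion plus the end timestamp.
        long sessionExpirationTimeNanos =
                authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs;
        // The sum exceeds Long.MAX_VALUE and wraps to a negative value, so every
        // expiration check sees an already-expired session.
        System.out.println(sessionExpirationTimeNanos < 0);
    }
}
```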

 

 

 

  was:
When sasl server of client set a large expiration time, the timeout value might 
be overflowed, and cause the session timeout immediately.

 

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s
 the sasl server timeout's calculation

[Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s
 the sasl client timeout's calculation

 

something like this:
{code:java}
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * 
sessionLifetimeMs; {code}
So, if the configured or returned sessionLifetimeMs is a large number, after 
the calculation, the `sessionExpirationTimeNanos` will be a negative value, and 
cause the session timeout each check.

 

 


> SASL session expiration time will be overflowed when calculation
> 
>
> Key: KAFKA-14604
> URL: https://issues.apache.org/jira/browse/KAFKA-14604
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> When the SASL server or client sets a large expiration time, the timeout 
> value might overflow and cause the session to time out immediately.
>  
> [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]
>  is the SASL server timeout calculation.
> [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]
>  is the SASL client timeout calculation.
>  
> The calculation looks something like this:
> {code:java}
> sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code}
> So if the configured or returned sessionLifetimeMs is a large number, 
> `sessionExpirationTimeNanos` will overflow to a negative value and cause the 
> session to time out on each check.
>  
>  
>  





[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode

2023-01-09 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14608:
-
Description: 
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result, it's possible for the partition to be under min ISR if the 
number of adding replicas is less than the topic's min ISR config and some 
other target replicas are not in the ISR for whatever reason.

This behavior differs from ZK mode, where we require all target replicas to be 
in the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

I thought about it more, and I think the reassignment can be completed *and* 
leave us under min ISR only if either 1) we were already under min ISR in the 
first place or 2) the replication factor decreases. So in practice I don't 
think this is a severe issue: either we were already under min ISR, so the 
reassignment does not actually *cause* under min ISR, or we're decreasing the 
replication factor, which isn't a common scenario.

It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR that 
we're at or above min ISR. So if the min ISR config for a topic is two, then 
we would complete reassignments when at least two target replicas are in the 
ISR.

  was:
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result its possible for the partition to be under min ISR if the number 
of adding replicas is less than the topic's under min ISR config and some other 
target replicas are not in the ISR for whatever reason.

This behavior differs to ZK mode where we require all target replicas to be in 
the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

 
I thought more about it and I only think the reassignment can be completed 
*and* we are under min ISR if either 1) we were already under min ISR in the 
first place or 2) the replication factor decreases. So in practice I dont think 
this is a severe issue. Either we were already under min ISR so the 
reassignment does not *cause* under min ISR. Or we're decreasing the 
replication factor which isnt a common scenario.


It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR such that 
we're above under min ISR. So if the under min ISR config for a topic is two, 
then we would complete reassignments when at least two target replicas are in 
the ISR.


> Look into when reassignment should be completed in KRaft mode
> -
>
> Key: KAFKA-14608
> URL: https://issues.apache.org/jira/browse/KAFKA-14608
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In KRaft mode we complete reassignments when the adding replicas have been 
> added to the ISR - see 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
>  As a result its possible for the partition to be under min ISR if the number 
> of adding replicas is less than the topic's under min ISR config and some 
> other target replicas are not in the ISR for whatever reason.
> This behavior differs to ZK mode where we require all target replicas to be 
> in the ISR for the reassignment to complete - see 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].
>  
> I thought more about it and I only think the reassignment can be completed 
> *and* we get to be under min ISR if either 1) we were already under min ISR 
> in the first place or 2) the replication factor decreases. So in practice I 
> don't think this is a severe issue. Either we were already under min ISR so 
> the reassignment does not actually *cause* under min ISR. Or we're decreasing 
> the replication factor which isn't a common scenario.
> It seems there are two options. One is to match the ZK behavior and only 
> complete reassignments when all target replicas are in the ISR. The second is 
> to complete reassignments when enough target replicas are in the ISR such 
> that we're above under min ISR. So if the under min ISR config for a topic is 
> two, then we would complete reassignments when at least two target replicas 
> are in the ISR.
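The two completion rules being weighed can be sketched as predicates over the ISR and the target replica set (hypothetical helper methods for illustration, not actual Kafka controller code):

```java
import java.util.Set;

public class ReassignmentCompletionSketch {
    // Option 1 (ZK behavior): complete only when every target replica is in the ISR.
    static boolean allTargetsInIsr(Set<Integer> isr, Set<Integer> targets) {
        return isr.containsAll(targets);
    }

    // Option 2: complete once enough target replicas are in the ISR to meet min ISR.
    static boolean meetsMinIsr(Set<Integer> isr, Set<Integer> targets, int minIsr) {
        long targetsInIsr = targets.stream().filter(isr::contains).count();
        return targetsInIsr >= minIsr;
    }

    public static void main(String[] args) {
        Set<Integer> isr = Set.of(1, 2);          // replica 3 has not caught up yet
        Set<Integer> targets = Set.of(1, 2, 3);   // target assignment after reassignment
        System.out.println(allTargetsInIsr(isr, targets)); // false: replica 3 missing
        System.out.println(meetsMinIsr(isr, targets, 2));  // true: two targets already in ISR
    }
}
```

Option 2 would let the reassignment above complete while replica 3 is still catching up, which is exactly the trade-off discussed in the ticket.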

[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode

2023-01-09 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14608:
-
Description: 
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result, it's possible for the partition to be under min ISR if the 
number of adding replicas is less than the topic's min ISR config and some 
other target replicas are not in the ISR for whatever reason.

This behavior differs from ZK mode, where we require all target replicas to be 
in the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

I thought about it more, and I think the reassignment can be completed *and* 
leave us under min ISR only if either 1) we were already under min ISR in the 
first place or 2) the replication factor decreases. So in practice I don't 
think this is a severe issue: either we were already under min ISR, so the 
reassignment does not actually *cause* under min ISR, or we're decreasing the 
replication factor, which isn't a common scenario.

It seems there are three options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR that 
we're at or above min ISR. So if the min ISR config for a topic is two, then 
we would complete reassignments when at least two target replicas are in the 
ISR. The third is to leave the current behavior.

  was:
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result its possible for the partition to be under min ISR if the number 
of adding replicas is less than the topic's under min ISR config and some other 
target replicas are not in the ISR for whatever reason.

This behavior differs to ZK mode where we require all target replicas to be in 
the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

 
I thought more about it and I only think the reassignment can be completed 
*and* we get to be under min ISR if either 1) we were already under min ISR in 
the first place or 2) the replication factor decreases. So in practice I don't 
think this is a severe issue. Either we were already under min ISR so the 
reassignment does not actually *cause* under min ISR. Or we're decreasing the 
replication factor which isn't a common scenario.

It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR such that 
we're above under min ISR. So if the under min ISR config for a topic is two, 
then we would complete reassignments when at least two target replicas are in 
the ISR.


> Look into when reassignment should be completed in KRaft mode
> -
>
> Key: KAFKA-14608
> URL: https://issues.apache.org/jira/browse/KAFKA-14608
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In KRaft mode we complete reassignments when the adding replicas have been 
> added to the ISR - see 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
>  As a result its possible for the partition to be under min ISR if the number 
> of adding replicas is less than the topic's under min ISR config and some 
> other target replicas are not in the ISR for whatever reason.
> This behavior differs to ZK mode where we require all target replicas to be 
> in the ISR for the reassignment to complete - see 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].
>  
> I thought more about it and I only think the reassignment can be completed 
> *and* we get to be under min ISR if either 1) we were already under min ISR 
> in the first place or 2) the replication factor decreases. So in practice I 
> don't think this is a severe issue. Either we were already under min ISR so 
> the reassignment does not actually *cause* under min ISR. Or we're decreasing 
> the replication factor which isn't a common scenario.
> It seems there are three options. One is to match the ZK behavior and only 
> complete reassignments when all target replicas are in the ISR. The second is 
> to complete reassignments when enough target replicas are in the ISR such 
> that we're above under min ISR. So if the under min ISR config for a topic is 
> two, then we would complete reassignments when at least two target replicas 
> are in the ISR. The third is to leave the current behavior.

[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode

2023-01-09 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14608:
-
Description: 
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result, it's possible for the partition to be under min ISR if the 
number of adding replicas is less than the topic's min ISR config and some 
other target replicas are not in the ISR for whatever reason.

This behavior differs from ZK mode, where we require all target replicas to be 
in the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

I thought about it more, and I think the reassignment can be completed *and* 
we end up under min ISR only if either 1) we were already under min ISR in the 
first place or 2) the replication factor decreases. So in practice I don't 
think this is a severe issue: either we were already under min ISR, so the 
reassignment does not *cause* under min ISR, or we're decreasing the 
replication factor, which isn't a common scenario.

It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR that 
we're at or above min ISR. So if the min ISR config for a topic is two, then 
we would complete reassignments when at least two target replicas are in the 
ISR.

  was:
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result its possible for the partition to be under min ISR if the number 
of adding replicas is less than the topic's under min ISR config and some other 
target replicas are not in the ISR for whatever reason.

This behavior differs to ZK mode where we require all target replicas to be in 
the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

 
I thought more about it and I only think the reassignment can be completed 
*and* we are under min ISR if either 1) we were already under min ISR in the 
first place or 2) the replication factor decreases. So in practice I dont think 
this is a severe issue. 
It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR such that 
we're above under min ISR. So if the under min ISR config for a topic is two, 
then we would complete reassignments when at least two target replicas are in 
the ISR.


> Look into when reassignment should be completed in KRaft mode
> -
>
> Key: KAFKA-14608
> URL: https://issues.apache.org/jira/browse/KAFKA-14608
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In KRaft mode we complete reassignments when the adding replicas have been 
> added to the ISR - see 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
>  As a result its possible for the partition to be under min ISR if the number 
> of adding replicas is less than the topic's under min ISR config and some 
> other target replicas are not in the ISR for whatever reason.
> This behavior differs to ZK mode where we require all target replicas to be 
> in the ISR for the reassignment to complete - see 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].
>  
> I thought more about it and I only think the reassignment can be completed 
> *and* we are under min ISR if either 1) we were already under min ISR in the 
> first place or 2) the replication factor decreases. So in practice I dont 
> think this is a severe issue. Either we were already under min ISR so the 
> reassignment does not *cause* under min ISR. Or we're decreasing the 
> replication factor which isnt a common scenario.
> It seems there are two options. One is to match the ZK behavior and only 
> complete reassignments when all target replicas are in the ISR. The second is 
> to complete reassignments when enough target replicas are in the ISR such 
> that we're above under min ISR. So if the under min ISR config for a topic is 
> two, then we would complete reassignments when at least two target replicas 
> are in the ISR.

[jira] [Updated] (KAFKA-14608) Make sure reassignment does not cause under min ISR

2023-01-09 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14608:
-
Description: 
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result, it's possible for the partition to be under min ISR if the 
number of adding replicas is less than the topic's min ISR config and some 
other target replicas are not in the ISR for whatever reason.

This behavior differs from ZK mode, where we require all target replicas to be 
in the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

I thought about it more, and I think the reassignment can be completed *and* 
we end up under min ISR only if either 1) we were already under min ISR in the 
first place or 2) the replication factor decreases. So in practice I don't 
think this is a severe issue. 
It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR that 
we're at or above min ISR. So if the min ISR config for a topic is two, then 
we would complete reassignments when at least two target replicas are in the 
ISR.

  was:
In KRaft mode we complete reassignments when the adding replicas have been 
added to the ISR - see 
[https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
 As a result its possible for the partition to be under min ISR if the number 
of adding replicas is less than the topic's under min ISR config and some other 
target replicas are not in the ISR for whatever reason.

This behavior differs to ZK mode where we require all target replicas to be in 
the ISR for the reassignment to complete - see 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].

It seems there are two options. One is to match the ZK behavior and only 
complete reassignments when all target replicas are in the ISR. The second is 
to complete reassignments when enough target replicas are in the ISR such that 
we're above under min ISR. So if the under min ISR config for a topic is two, 
then we would complete reassignments when at least two target replicas are in 
the ISR.


> Make sure reassignment does not cause under min ISR
> ---
>
> Key: KAFKA-14608
> URL: https://issues.apache.org/jira/browse/KAFKA-14608
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In KRaft mode we complete reassignments when the adding replicas have been 
> added to the ISR - see 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
>  As a result its possible for the partition to be under min ISR if the number 
> of adding replicas is less than the topic's under min ISR config and some 
> other target replicas are not in the ISR for whatever reason.
> This behavior differs to ZK mode where we require all target replicas to be 
> in the ISR for the reassignment to complete - see 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].
>  
> I thought more about it, and I think the reassignment can only be completed 
> *and* leave us under min ISR if either 1) we were already under min ISR in the 
> first place or 2) the replication factor decreases. So in practice I don't 
> think this is a severe issue. 
> It seems there are two options. One is to match the ZK behavior and only 
> complete reassignments when all target replicas are in the ISR. The second is 
> to complete reassignments when enough target replicas are in the ISR such 
> that we're at or above min ISR. So if the min ISR config for a topic is 
> two, then we would complete reassignments when at least two target replicas 
> are in the ISR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode

2023-01-09 Thread Andrew Grant (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14608:
-
Summary: Look into when reassignment should be completed in KRaft mode  
(was: Make sure reassignment does not cause under min ISR)

> Look into when reassignment should be completed in KRaft mode
> -
>
> Key: KAFKA-14608
> URL: https://issues.apache.org/jira/browse/KAFKA-14608
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Andrew Grant
>Assignee: Andrew Grant
>Priority: Major
>
> In KRaft mode we complete reassignments when the adding replicas have been 
> added to the ISR - see 
> [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.]
>  As a result it's possible for the partition to be under min ISR if the number 
> of adding replicas is less than the topic's min ISR config and some 
> other target replicas are not in the ISR for whatever reason.
> This behavior differs from ZK mode, where we require all target replicas to be 
> in the ISR for the reassignment to complete - see 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003].
>  
> I thought more about it, and I think the reassignment can only be completed 
> *and* leave us under min ISR if either 1) we were already under min ISR in the 
> first place or 2) the replication factor decreases. So in practice I don't 
> think this is a severe issue. 
> It seems there are two options. One is to match the ZK behavior and only 
> complete reassignments when all target replicas are in the ISR. The second is 
> to complete reassignments when enough target replicas are in the ISR such 
> that we're at or above min ISR. So if the min ISR config for a topic is 
> two, then we would complete reassignments when at least two target replicas 
> are in the ISR.





[GitHub] [kafka] rondagostino commented on a diff in pull request #13058: KAFKA-14557; Lock metadata log dir

2023-01-09 Thread GitBox


rondagostino commented on code in PR #13058:
URL: https://github.com/apache/kafka/pull/13058#discussion_r1065242904


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -127,18 +127,18 @@ class RaftManagerTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("metadata", "log", "metadata,log"))
+  @ValueSource(strings = Array("metadata-only", "log-only", "both"))
   def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
-val logDir = if (dirType.contains("metadata")) {
-  Some(TestUtils.tempDir().toPath)
-} else {
+val logDir = if (dirType.equals("metadata-only")) {
   None
+} else {
+  Some(TestUtils.tempDir().toPath)
 }
 
-val metadataDir = if (dirType.contains("log")) {
-  Some(TestUtils.tempDir().toPath)
-} else {
+val metadataDir = if (dirType.contains("log-only")) {

Review Comment:
   `s/contains/equals/`
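For illustration (not part of the PR): `contains` does a substring match, which the old compound values like "metadata,log" required, but the new values are one-of, so an exact `equals` states the intent:

```java
public class DirTypeCheck {
    public static void main(String[] args) {
        // Old values were compound, so a substring check was required.
        System.out.println("metadata,log".contains("log"));     // true
        // New values are exactly one of "metadata-only", "log-only", "both",
        // so an exact comparison is the clearer check.
        System.out.println("log-only".equals("log-only"));      // true
        System.out.println("metadata-only".equals("log-only")); // false
    }
}
```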



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #13077: KAFKA-14279; Add 3.3.x streams system tests

2023-01-09 Thread GitBox


jsancio commented on PR #13077:
URL: https://github.com/apache/kafka/pull/13077#issuecomment-1376537132

   @mjsax I got a chance to work on this today. I updated the PR. Here is the 
result from running the test locally:
   ```bash
   
TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces"
 _DUCKTAPE_OPTIONS='--parameters 
'\''{"from_version":"3.3.1","to_version":"3.5.0-SNAPSHOT"}'\' 
tests/docker/run_tests.sh
   
   > Configure project :
   Starting build with version 3.5.0-SNAPSHOT (commit id a0090d24) using Gradle 
7.6, Java 1.8 and Scala 2.13.10
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   
   BUILD SUCCESSFUL in 1s
   168 actionable tasks: 168 up-to-date
   docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file 
/opt/kafka-dev/tests/docker/build/cluster.json  
./tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces
 --parameters '{"from_version":"3.3.1","to_version":"3.5.0-SNAPSHOT"}'"
   /usr/local/lib/python3.9/dist-packages/paramiko/transport.py:236: 
CryptographyDeprecationWarning: Blowfish has been deprecated
 "class": algorithms.Blowfish,
   [INFO:2023-01-09 16:14:29,112]: starting test run with session id 
2023-01-09--003...
   [INFO:2023-01-09 16:14:29,112]: running 1 tests...
   [INFO:2023-01-09 16:14:29,112]: Triggering test 1 of 1...
   [INFO:2023-01-09 16:14:29,117]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name': 
'streams_upgrade_test.py', 'cls_name': 'StreamsUpgradeTest', 'method_name': 
'test_rolling_upgrade_with_2_bounces', 'injected_args': {'from_version': 
'3.3.1', 'to_version': '3.5.0-SNAPSHOT'}}
   [INFO:2023-01-09 16:14:29,118]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT:
 on run 1/1
   [INFO:2023-01-09 16:14:29,119]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT:
 Setting up...
   [INFO:2023-01-09 16:14:29,119]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT:
 Running...
   [INFO:2023-01-09 16:18:09,123]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT:
 Tearing down...
   [INFO:2023-01-09 16:18:28,193]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT:
 PASS
   [INFO:2023-01-09 16:18:28,193]: RunnerClient: 
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT:
 Data: None
   

   SESSION REPORT (ALL TESTS)
   ducktape version: 0.11.1
   session_id:   2023-01-09--003
   run time: 3 minutes 59.087 seconds
   tests run:1
   passed:   1
   flaky:0
   failed:   0
   ignored:  0
   

   test_id:
kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT
   status: PASS
   run time:   3 minutes 59.075 seconds
   

   
   ```





[GitHub] [kafka] ableegoldman merged pull request #13064: MINOR: bump streams quickstart pom versions and add to list in gradle.properties

2023-01-09 Thread GitBox


ableegoldman merged PR #13064:
URL: https://github.com/apache/kafka/pull/13064





[GitHub] [kafka] ableegoldman merged pull request #13063: MINOR: Update KRaft cluster upgrade documentation for 3.4

2023-01-09 Thread GitBox


ableegoldman merged PR #13063:
URL: https://github.com/apache/kafka/pull/13063





[GitHub] [kafka] jolshan commented on pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


jolshan commented on PR #12870:
URL: https://github.com/apache/kafka/pull/12870#issuecomment-1376482387

   Looks fairly reasonable. I'm going to rebuild though to see if some of the 
tests look flaky.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


jeffkbkim commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1065176140


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
 this.error = null;
 }
 
+public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short 
version) {
+super(ApiKeys.OFFSET_FETCH);
+data = new OffsetFetchResponseData();
+
+if (version >= 8) {
+data.setGroups(groups);
+error = null;
+
+for (OffsetFetchResponseGroup group : data.groups()) {
+this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+}
+} else {
+if (groups.size() != 1) {
+throw new UnsupportedVersionException(
+"Version " + version + " of OffsetFetchResponse only 
supports one group."
+);
+}
+
+OffsetFetchResponseGroup group = groups.get(0);
+data.setErrorCode(group.errorCode());
+error = Errors.forCode(group.errorCode());
+
+group.topics().forEach(topic -> {
+OffsetFetchResponseTopic newTopic = new 
OffsetFetchResponseTopic().setName(topic.name());
+data.topics().add(newTopic);
+
+topic.partitions().forEach(partition -> {
+OffsetFetchResponsePartition newPartition;
+
+if (version < 2 && group.errorCode() != 
Errors.NONE.code()) {
+// Versions prior to version 2 does not support a top 
level error. Therefore
+// we put it at the partition level.
+newPartition = new OffsetFetchResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+.setErrorCode(group.errorCode());
+} else {

Review Comment:
   that makes sense. thanks for the clarification






[GitHub] [kafka] rishiraj88 commented on pull request #13097: Draft: only wipe state store under EOS regardless of state

2023-01-09 Thread GitBox


rishiraj88 commented on PR #13097:
URL: https://github.com/apache/kafka/pull/13097#issuecomment-1376384331

   @lqxshay, thanks for the helpful checklist.





[GitHub] [kafka] rishiraj88 commented on pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-09 Thread GitBox


rishiraj88 commented on PR #12998:
URL: https://github.com/apache/kafka/pull/12998#issuecomment-1376375860

   Thanks





[GitHub] [kafka] hachikuji commented on pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-09 Thread GitBox


hachikuji commented on PR #12972:
URL: https://github.com/apache/kafka/pull/12972#issuecomment-1376323364

   Would it be possible to make the release status part of the JSON spec? For 
example:
   ```json
 "apiStability": "evolving|stable"
   ```
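   Hypothetically, such a field could sit at the top level of a message spec
   alongside the existing keys — only `"apiStability"` comes from the
   suggestion above; the other values are purely illustrative:

```json
{
  "apiKey": 68,
  "type": "request",
  "name": "ConsumerGroupHeartbeatRequest",
  "apiStability": "evolving",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": []
}
```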
   





[GitHub] [kafka] cmccabe merged pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.

2023-01-09 Thread GitBox


cmccabe merged PR #12998:
URL: https://github.com/apache/kafka/pull/12998





[GitHub] [kafka] lqxshay opened a new pull request, #13097: Draft: only wipe state store under EOS regardless of state

2023-01-09 Thread GitBox


lqxshay opened a new pull request, #13097:
URL: https://github.com/apache/kafka/pull/13097

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-09 Thread GitBox


jolshan commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1376065254

   [clolov](https://github.com/clolov) I see your point about people not 
knowing to use those methods -- maybe we can add some documentation. 
   
   However, I'm wondering if changing the map will affect performance for the 
reading and writing of the map. I don't think it makes sense to take a hit 
there. We will also be reading the size very frequently for the metric. 
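As a sketch of the trade-off being discussed (illustrative names, not the PR's code): with a `ConcurrentHashMap`, a metrics gauge can sample `size()` as a non-blocking, weakly consistent read, so frequent sampling doesn't contend with producer-state writes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class ProducerCountGauge {
    // Illustrative stand-in for a per-partition producer state map.
    private final ConcurrentHashMap<Long, String> producers = new ConcurrentHashMap<>();

    void addProducer(long producerId, String state) {
        producers.put(producerId, state);
    }

    // The gauge just samples size(); on ConcurrentHashMap this is a
    // non-blocking, weakly consistent read, so it is cheap to call often.
    Supplier<Integer> producerCountGauge() {
        return producers::size;
    }

    public static void main(String[] args) {
        ProducerCountGauge m = new ProducerCountGauge();
        m.addProducer(1L, "producer-1");
        m.addProducer(2L, "producer-2");
        System.out.println(m.producerCountGauge().get()); // prints 2
    }
}
```

A synchronized map would make the frequent size reads contend with every write on the hot path, which is the performance hit the comment above is wary of.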





[GitHub] [kafka] jolshan commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-09 Thread GitBox


jolshan commented on code in PR #13078:
URL: https://github.com/apache/kafka/pull/13078#discussion_r1064952018


##
docs/ops.html:
##
@@ -1604,6 +1604,11 @@ 

[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests

2023-01-09 Thread GitBox


philipnee commented on code in PR #13021:
URL: https://github.com/apache/kafka/pull/13021#discussion_r1064949778


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetCommitRequestData;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+public class CommitRequestManager implements RequestManager {
+private final Queue<StagedCommit> stagedCommits;
+// TODO: We will need to refactor the subscriptionState
+private final SubscriptionState subscriptionState;
+private final Logger log;
+private final Optional<AutoCommitState> autoCommitState;
+private final Optional<CoordinatorRequestManager> coordinatorRequestManager;
+private final GroupStateManager groupState;
+
+public CommitRequestManager(
+final Time time,
+final LogContext logContext,
+final SubscriptionState subscriptionState,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final GroupStateManager groupState) {
+this.log = logContext.logger(getClass());
+this.stagedCommits = new LinkedList<>();
+if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+final long autoCommitInterval =
+
Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+this.autoCommitState = Optional.of(new AutoCommitState(time, 
autoCommitInterval));
+} else {
+this.autoCommitState = Optional.empty();
+}
+this.coordinatorRequestManager = 
Optional.ofNullable(coordinatorRequestManager);
+this.groupState = groupState;
+this.subscriptionState = subscriptionState;
+}
+
+// Visible for testing
+CommitRequestManager(
+final Time time,
+final LogContext logContext,
+final SubscriptionState subscriptionState,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final GroupStateManager groupState,
+final AutoCommitState autoCommitState) {
+this.log = logContext.logger(getClass());
+this.subscriptionState = subscriptionState;
+this.coordinatorRequestManager = 
Optional.ofNullable(coordinatorRequestManager);
+this.groupState = groupState;
+this.autoCommitState = Optional.ofNullable(autoCommitState);
+this.stagedCommits = new LinkedList<>();
+}
+
+/**
+ * Poll for the commit request if there's any. The function will also try 
to autocommit, if enabled.
+ *
+ * @param currentTimeMs
+ * @return
+ */
+@Override
+public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+if (!coordinatorRequestManager.isPresent()) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>());
+}
+
+maybeAutoCommit(currentTimeMs);
+
+if (stagedCommits.isEmpty()) {
+return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new 
ArrayList<>());
+}
+
+List unsentCommitRequests =
+
stagedCommits.stream().map(StagedCommit::toUnsent

[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


ijuma commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1376021193

   We already depend on core when it comes to the tools test module, so we 
don't necessarily have to move things for that.





[jira] [Commented] (KAFKA-14609) Kafka Streams Processor API cannot use state stores

2023-01-09 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656218#comment-17656218
 ] 

Bill Bejeck commented on KAFKA-14609:
-

It just received approval on the dev list, so I'd say within a week.

> Kafka Streams Processor API cannot use state stores
> ---
>
> Key: KAFKA-14609
> URL: https://issues.apache.org/jira/browse/KAFKA-14609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Philipp Schirmer
>Priority: Major
>
> The recently introduced Kafka Streams Processor API (since 3.3, 
> https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with 
> regards to using state stores. The 
> [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-]
>  method returns null, even though the store has been registered according to 
> the docs. The old transformer API still works. I created a small project that 
> demonstrates the behavior. It uses both methods to register a store for the 
> transformer, as well as the processor API: 
> https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java





[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams

2023-01-09 Thread Sujay Hegde (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656213#comment-17656213
 ] 

Sujay Hegde commented on KAFKA-14404:
-

[~ableegoldman] 

I need some clarification about the description.
I will go through the code + docs and get back.



Thanks

> Fix & update docs on client configs controlled by Streams
> -
>
> Key: KAFKA-14404
> URL: https://issues.apache.org/jira/browse/KAFKA-14404
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: docs, newbie
>
> There are a handful of client configs that can't be set by Streams users for 
> various reasons, such as the group id, but we seem to have missed a few of 
> them in the documentation 
> [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]:
>  the partitioner assignor (Consumer) and partitioner (Producer).
> This section of the docs also just needs to be cleaned up in general as there 
> is overlap between the [Default 
> Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values]
>  and [Parameters controlled by Kafka 
> Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]
>  sections, and the table of contents is messed up presumably due to an issue 
> with the section headers.
> We should separate these with one section covering (only) configs where 
> Streams sets a different default but this can still be overridden by the 
> user, and the other section covering the configs that Streams hardcodes and 
> users can never override.





[GitHub] [kafka] fvaleri commented on pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-09 Thread GitBox


fvaleri commented on PR #13080:
URL: https://github.com/apache/kafka/pull/13080#issuecomment-1375959364

   LGTM. Thanks.





[GitHub] [kafka] gharris1727 commented on pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest

2023-01-09 Thread GitBox


gharris1727 commented on PR #13084:
URL: https://github.com/apache/kafka/pull/13084#issuecomment-1375950088

   Ah, I recall working on this before. I was the one that added that 
STARTUP_MODE_JOIN override you linked: https://github.com/apache/kafka/pull/9040
   
   In the description, I mentioned how JOIN was a superset of LISTEN, and I 
think that's still the case. The jetty server starts: 
https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L207
 before the herder does: 
https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L53-L54
   
   However, as you've noticed, the registration of the resources occurs _after_ 
the server begins listening, and _after_ the herder joins the group. So neither 
LISTEN or JOIN is sufficient to ensure that the resources are registered. But 
changing from JOIN to LISTEN is going to have the opposite effect that you're 
intending, as the LISTEN condition is true even earlier than JOIN is.





[GitHub] [kafka] satishd commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.

2023-01-09 Thread GitBox


satishd commented on PR #13046:
URL: https://github.com/apache/kafka/pull/13046#issuecomment-1375939273

   @ijuma I rebased with trunk and resolved conflicts. Please review it when 
you get some time.





[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2023-01-09 Thread GitBox


vamossagar12 commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1064852750


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -780,6 +774,14 @@ protected void stopServices() {
 }
 }
 
+// Timeout for herderExecutor to gracefully terminate is set to a value to 
accommodate
+// reading to the end of the config topic + successfully attempting to 
stop all connectors and tasks and a buffer of 10s
+private long herderExecutorTimeoutMs() {
+return this.workerSyncTimeoutMs +
+config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG) 
+

Review Comment:
   Nope that wasn't right. Changed it now. Good catch.






[GitHub] [kafka] mimaison commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


mimaison commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375933240

   Yes I can review this.
   
   I started looking at 
[KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) because we 
were stepping on each other's toes in 
[KAFKA-14470](https://issues.apache.org/jira/browse/KAFKA-14470), but we should 
finish that first.
   
   Many of the tests for these commands start full clusters and all that test 
logic is currently in core. We should be able to move it to server-common but 
I'm not quite sure if we want to drag many ZooKeeper bits there.





[GitHub] [kafka] divijvaidya opened a new pull request, #13096: MINOR: Multiple clean ups associated with scala collection

2023-01-09 Thread GitBox


divijvaidya opened a new pull request, #13096:
URL: https://github.com/apache/kafka/pull/13096

   Clean up of different aspects on the scala code. Some are potential 
performance improvement and some are readability improvements.
   
   Example for type of changes are:
   1. Merge consecutive filter calls to avoid creation of an intermediate 
collection
   2. Don’t resort to pattern matching to check value existence. The simplified 
expression works faster.
   3. For `option`, don’t emulate `exists` & other `monadic` functions using 
pattern matching. see: 
https://pavelfatin.com/scala-collections-tips-and-tricks/#options-processing
   4. Don’t use map when result is ignored, use `foreach` instead.
   5. Using `.lengthCompare(n) > 0` instead of `.length > n` reduces the 
complexity from O(length) to O(length min n)
   





[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375924994

   Actually I pinged too soon :) Before getting it reviewed, I would test on my 
local and also add a couple of tests. Thanks. 





[GitHub] [kafka] C0urante commented on a diff in pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…

2023-01-09 Thread GitBox


C0urante commented on code in PR #11818:
URL: https://github.com/apache/kafka/pull/11818#discussion_r1064843478


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -69,14 +69,19 @@ public MirrorSourceTask() {}
 
 // for testing
 MirrorSourceTask(KafkaConsumer consumer, 
MirrorSourceMetrics metrics, String sourceClusterAlias,
- ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer producer) {
+ ReplicationPolicy replicationPolicy, long maxOffsetLag, 
KafkaProducer producer,
+ Semaphore outstandingOffsetSyncs, Map partitionStates,

Review Comment:
   One alternative is to hardcode the instantiation of the 
`outstandingOffsetSyncs` semaphore in the testing-only constructor, without 
relying on a constructor parameter (and the same can be done for the 
`partitionStates` field).
   
   But that has its own problems of duplicating instantiation logic for those 
fields and leading to possible divergence in behavior between task instances 
depending on whether they're brought up for testing or not, if we're not 
careful about keeping that instantiation logic in sync.
   
   One way that that issue can be addressed is by pulling out any non-trivial 
instantiation logic into a separate static method; e.g.:
   ```java
   private static Semaphore newOutstandingOffsetSyncsSemaphore() {
   return new Semaphore(MAX_OUSTANDING_OFFSET_SYNCS);
   }
   ```
   
   But I don't think that this is so much better than what you have right now 
that I'd block the PR on adopting this approach; even if it has some 
advantages, the additional complexity makes it debatable whether it'd really be 
worth the tradeoff.






[GitHub] [kafka] mimaison commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-09 Thread GitBox


mimaison commented on code in PR #13080:
URL: https://github.com/apache/kafka/pull/13080#discussion_r1064840727


##
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class ClusterTool {
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {

Review Comment:
   Yes. I was thinking of doing that after the Scala to Java conversion.






[GitHub] [kafka] C0urante commented on a diff in pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…

2023-01-09 Thread GitBox


C0urante commented on code in PR #11818:
URL: https://github.com/apache/kafka/pull/11818#discussion_r1064822714


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##
@@ -256,39 +277,76 @@ public void testPartitionStateMutation() {
 partitionStates.put(sourceTopicPartition, partitionState);
 RecordMetadata recordMetadata = new 
RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
 
-when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true);
+doAnswer(new Answer() {
+@Override
+public Object answer(final InvocationOnMock invocation) {
+final Callback callback = invocation.getArgument(1);
+callback.onCompletion(null, null);
+return null;
+}
+}).when(producer).send(any(), any());
 mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
 assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, 
"sync offsets");
 assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, 
"sync offsets");
 assertEquals(recordOffset, partitionState.previousUpstreamOffset, 
"sync offsets");
 assertEquals(metadataOffset, partitionState.previousDownstreamOffset, 
"sync offsets");
-assertFalse(partitionState.shouldSyncOffsets);
+assertFalse(partitionState.shouldSyncOffsets, "partition state reset");
+verify(producer, times(1)).send(any(), any());
 
-int newRecordOffset = 2;
-int newMetadataOffset = 102;
-recordMetadata = new RecordMetadata(sourceTopicPartition, 
newMetadataOffset, 0, 0, 0, recordPartition);
+recordOffset = 2;
+metadataOffset = 102;
+recordMetadata = new RecordMetadata(sourceTopicPartition, 
metadataOffset, 0, 0, 0, recordPartition);
 sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
-newRecordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
 recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
 
-when(outstandingOffsetSyncs.tryAcquire()).thenReturn(false);
+// Do not release outstanding sync semaphore
+doAnswer(new Answer() {
+@Override
+public Object answer(final InvocationOnMock invocation) {
+return null;
+}
+}).when(producer).send(any(), any());
 mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
-// Expect partition state to be updated
-assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, 
"sync offsets");
-assertEquals(newMetadataOffset, 
partitionState.lastSyncDownstreamOffset, "sync offsets");
-assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, 
"sync offsets");
-assertEquals(newMetadataOffset, 
partitionState.previousDownstreamOffset, "sync offsets");
-assertTrue(partitionState.shouldSyncOffsets);
-verify(producer, times(1)).send(any(), any());
 
-when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true);
+assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, 
"sync offsets");
+assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, 
"sync offsets");
+assertEquals(recordOffset, partitionState.previousUpstreamOffset, 
"sync offsets");
+assertEquals(metadataOffset, partitionState.previousDownstreamOffset, 
"sync offsets");
+assertFalse(partitionState.shouldSyncOffsets, "partition state reset");
+verify(producer, times(2)).send(any(), any());
+
+recordOffset = 4;
+metadataOffset = 104;
+recordMetadata = new RecordMetadata(sourceTopicPartition, 
metadataOffset, 0, 0, 0, recordPartition);
+sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition,
+recordOffset, System.currentTimeMillis(), 
TimestampType.CREATE_TIME, recordKey.length,
+recordValue.length, recordKey, recordValue, headers, 
Optional.empty()));
+
 mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
-assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, 
"partition state is synced");
-assertEquals(newMetadataOffset, 
partitionState.lastSyncDownstreamOffset, "partition state is synced");
-assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, 
"partition state is synced");
-assertEquals(newMetadataOffset, 
partitionState.previousDownstreamOffset, "partition state is synced");
-assertFalse(partitionState.shouldSyncOffsets);
+assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, 
"sync offsets");
+

[GitHub] [kafka] dajac commented on pull request #12901: KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


dajac commented on PR #12901:
URL: https://github.com/apache/kafka/pull/12901#issuecomment-1375874051

   @jolshan I have updated the PR.





[jira] [Commented] (KAFKA-14609) Kafka Streams Processor API cannot use state stores

2023-01-09 Thread Philipp Schirmer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656173#comment-17656173
 ] 

Philipp Schirmer commented on KAFKA-14609:
--

Thanks, I didn't find that issue. Do you know when 3.3.2 will be released?

> Kafka Streams Processor API cannot use state stores
> ---
>
> Key: KAFKA-14609
> URL: https://issues.apache.org/jira/browse/KAFKA-14609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Philipp Schirmer
>Priority: Major
>
> The recently introduced Kafka Streams Processor API (since 3.3, 
> https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with 
> regards to using state stores. The 
> [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-]
>  method returns null, even though the store has been registered according to 
> the docs. The old transformer API still works. I created a small project that 
> demonstrates the behavior. It uses both methods to register a store for the 
> transformer, as well as the processor API: 
> https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14610) Publish Mirror Maker 2 offset syncs in task commit method

2023-01-09 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-14610:
-

 Summary: Publish Mirror Maker 2 offset syncs in task commit method
 Key: KAFKA-14610
 URL: https://issues.apache.org/jira/browse/KAFKA-14610
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Chris Egerton


Mirror Maker 2 periodically publishes offset sync messages to a Kafka topic 
that contains the corresponding upstream and downstream offsets for a 
replicated topic partition.

 

Currently, this publishing takes place inside the [commitRecord 
method|https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L192],
 which is invoked by the Kafka Connect framework after a source record has been 
successfully sent by its producer (i.e., ack'd by the requested number of 
brokers).

 

Mirror Maker 2 also has logic to limit the number of in-flight offset sync 
messages. Once ten messages have been dispatched to the producer used for 
offset syncs (which is a separate producer from the one that the Kafka Connect 
framework uses for sending records received from the [poll 
method|https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L134])
 that have not yet been ack'd by the requested number of brokers, Mirror Maker 
2 begins to skip sending offset sync messages, and will only resume sending 
messages once the number of in-flight offset syncs goes below 10, and new calls 
to the {{commitRecord}} method take place.

 

When bursts of throughput occur in replicated topic partitions, this can cause 
offset syncs to be dropped for long periods of time if an offset sync is 
skipped for some topic partition due to a high number of in-flight messages and 
then no further messages are read from that same topic partition for a while.

 

Instead, the task should cache offset syncs in its {{{}commitRecord method{}}}, 
and only actually send offset sync messages in its [commit 
method|https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L108],
 which is invoked periodically by the Kafka Connect framework. Any offset syncs 
that are skipped due to too many in-flight messages will then be automatically 
retried later when {{commit}} is re-invoked, regardless of whether any more 
records are read from the corresponding topic partition.
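A minimal sketch of the proposed caching behavior (all names here, such as 
`OffsetSyncPublisher`, are hypothetical, not MirrorSourceTask's actual API): 
`commitRecord` only remembers the latest sync per partition, and `commit` 
retries the sends, so a sync skipped under the in-flight cap is not lost when 
the partition goes quiet.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable

// Hypothetical stand-in for an offset sync message.
final case class OffsetSync(partition: String, upstream: Long, downstream: Long)

class OffsetSyncPublisher(maxOutstanding: Int, send: OffsetSync => Unit) {
  private val outstanding = new Semaphore(maxOutstanding)
  private val pending = mutable.Map.empty[String, OffsetSync]

  // Called per acknowledged record: cache the newest sync, do not send yet.
  def commitRecord(sync: OffsetSync): Unit = synchronized {
    pending(sync.partition) = sync
  }

  // Called periodically by the framework: send whatever fits under the cap;
  // anything left in `pending` is retried on the next commit().
  def commit(): Unit = synchronized {
    val sent = pending.keys.toList.takeWhile { p =>
      if (outstanding.tryAcquire()) { send(pending(p)); true } else false
    }
    sent.foreach(pending.remove)
  }

  // The producer ack callback would release the permit.
  def onAck(): Unit = outstanding.release()
}
```

Note that two `commitRecord` calls for the same partition collapse into one 
pending sync, which also bounds memory by the number of replicated partitions.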





[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


ijuma commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375847369

   Also, I'm currently focused on completing KAFKA-14470. @mimaison since you 
fleshed out KAFKA-14525, do you have cycles to do these reviews?





[jira] [Resolved] (KAFKA-14609) Kafka Streams Processor API cannot use state stores

2023-01-09 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck resolved KAFKA-14609.
-
Resolution: Fixed

Fixed by https://issues.apache.org/jira/browse/KAFKA-14388

> Kafka Streams Processor API cannot use state stores
> ---
>
> Key: KAFKA-14609
> URL: https://issues.apache.org/jira/browse/KAFKA-14609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Philipp Schirmer
>Priority: Major
>
> The recently introduced Kafka Streams Processor API (since 3.3, 
> https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with 
> regards to using state stores. The 
> [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-]
>  method returns null, even though the store has been registered according to 
> the docs. The old transformer API still works. I created a small project that 
> demonstrates the behavior. It uses both methods to register a store for the 
> transformer, as well as the processor API: 
> https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java





[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


ijuma commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375845098

   Thanks for the PR. Can we add a test in that case? We'd want to verify 
manually that the test matches the previous behavior.





[jira] [Commented] (KAFKA-14609) Kafka Streams Processor API cannot use state stores

2023-01-09 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656166#comment-17656166
 ] 

Bill Bejeck commented on KAFKA-14609:
-

Hi [~philipp94831] 

Thanks for reporting this issue.  I believe it's been fixed with 
https://issues.apache.org/jira/browse/KAFKA-14388.

You could pull down either the 3.4.0 branch or 3.3.2 and build from source and 
test it.  For now, I'm going to mark this as fixed.  

> Kafka Streams Processor API cannot use state stores
> ---
>
> Key: KAFKA-14609
> URL: https://issues.apache.org/jira/browse/KAFKA-14609
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Philipp Schirmer
>Priority: Major
>
> The recently introduced Kafka Streams Processor API (since 3.3, 
> https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with 
> regards to using state stores. The 
> [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-]
>  method returns null, even though the store has been registered according to 
> the docs. The old transformer API still works. I created a small project that 
> demonstrates the behavior. It uses both methods to register a store for the 
> transformer, as well as the processor API: 
> https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java





[GitHub] [kafka] rondagostino commented on a diff in pull request #13058: KAFKA-14557; Lock metadata log dir

2023-01-09 Thread GitBox


rondagostino commented on code in PR #13058:
URL: https://github.com/apache/kafka/pull/13058#discussion_r1064790638


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -81,25 +100,99 @@ class RaftManagerTest {
 )
   }
 
-  @Test
-  def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
-val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "broker", "1")
-assertEquals(1, raftManager.client.nodeId.getAsInt)
+  @ParameterizedTest
+  @ValueSource(strings = Array("broker", "controller", "broker,controller"))
+  def testNodeIdPresent(processRoles: String): Unit = {
+var processRolesSet = Set.empty[ProcessRole]
+if (processRoles.contains("broker")) {
+  processRolesSet = processRolesSet ++ Set(BrokerRole)
+}
+if (processRoles.contains("controller")) {
+  processRolesSet = processRolesSet ++ Set(ControllerRole)
+}
+
+val logDir = TestUtils.tempDir()
+val nodeId = 1
+val raftManager = createRaftManager(
+  new TopicPartition("__raft_id_test", 0),
+  createConfig(
+processRolesSet,
+nodeId,
+Some(logDir.toPath),
+None
+  )
+)
+assertEquals(nodeId, raftManager.client.nodeId.getAsInt)
 raftManager.shutdown()
   }
 
-  @Test
-  def testNodeIdPresentIfControllerRoleOnly(): Unit = {
-val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "controller", "1")
-assertEquals(1, raftManager.client.nodeId.getAsInt)
+  @ParameterizedTest
+  @ValueSource(strings = Array("metadata", "log", "metadata,log"))
+  def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
+val logDir = if (dirType.contains("metadata")) {
+  Some(TestUtils.tempDir().toPath)
+} else {
+  None
+}
+
+val metadataDir = if (dirType.contains("log")) {
+  Some(TestUtils.tempDir().toPath)
+} else {
+  None
+}

Review Comment:
   I think this might be clearer.
   ```
 @ValueSource(strings = Array("metadata-only", "log-only", "both"))
 def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
   val logDir = if (!dirType.equals("metadata-only")) {
 Some(TestUtils.tempDir().toPath)
   } else {
 None
   }
   
   val metadataDir = if (!dirType.equals("log-only")) {
 Some(TestUtils.tempDir().toPath)
   } else {
 None
   }
   ```



##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -81,25 +100,99 @@ class RaftManagerTest {
 )
   }
 
-  @Test
-  def testNodeIdPresentIfBrokerRoleOnly(): Unit = {
-val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "broker", "1")
-assertEquals(1, raftManager.client.nodeId.getAsInt)
+  @ParameterizedTest
+  @ValueSource(strings = Array("broker", "controller", "broker,controller"))
+  def testNodeIdPresent(processRoles: String): Unit = {
+var processRolesSet = Set.empty[ProcessRole]
+if (processRoles.contains("broker")) {
+  processRolesSet = processRolesSet ++ Set(BrokerRole)
+}
+if (processRoles.contains("controller")) {
+  processRolesSet = processRolesSet ++ Set(ControllerRole)
+}
+
+val logDir = TestUtils.tempDir()
+val nodeId = 1
+val raftManager = createRaftManager(
+  new TopicPartition("__raft_id_test", 0),
+  createConfig(
+processRolesSet,
+nodeId,
+Some(logDir.toPath),
+None
+  )
+)
+assertEquals(nodeId, raftManager.client.nodeId.getAsInt)
 raftManager.shutdown()
   }
 
-  @Test
-  def testNodeIdPresentIfControllerRoleOnly(): Unit = {
-val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "controller", "1")
-assertEquals(1, raftManager.client.nodeId.getAsInt)
+  @ParameterizedTest
+  @ValueSource(strings = Array("metadata", "log", "metadata,log"))
+  def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
+val logDir = if (dirType.contains("metadata")) {
+  Some(TestUtils.tempDir().toPath)
+} else {
+  None
+}
+
+val metadataDir = if (dirType.contains("log")) {
+  Some(TestUtils.tempDir().toPath)
+} else {
+  None
+}
+
+val nodeId = 1
+val raftManager = createRaftManager(
+  new TopicPartition("__raft_id_test", 0),
+  createConfig(
+Set(ControllerRole),
+nodeId,
+logDir,
+metadataDir
+  )
+)
+
+val lockPath = 
metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName)
+assertTrue(fileLocked(lockPath))
+
 raftManager.shutdown()
+
+assertFalse(fileLocked(lockPath))
   }
 
   @Test
-  def testNodeIdPresentIfColocated(): Unit = {
-val raftManager = instantiateRaftManagerWithConfigs(new 
TopicPartition("__raft_id_test", 0), "controller,broker", "1")
-assertEquals(1, raftManager.client.nodeId.getAsInt)
+  def testLogDirLockWhenMetadataDi

[GitHub] [kafka] C0urante commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks

2023-01-09 Thread GitBox


C0urante commented on code in PR #12802:
URL: https://github.com/apache/kafka/pull/12802#discussion_r1064784201


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##
@@ -780,6 +774,14 @@ protected void stopServices() {
 }
 }
 
+// Timeout for herderExecutor to gracefully terminate is set to a value to 
accommodate
+// reading to the end of the config topic + successfully attempting to 
stop all connectors and tasks and a buffer of 10s
+private long herderExecutorTimeoutMs() {
+return this.workerSyncTimeoutMs +
+config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG) 
+

Review Comment:
   Wait a minute, is this right? Why are we using the sync timeout twice here?






[jira] [Commented] (KAFKA-14535) Flaky test PlaintextEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe

2023-01-09 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656151#comment-17656151
 ] 

Proven Provenzano commented on KAFKA-14535:
---

Thanks for the patch!

> Flaky test 
> PlaintextEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe
> -
>
> Key: KAFKA-14535
> URL: https://issues.apache.org/jira/browse/KAFKA-14535
> Project: Kafka
>  Issue Type: Test
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> This test has failed multiple times recently:
>     
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1446/tests/]
>         org.opentest4j.AssertionFailedError: expected acls:
>     (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
> but got:
>     (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>     
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1439/tests/]
>         org.opentest4j.AssertionFailedError: expected acls:
>     (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
> but got:
>     (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>     
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1436/tests/]
>         org.opentest4j.AssertionFailedError: expected acls:
>     (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
> but got:
>     (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW)
>     (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
> The stacktrace is:
> {noformat}
> org.opentest4j.AssertionFailedError: expected acls:
>   (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>   (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW)
>   (principal=User:client, host=*, operation=DESCRIBE, 
> permissionType=ALLOW)
>   (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
> but got:
>   (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW)
>   (principal=User:client, host=*, operation=DESCRIBE, 
> permissionType=ALLOW)
>   (principal=User:client, host=*, operation=READ, permissionType=ALLOW)
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
>   at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
>   at app//kafka.utils.TestUtils$.waitAndVerifyAcls(TestUtils.scala:1075)
>   at 
> app//kafka.api.EndToEndAuthorizationTest.$anonfun$setReadAndWriteAcls$1(EndToEndAuthorizationTest.scala:312)
>   at 
> app//kafka.api.EndToEndAuthorizationTest.$anonfun$setReadAndWriteAcls$1$adapted(EndToEndAuthorizationTest.scala:311)
>   at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575)
>   at 
> app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573)
>   at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933)
>   at 
> app//kafka.api.EndToEndAuthorizationTest.setReadAndWriteAcls(EndToEndAuthorizationTest.scala:311)
>   at 
> app//kafka.api.EndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe(EndToEndAuthorizationTest.scala:478)
>   at 
> java.base@17.0.4.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>  Method)
>   at 
> java.base@17.0.4.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base@17.0.4.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>

[GitHub] [kafka] fvaleri commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-09 Thread GitBox


fvaleri commented on code in PR #13080:
URL: https://github.com/apache/kafka/pull/13080#discussion_r1064750856


##
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class ClusterTool {
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {

Review Comment:
   I like the idea of the command interface. In that case, we should add the 
suffix `*Command` to all implementing classes (e.g. `ClusterToolCommand`). 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375755934

   @ijuma , I made the changes, but I couldn't find any tests associated with 
the Scala class. Wanted to know how I can test this.





[GitHub] [kafka] vamossagar12 opened a new pull request, #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-09 Thread GitBox


vamossagar12 opened a new pull request, #13095:
URL: https://github.com/apache/kafka/pull/13095

   Move EndToEndLatency to tools





[jira] [Commented] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656141#comment-17656141
 ] 

Chris Egerton commented on KAFKA-14565:
---

[~beardt] thanks for identifying this issue. Have you considered either 
altering the {{AbstractConfig::getConfiguredInstances}} method, or the logic in 
the client classes that leverage it, to invoke {{close}} on any interceptors 
that have already been instantiated and configured in the scenario described in 
this ticket? This would allow us to address the resource leak problem without 
altering public interface (which requires a KIP) or requiring action on the 
part of developers.
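The close-on-failure approach suggested above can be sketched without any Kafka types: configure the already-created instances one at a time and, if a later configure() fails, close the ones configured so far before rethrowing. The Interceptor interface and configureAll helper below are hypothetical stand-ins for illustration, not the actual AbstractConfig API.

```java
import java.util.ArrayList;
import java.util.List;

public class SafeInstantiation {

    // Minimal stand-in for an interceptor holding resources that need cleanup.
    interface Interceptor extends AutoCloseable {
        void configure() throws Exception;

        @Override
        void close();
    }

    // Configure each already-created interceptor in order; if one configure()
    // fails, close the ones configured so far before rethrowing, so their
    // threads/connections do not leak.
    static List<Interceptor> configureAll(List<Interceptor> created) throws Exception {
        List<Interceptor> configured = new ArrayList<>();
        try {
            for (Interceptor interceptor : created) {
                interceptor.configure();
                configured.add(interceptor);
            }
            return configured;
        } catch (Exception e) {
            for (Interceptor interceptor : configured) {
                try {
                    interceptor.close();
                } catch (Exception onClose) {
                    e.addSuppressed(onClose);
                }
            }
            throw e;
        }
    }
}
```

This keeps the cleanup inside the factory logic, so no public interface needs to change.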

> Improve Interceptor Resource Leakage Prevention
> ---
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> AbstractConfig.getConfiguredInstances()  is delegated responsibility for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property and returns a configured list of interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> create threads, connections or other resources which require clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception produces a resource leakage in the 
> first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> defining a default open method with no implementation and a checked exception on 
> the respective Consumer/Producer interceptor interfaces.  This open method 
> will be responsible for creating threads and/or objects which utilizes 
> threads, connections or other resource which requires clean up.  
> Additionally, the default open method enables implementation optionality as 
> its empty default behavior means it will do nothing when unimplemented.  
> Additionally, the Kafka Consumer/Producer Interceptor containers will 
> implement a corresponding maybeOpen method which throws a checked exception.  
> In order to maintain backwards compatibility with earlier developed 
> interceptors the maybeOpen will check whether the interceptor's interface 
> contains the newer open method before calling it accordingly.   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] dajac commented on pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


dajac commented on PR #12870:
URL: https://github.com/apache/kafka/pull/12870#issuecomment-1375740915

   @jolshan @jeffkbkim I have updated the PR. Could you take another look?





[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064719096


##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
 }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   I have looked at other APIs and we are not consistent, unfortunately. I 
believe that my current implementation (returning a response for each provided 
group in the same order) is the right way and that could be considered as a 
bug. I have asked about this in the KIP-709 discuss thread as well.






[GitHub] [kafka] mimaison commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-09 Thread GitBox


mimaison commented on code in PR #13080:
URL: https://github.com/apache/kafka/pull/13080#discussion_r1064710964


##
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class ClusterTool {
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {

Review Comment:
   The idea was to make it like `MetadataQuorumCommand`. In the future we will 
be able to put this method in a Command interface or utils class. 






[jira] [Created] (KAFKA-14609) Kafka Streams Processor API cannot use state stores

2023-01-09 Thread Philipp Schirmer (Jira)
Philipp Schirmer created KAFKA-14609:


 Summary: Kafka Streams Processor API cannot use state stores
 Key: KAFKA-14609
 URL: https://issues.apache.org/jira/browse/KAFKA-14609
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.0
Reporter: Philipp Schirmer


The recently introduced Kafka Streams Processor API (since 3.3, 
https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with 
regards to using state stores. The 
[getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-]
 method returns null, even though the store has been registered according to 
the docs. The old transformer API still works. I created a small project that 
demonstrates the behavior. It uses both methods to register a store for the 
transformer, as well as the processor API: 
https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java





[GitHub] [kafka] mimaison commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-09 Thread GitBox


mimaison commented on code in PR #13080:
URL: https://github.com/apache/kafka/pull/13080#discussion_r1064694674


##
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class ClusterTool {
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws Exception {
+ArgumentParser parser = ArgumentParsers
+.newArgumentParser("kafka-cluster")
+.defaultHelp(true)
+.description("The Kafka cluster tool.");
+Subparsers subparsers = parser.addSubparsers().dest("command");
+
+Subparser clusterIdParser = subparsers.addParser("cluster-id")
+.help("Get information about the ID of a cluster.");
+Subparser unregisterParser = subparsers.addParser("unregister")
+.help("Unregister a broker.");
+for (Subparser subpparser : Arrays.asList(clusterIdParser, 
unregisterParser)) {
+subpparser.addArgument("--bootstrap-server", "-b")
+.action(store())
+.help("A list of host/port pairs to use for establishing 
the connection to the kafka cluster.");
+subpparser.addArgument("--config", "-c")
+.action(store())
+.help("A property file containing configs to passed to 
AdminClient.");

Review Comment:
   Thanks, I've reworded that message






[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064619499


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
   offsetFetchResponse
 }
 requestHelper.sendResponseMaybeThrottle(request, createResponse)
+CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
 val offsetFetchRequest = request.body[OffsetFetchRequest]
-val groupId = offsetFetchRequest.groupId()
-val (error, partitionData) = fetchOffsets(groupId, 
offsetFetchRequest.isAllPartitions,
-  offsetFetchRequest.requireStable, offsetFetchRequest.partitions, 
request.context)
-def createResponse(requestThrottleMs: Int): AbstractResponse = {
-  val offsetFetchResponse =
-if (error != Errors.NONE) {
-  offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-} else {
-  new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
partitionData.asJava)
-}
-  trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-  offsetFetchResponse
+val groups = offsetFetchRequest.groups()
+val requireStable = offsetFetchRequest.requireStable()
+
+val futures = new 
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+groups.forEach { groupOffsetFetch =>
+  val isAllPartitions = groupOffsetFetch.topics == null
+  val future = if (isAllPartitions) {
+fetchAllOffsets(
+  request.context,
+  groupOffsetFetch,
+  requireStable
+)
+  } else {
+fetchOffsets(
+  request.context,
+  groupOffsetFetch,
+  requireStable
+)
+  }
+  futures += future
 }
-requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
-
-  private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = {
-val header = request.header
-val offsetFetchRequest = request.body[OffsetFetchRequest]
-val groupIds = offsetFetchRequest.groupIds().asScala
-val groupToErrorMap =  mutable.Map.empty[String, Errors]
-val groupToPartitionData =  mutable.Map.empty[String, 
util.Map[TopicPartition, PartitionData]]
-val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-groupIds.foreach(g => {
-  val (error, partitionData) = fetchOffsets(g,
-offsetFetchRequest.isAllPartitionsForGroup(g),
-offsetFetchRequest.requireStable(),
-groupToTopicPartitions.get(g), request.context)
-  groupToErrorMap += (g -> error)
-  groupToPartitionData += (g -> partitionData.asJava)
-})
 
-def createResponse(requestThrottleMs: Int): AbstractResponse = {
-  val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-groupToErrorMap.asJava, groupToPartitionData.asJava)
-  trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-  offsetFetchResponse
+CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+  val groupResponses = new 
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+  futures.foreach(future => groupResponses += future.get())
+  requestHelper.sendMaybeThrottle(request, new 
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
 }
-
-requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, 
requireStable: Boolean,
-   partitions: util.List[TopicPartition], context: 
RequestContext): (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) = {
-if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-  (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-} else {
-  if (isAllPartitions) {
-val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable)
-if (error != Errors.NONE) {
-  (error, allPartitionData)
-} else {
-  // clients are not allowed to see offsets for topics that are not 
authorized for Describe
-  val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(context,
-DESCRIBE, TOPIC, allPartitionData)(_.topic)
-  (Errors.NONE, authorizedPartitionData)
-}
+  private def fetchAllOffsets(

Review Comment:
   Renaming in KafkaApis is reasonable. For GroupCoordinator, the `ForGroup` 
suffix seems a bit redundant, so I would rather keep it as it is.
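The fan-out pattern in the diff above (one future per requested group, joined with CompletableFuture.allOf, results collected in request order) can be sketched in isolation. The fetchAll helper and its string results below are placeholders, not Kafka code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FanOutJoin {

    // Fire one async task per group id, wait for all of them, and collect the
    // results in the same order the ids were given -- the shape used to build
    // one OffsetFetch response entry per requested group.
    static List<String> fetchAll(List<String> groupIds) {
        List<CompletableFuture<String>> futures = new ArrayList<>(groupIds.size());
        for (String groupId : groupIds) {
            futures.add(CompletableFuture.supplyAsync(() -> "offsets-for-" + groupId));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
                List<String> responses = new ArrayList<>(futures.size());
                for (CompletableFuture<String> future : futures) {
                    responses.add(future.join()); // already completed, so join() cannot block
                }
                return responses;
            })
            .join();
    }
}
```

Iterating over the original futures list, rather than over whatever order the tasks completed in, is what preserves the per-group response ordering.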

[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface

2023-01-09 Thread GitBox


dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064618016


##
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
 this.error = null;
 }
 
+public OffsetFetchResponse(List groups, short 
version) {
+super(ApiKeys.OFFSET_FETCH);
+data = new OffsetFetchResponseData();
+
+if (version >= 8) {
+data.setGroups(groups);
+error = null;
+
+for (OffsetFetchResponseGroup group : data.groups()) {
+this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+}
+} else {
+if (groups.size() != 1) {
+throw new UnsupportedVersionException(
+"Version " + version + " of OffsetFetchResponse only 
support one group."
+);
+}
+
+OffsetFetchResponseGroup group = groups.get(0);
+data.setErrorCode(group.errorCode());
+error = Errors.forCode(group.errorCode());
+
+group.topics().forEach(topic -> {
+OffsetFetchResponseTopic newTopic = new 
OffsetFetchResponseTopic().setName(topic.name());
+data.topics().add(newTopic);
+
+topic.partitions().forEach(partition -> {
+OffsetFetchResponsePartition newPartition;
+
+if (version < 2 && group.errorCode() != 
Errors.NONE.code()) {
+// Versions prior to version 2 do not support a top 
level error. Therefore
+// we put it at the partition level.
+newPartition = new OffsetFetchResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+.setErrorCode(group.errorCode());
+} else {

Review Comment:
   It is still possible to have a partition level error with version >= 2 (e.g. 
UNSTABLE_OFFSET_COMMIT). To answer your second point, if there is an error, the 
offset/metadata should be correctly set at this stage so we can just copy 
whatever we have got here.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13094: MINOR: Various cleanups in client tests

2023-01-09 Thread GitBox


divijvaidya commented on code in PR #13094:
URL: https://github.com/apache/kafka/pull/13094#discussion_r1064594474


##
clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java:
##
@@ -320,7 +315,7 @@ public void testDoubleBuild() {
 try {
 builder.build();
 fail("Expected calling build twice to fail.");
-} catch (Throwable t) {
+} catch (NullPointerException npe) {

Review Comment:
   NPE is a weird way of enforcing idempotency for build()! While you are in 
this code base could we add a javadoc to build() to clarify this expectation?
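One way to make the one-shot expectation explicit, sketched on a toy builder rather than FetchSessionHandler.Builder itself: fail the second build() with an IllegalStateException instead of an incidental NullPointerException.

```java
public class SessionBuilder {

    private StringBuilder entries = new StringBuilder();

    SessionBuilder add(String entry) {
        ensureNotBuilt();
        entries.append(entry).append(';');
        return this;
    }

    String build() {
        ensureNotBuilt();
        String result = entries.toString();
        // Release the buffer; a null field also marks the builder as spent.
        entries = null;
        return result;
    }

    private void ensureNotBuilt() {
        if (entries == null) {
            // Explicit failure instead of an incidental NullPointerException.
            throw new IllegalStateException("build() may only be called once");
        }
    }
}
```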






[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention

2023-01-09 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Summary: Improve Interceptor Resource Leakage Prevention  (was: Improving 
Interceptor Resource Leakage Prevention)

> Improve Interceptor Resource Leakage Prevention
> ---
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> AbstractConfig.getConfiguredInstances()  is delegated responsibility for both 
> creating and configuring each interceptor listed in the interceptor.classes 
> property and returns a configured  List> 
> interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> create threads, connections or other resources which require clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception produces a resource leakage in the 
> first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> defining a default open method with no implementation and a checked exception on 
> the respective Consumer/Producer interceptor interfaces.  This open method 
> will be responsible for creating threads and/or objects which utilizes 
> threads, connections or other resource which requires clean up.  
> Additionally, the default open method enables implementation optionality as 
> its empty default behavior means it will do nothing when unimplemented.  
> Additionally, the Kafka Consumer/Producer Interceptor containers will 
> implement a corresponding maybeOpen method which throws a checked exception.  
> In order to maintain backwards compatibility with earlier developed 
> interceptors the maybeOpen will check whether the interceptor's interface 
> contains the newer open method before calling it accordingly.   
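A minimal sketch of the proposed lifecycle, using hypothetical stand-in types rather than the real Consumer/Producer interceptor interfaces: a default no-op open() keeps old implementations source-compatible, and a container-side maybeOpen() uses reflection to call open() only when the concrete class overrides it.

```java
import java.lang.reflect.Method;
import java.util.List;

public class InterceptorLifecycle {

    // Hypothetical stand-in for the Consumer/Producer interceptor interfaces.
    interface Interceptor {
        // Proposed default open(): acquire threads/connections here instead of
        // in configure(); the empty default keeps existing interceptors working.
        default void open() throws Exception {
        }

        void close();
    }

    // Container-side maybeOpen(): only invoke open() when the concrete class
    // actually overrides it, so interceptors written before the method existed
    // are left untouched.
    static void maybeOpen(List<Interceptor> interceptors) throws Exception {
        for (Interceptor interceptor : interceptors) {
            Method open = interceptor.getClass().getMethod("open");
            if (!open.getDeclaringClass().isInterface()) {
                interceptor.open();
            }
        }
    }
}
```

Because open() is where resources are acquired, a failure in a later interceptor's open() leaves the container free to call close() on the ones already opened.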





[GitHub] [kafka] divijvaidya commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-09 Thread GitBox


divijvaidya commented on code in PR #13078:
URL: https://github.com/apache/kafka/pull/13078#discussion_r1064581958


##
docs/ops.html:
##
@@ -1604,6 +1604,11 @@ 

[GitHub] [kafka] mimaison opened a new pull request, #13094: MINOR: Various cleanups in client tests

2023-01-09 Thread GitBox


mimaison opened a new pull request, #13094:
URL: https://github.com/apache/kafka/pull/13094

   - Simplify assertions
   - Remove redundant types
   - Use lambdas instead of anonymous classes
   - Remove unnecessary throws
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?

2023-01-09 Thread Devarshi Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656031#comment-17656031
 ] 

Devarshi Shah commented on KAFKA-13995:
---

Any updates guys?

> Does Kafka support Network File System (NFS)? Is it recommended in Production?
> --
>
> Key: KAFKA-13995
> URL: https://issues.apache.org/jira/browse/KAFKA-13995
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.0.0
> Environment: Kubernetes Cluster
>Reporter: Devarshi Shah
>Priority: Blocker
>
> I've gone through the Apache Kafka Documentation. It does not contain 
> information about the support of underlying storage type, whether Kafka 
> supports block storage, Network File System (NFS) and/or others. On the 
> internet, I could find that it supports NFS, however most of them summarize 
> not to use NFS in Production. May we get proper information whether Kafka 
> recommends NFS in Production, or it doesn't support NFS to begin with?





[GitHub] [kafka] fvaleri commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module

2023-01-09 Thread GitBox


fvaleri commented on code in PR #13080:
URL: https://github.com/apache/kafka/pull/13080#discussion_r1064457544


##
tools/src/main/java/org/apache/kafka/tools/ClusterTool.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.Namespace;
+import net.sourceforge.argparse4j.inf.Subparser;
+import net.sourceforge.argparse4j.inf.Subparsers;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+public class ClusterTool {
+
+public static void main(String... args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws Exception {
+ArgumentParser parser = ArgumentParsers
+.newArgumentParser("kafka-cluster")
+.defaultHelp(true)
+.description("The Kafka cluster tool.");
+Subparsers subparsers = parser.addSubparsers().dest("command");
+
+Subparser clusterIdParser = subparsers.addParser("cluster-id")
+.help("Get information about the ID of a cluster.");
+Subparser unregisterParser = subparsers.addParser("unregister")
+.help("Unregister a broker.");
+for (Subparser subpparser : Arrays.asList(clusterIdParser, 
unregisterParser)) {
+subpparser.addArgument("--bootstrap-server", "-b")
+.action(store())
+.help("A list of host/port pairs to use for establishing 
the connection to the kafka cluster.");
+subpparser.addArgument("--config", "-c")
+.action(store())
+.help("A property file containing configs to passed to 
AdminClient.");
+}
+unregisterParser.addArgument("--id", "-i")
+.type(Integer.class)
+.action(store())
+.required(true)
+.help("The ID of the broker to unregister.");
+
+Namespace namespace = parser.parseArgsOrFail(args);
+String command = namespace.getString("command");
+String configPath = namespace.getString("config");
+Properties properties = (configPath == null) ? new Properties() : 
Utils.loadProps(configPath);
+
+String bootstrapServer = namespace.getString("bootstrap_server");
+if (bootstrapServer != null) {
+properties.setProperty("bootstrap.servers", bootstrapServer);
+}
+if (properties.getProperty("bootstrap.servers") == null) {
+throw new TerseException("Please specify --bootstrap-server.");
+}
+
+switch (command) {
+case "cluster-id": {
+try (Admin adminClient = Admin.create(properties)) {
+clusterIdCommand(System.out, adminClient);
+}
+break;
+}
+case "unregister": {
+try (Admin adminClient = Admin.create(properties)) {
+unregisterCommand(System.out, adminClient, 
namespace.getInt("id"));
+}
+break;
+}
+default:
+throw new RuntimeException("Unknown command " + command);
+}
+}
+
+static void clusterIdCommand(PrintStream stream, Admin adminClient) throws 
Exception {
+String clusterId = adminClient.describeCluster().cl

[GitHub] [kafka] iamazy commented on pull request #13072: KAFKA-14570: Add missing closing parenthesis symbol

2023-01-09 Thread GitBox


iamazy commented on PR #13072:
URL: https://github.com/apache/kafka/pull/13072#issuecomment-1375394833

   > @iamazy Thanks for fixing this. For small issues like this you don't need 
to open a Jira, you can just open a PR and put `MINOR` in the title, for 
example 
[6b5e9e9](https://github.com/apache/kafka/commit/6b5e9e989b7a1f8c387a79dea0117e52401853e1)
   
   @mimaison Got it, Thanks you!





[GitHub] [kafka] mimaison commented on pull request #13072: KAFKA-14570: Add missing closing parenthesis symbol

2023-01-09 Thread GitBox


mimaison commented on PR #13072:
URL: https://github.com/apache/kafka/pull/13072#issuecomment-1375384949

   @iamazy Thanks for fixing this. For small issues like this you don't need to 
open a Jira, you can just open a PR and put `MINOR` in the title, for example 
https://github.com/apache/kafka/commit/6b5e9e989b7a1f8c387a79dea0117e52401853e1





[jira] [Resolved] (KAFKA-14570) Problem description missing closing parenthesis symbol

2023-01-09 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14570.

Fix Version/s: 3.5.0
   Resolution: Fixed

> Problem description missing closing parenthesis symbol
> --
>
> Key: KAFKA-14570
> URL: https://issues.apache.org/jira/browse/KAFKA-14570
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: iamazy
>Priority: Trivial
> Fix For: 3.5.0
>
>
> In 
> [verifyFullFetchResponsePartitions|https://github.com/apache/kafka/blob/ad94dc2134474c9d790fe0bb79c0d390c562846a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java#L425-L449],
>  the problem description is missing a closing parenthesis symbol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison merged pull request #13072: KAFKA-14570: Add missing closing parenthesis symbol

2023-01-09 Thread GitBox


mimaison merged PR #13072:
URL: https://github.com/apache/kafka/pull/13072





[GitHub] [kafka] cadonna closed pull request #13093: DO NOT MERGE: PR to test GitHub API

2023-01-09 Thread GitBox


cadonna closed pull request #13093: DO NOT MERGE: PR to test GitHub API
URL: https://github.com/apache/kafka/pull/13093





[GitHub] [kafka] cadonna opened a new pull request, #13093: DO NOT MERGE: PR to test GitHub API

2023-01-09 Thread GitBox


cadonna opened a new pull request, #13093:
URL: https://github.com/apache/kafka/pull/13093

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Assigned] (KAFKA-14602) offsetDelta in BatchMetadata is an int but the values are computed as difference of offsets which are longs.

2023-01-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-14602:
-

Assignee: Luke Chen

> offsetDelta in BatchMetadata is an int but the values are computed as 
> difference of offsets which are longs.
> 
>
> Key: KAFKA-14602
> URL: https://issues.apache.org/jira/browse/KAFKA-14602
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Satish Duggana
>Assignee: Luke Chen
>Priority: Major
>
> This is a followup of the discussion in 
> https://github.com/apache/kafka/pull/13043#discussion_r1063071578
> offsetDelta in BatchMetadata is an int. Because of this, ProducerAppendInfo
> may set a value that overflows. Ideally, this data type should be long
> instead of int.
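The overflow described in this issue comes from narrowing a difference of two long offsets into an int. A minimal sketch (hypothetical method names, not the actual BatchMetadata code) of why the narrowing cast is lossy:

```java
public class OffsetDeltaOverflow {
    // Offsets are longs, so their difference can exceed Integer.MAX_VALUE.
    // Narrowing that difference to int silently wraps around.
    static int narrowedDelta(long baseOffset, long lastOffset) {
        return (int) (lastOffset - baseOffset); // lossy narrowing cast
    }

    // Keeping the delta as a long preserves the full 64-bit range.
    static long wideDelta(long baseOffset, long lastOffset) {
        return lastOffset - baseOffset;
    }

    public static void main(String[] args) {
        long base = 0L;
        long last = 2_147_483_648L; // Integer.MAX_VALUE + 1
        System.out.println(narrowedDelta(base, last)); // wraps to -2147483648
        System.out.println(wideDelta(base, last));     // 2147483648
    }
}
```

A delta of Integer.MAX_VALUE + 1 wraps to Integer.MIN_VALUE under the int representation, which is why the issue proposes widening the field to long.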


