Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-05 Thread via GitHub


showuon commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1980240072

   Ah, you're right, @iit2009060! I missed that. Will you open another PR to 
fix it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-05 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1513877668


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -84,6 +85,9 @@ private enum SubscriptionType {
 /* the pattern user has requested */
 private Pattern subscribedPattern;
 
+/* RE2J compatible regex */
+private SubscriptionPattern subscriptionPattern;

Review Comment:
   I think it is a bit overkill to have an abstraction like this: Pattern and 
SubscriptionPattern are already abstractions over the underlying regex 
string. What do you guys think @lianetm @dajac?






Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]

2024-03-05 Thread via GitHub


Phuc-Hong-Tran commented on code in PR #15188:
URL: https://github.com/apache/kafka/pull/15188#discussion_r1513877668


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##
@@ -84,6 +85,9 @@ private enum SubscriptionType {
 /* the pattern user has requested */
 private Pattern subscribedPattern;
 
+/* RE2J compatible regex */
+private SubscriptionPattern subscriptionPattern;

Review Comment:
   I think it is a bit overkill to have an abstraction like this. What do you 
guys think @lianetm @dajac?






Re: [PR] KAFKA-16152: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart [kafka]

2024-03-05 Thread via GitHub


dajac commented on code in PR #15419:
URL: https://github.com/apache/kafka/pull/15419#discussion_r1513841644


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -546,8 +546,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() 
{
 data.setMemberEpoch(membershipManager.memberEpoch());
 
 // InstanceId - only sent if has changed since the last heartbeat

Review Comment:
   Right. This part is clearly wrong in the doc. It cannot change within the 
lifetime of the consumer.






Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-05 Thread via GitHub


iit2009060 commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1980040479

   @showuon @chiacyu This has not fixed the overall problem. It just moves the 
NullPointerException into RemoteLogManager instead of ProducerStateManager. 
   
https://github.com/iit2009060/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L744
   





Re: [PR] [WIP]KAFKA-15444: Native docker image [kafka]

2024-03-05 Thread via GitHub


github-actions[bot] commented on PR #14556:
URL: https://github.com/apache/kafka/pull/14556#issuecomment-1980016226

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]

2024-03-05 Thread via GitHub


bachmanity1 commented on PR #15475:
URL: https://github.com/apache/kafka/pull/15475#issuecomment-1980014397

   Hi @kirktrue, thanks for the review! I've created a new KIP here 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header





[PR] MINOR: Upgrade zookeeper 3.8.3 -> 3.8.4 [kafka]

2024-03-05 Thread via GitHub


KevinZTW opened a new pull request, #15480:
URL: https://github.com/apache/kafka/pull/15480

   Upgrade ZooKeeper from 3.8.3 to 3.8.4.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Commented] (KAFKA-16346) Fix flaky MetricsTest.testMetrics

2024-03-05 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823831#comment-17823831
 ] 

Chia-Ping Tsai commented on KAFKA-16346:


The count is increased even though the value is zero, so using the count is 
meaningless currently. Maybe we should update `messageConversionsTimeHist` only 
if the conversion does happen.
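
A minimal sketch of that idea follows. It is not the broker's code: the
`ConversionTimeHistogram` class and the `recordIfConverted` method are
hypothetical stand-ins for `messageConversionsTimeHist`, used only to show why
guarding the update makes the count meaningful.

```java
// Hypothetical stand-in for a histogram metric; not Kafka's implementation.
class ConversionTimeHistogram {
    private long count;
    private long sumMs;

    // Unconditional update: the count grows even when the value is zero.
    void update(long valueMs) {
        count++;
        sumMs += valueMs;
    }

    // Guarded update: only record when at least one record was converted,
    // so a non-zero count implies that a conversion really took place.
    void recordIfConverted(int convertedRecords, long conversionTimeMs) {
        if (convertedRecords > 0) {
            update(conversionTimeMs);
        }
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        ConversionTimeHistogram hist = new ConversionTimeHistogram();
        hist.recordIfConverted(0, 0L); // no conversion: nothing recorded
        hist.recordIfConverted(3, 0L); // conversion rounded to 0 ms: still counted
        System.out.println("count = " + hist.count());
    }
}
```

With the guard in place, a test could assert on the count rather than on the
(possibly zero) value.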

> Fix flaky MetricsTest.testMetrics
> 
>
> Key: KAFKA-16346
> URL: https://issues.apache.org/jira/browse/KAFKA-16346
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> {code}
> Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > 
> testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false 
> FAILED
> org.opentest4j.AssertionFailedError: Broker metric not recorded correctly 
> for 
> kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce
>  value 0.0 ==> expected: <true> but was: <false>
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at 
> app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at 
> app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at 
> app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
> at 
> app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314)
> at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110)
> {code}
> The value used to update metrics is calculated by Math.round, so it could be 
> zero if you have a good machine :)
> We should verify the `count` instead of `value`, since it is more convincing 
> and more stable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16346) Fix flaky MetricsTest.testMetrics

2024-03-05 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang reassigned KAFKA-16346:
-

Assignee: PoAn Yang

> Fix flaky MetricsTest.testMetrics
> 
>
> Key: KAFKA-16346
> URL: https://issues.apache.org/jira/browse/KAFKA-16346
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> {code}
> Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > 
> testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false 
> FAILED
> org.opentest4j.AssertionFailedError: Broker metric not recorded correctly 
> for 
> kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce
>  value 0.0 ==> expected: <true> but was: <false>
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at 
> app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
> at 
> app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
> at 
> app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
> at 
> app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314)
> at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110)
> {code}
> The value used to update metrics is calculated by Math.round, so it could be 
> zero if you have a good machine :)
> We should verify the `count` instead of `value`, since it is more convincing 
> and more stable.





[jira] [Created] (KAFKA-16346) Fix flaky MetricsTest.testMetrics

2024-03-05 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16346:
--

 Summary: Fix flaky MetricsTest.testMetrics
 Key: KAFKA-16346
 URL: https://issues.apache.org/jira/browse/KAFKA-16346
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai


{code}
Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > 
testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false FAILED
org.opentest4j.AssertionFailedError: Broker metric not recorded correctly 
for 
kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce 
value 0.0 ==> expected: <true> but was: <false>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at 
app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314)
at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110)
{code}

The value used to update metrics is calculated by Math.round, so it could be 
zero if you have a good machine :)

We should verify the `count` instead of `value`, since it is more convincing 
and more stable.
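
The rounding effect can be reproduced in isolation. The sketch below is only
an illustration: `toRoundedMillis` is a hypothetical helper, and the
nanosecond input is an assumption about how the duration is measured. The
ticket itself only states that Math.round is involved.

```java
import java.util.concurrent.TimeUnit;

class RoundedConversionTime {
    // Hypothetical helper: turn a duration in nanoseconds into whole
    // milliseconds with Math.round, as a metric update might do.
    static long toRoundedMillis(long nanos) {
        return Math.round(nanos / 1_000_000.0);
    }

    public static void main(String[] args) {
        // A conversion that takes 0.3 ms rounds down to 0 ms.
        System.out.println(toRoundedMillis(TimeUnit.MICROSECONDS.toNanos(300)));
        // A conversion that takes 0.7 ms rounds up to 1 ms.
        System.out.println(toRoundedMillis(TimeUnit.MICROSECONDS.toNanos(700)));
    }
}
```

On a fast machine any conversion under half a millisecond lands in the first
case, which is why asserting `value > 0` can fail intermittently.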





Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1513702636


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   Thanks for sharing. That is interesting. 






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


KevinZTW commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1513701081


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   Sorry, it took a while to check this. I think it is because, in our 
project, the `StaticLoggerBinder` is usually provided by `slf4j-reload4j`, and 
in the Gradle build file the project `:connect:runtime` adds that as a test 
runtime dependency only. 
   
   Hence, if we specify the `classpath` as the `main` runtime, the path where 
the `slf4j-reload4j` jar file is located won't be included, which causes SLF4J 
to fall back to the NOP logger implementation.
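
One quick way to confirm this on a given classpath is to probe for the binder
class directly. This is a rough sketch using only the JDK; `LoggerBindingCheck`
and `isOnClasspath` are hypothetical names, and the binder class name assumes
the SLF4J 1.7-style binding mentioned above.

```java
class LoggerBindingCheck {
    // Returns true if the named class can be located by this class's loader,
    // without initializing it.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, LoggerBindingCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // SLF4J 1.7.x looks for this class to find a logging backend; when it
        // is absent, SLF4J falls back to the no-operation logger.
        String binder = "org.slf4j.impl.StaticLoggerBinder";
        System.out.println(binder + " present: " + isOnClasspath(binder));
    }
}
```

Running it once with the `main` runtime classpath and once with the test
runtime classpath should show the difference described above.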






Re: [PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]

2024-03-05 Thread via GitHub


kirktrue commented on code in PR #15475:
URL: https://github.com/apache/kafka/pull/15475#discussion_r1513681366


##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -192,6 +192,12 @@ public class SaslConfigs {
 + " be inspected for the standard OAuth \"iss\" claim and if this 
value is set, the broker will match it exactly against what is in the JWT's 
\"iss\" claim. If there is no"
 + " match, the broker will reject the JWT and authentication will 
fail.";
 
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = 
"sasl.oauthbearer.header.urlencode.enable";
+public static final boolean 
DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = false;
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE_DOC = 
"The (optional) setting to enable oauthbearer client to urlencode client_id and 
client_secret in the authorization header"
++ " in accordance with RFC6749, see 
https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1 for more detail. 
The default value is set to 'false' for backward compatibility";
+
+

Review Comment:
   Sorry, another nit about extra whitespace.
   
   ```suggestion
   
   ```



##
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java:
##
@@ -346,10 +350,17 @@ static String parseAccessToken(String responseBody) 
throws IOException {
 return sanitizeString("the token endpoint response's access_token JSON 
attribute", accessTokenNode.textValue());
 }
 
-static String formatAuthorizationHeader(String clientId, String 
clientSecret) {
+static String formatAuthorizationHeader(String clientId, String 
clientSecret, boolean urlencode) throws

Review Comment:
   Oh, I see, this is a `static` method. I don't remember why that was 
necessary 🤷‍♂️ 



##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -192,6 +192,12 @@ public class SaslConfigs {
 + " be inspected for the standard OAuth \"iss\" claim and if this 
value is set, the broker will match it exactly against what is in the JWT's 
\"iss\" claim. If there is no"
 + " match, the broker will reject the JWT and authentication will 
fail.";
 
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = 
"sasl.oauthbearer.header.urlencode.enable";

Review Comment:
   Is there a KIP associated with this Jira ticket? Unfortunately, additions of 
configuration options require a KIP 😞 



##
clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java:
##
@@ -346,10 +350,17 @@ static String parseAccessToken(String responseBody) 
throws IOException {
 return sanitizeString("the token endpoint response's access_token JSON 
attribute", accessTokenNode.textValue());
 }
 
-static String formatAuthorizationHeader(String clientId, String 
clientSecret) {
+static String formatAuthorizationHeader(String clientId, String 
clientSecret, boolean urlencode) throws

Review Comment:
   Instead of passing in the `urlencode` parameter here, can we use the 
`urlencodeHeader` directly?
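
   For context, the behaviour under discussion can be sketched with the JDK 
alone. This is an illustration under assumptions, not the code in the PR: the 
class name is hypothetical, the checked exception is wrapped to keep the 
example short, and the flag is passed as a parameter purely for 
demonstration. It URL-encodes both credentials before Base64-encoding them 
into a Basic authorization header, as RFC 6749 section 2.3.1 describes.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthHeaderSketch {
    // application/x-www-form-urlencoded encoding of a single value.
    static String urlEncode(String value) {
        try {
            return URLEncoder.encode(value, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on the JVM.
            throw new IllegalStateException(e);
        }
    }

    // Builds "Basic base64(clientId:clientSecret)", optionally URL-encoding
    // each credential first.
    static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) {
        if (urlencode) {
            clientId = urlEncode(clientId);
            clientSecret = urlEncode(clientSecret);
        }
        String credentials = clientId + ":" + clientSecret;
        byte[] bytes = credentials.getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(bytes);
    }

    public static void main(String[] args) {
        System.out.println(formatAuthorizationHeader("my id", "s:cret", false));
        System.out.println(formatAuthorizationHeader("my id", "s:cret", true));
    }
}
```

   Encoding matters most when a credential contains `:` or other reserved 
characters, since the colon is also the separator inside the header value.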



##
clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java:
##
@@ -192,6 +192,12 @@ public class SaslConfigs {
 + " be inspected for the standard OAuth \"iss\" claim and if this 
value is set, the broker will match it exactly against what is in the JWT's 
\"iss\" claim. If there is no"
 + " match, the broker will reject the JWT and authentication will 
fail.";
 
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = 
"sasl.oauthbearer.header.urlencode.enable";
+public static final boolean 
DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = false;
+public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE_DOC = 
"The (optional) setting to enable oauthbearer client to urlencode client_id and 
client_secret in the authorization header"

Review Comment:
   Nitpick:
   
   ```suggestion
   public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE_DOC 
= "The (optional) setting to enable the OAuth client to URL-encode the 
client_id and client_secret in the authorization header"
   ```






Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]

2024-03-05 Thread via GitHub


mjsax commented on PR #15189:
URL: https://github.com/apache/kafka/pull/15189#issuecomment-1979900538

   @florin-akermann -- I finally merged 
https://github.com/apache/kafka/pull/14426 -- can you rebase this PR and fixup 
tests so we can move forward with this PR?





[jira] [Resolved] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()

2024-03-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15417.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

> JoinWindow does not  seem to work properly with a KStream - KStream - 
> LeftJoin()
> 
>
> Key: KAFKA-15417
> URL: https://issues.apache.org/jira/browse/KAFKA-15417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0
>Reporter: Victor van den Hoven
>Assignee: Victor van den Hoven
>Priority: Major
> Fix For: 3.8.0
>
> Attachments: Afbeelding 1-1.png, Afbeelding 1.png, 
> SimpleStreamTopology.java, SimpleStreamTopologyTest.java
>
>
> In Kafka-streams 3.4.0 :
> According to the javadoc of the Joinwindow:
> _There are three different window configuration supported:_
>  * _before = after = time-difference_
>  * _before = 0 and after = time-difference_
>  * _*before = time-difference and after = 0*_
>  
> However if I use a joinWindow with *before = time-difference and after = 0* 
> on a kstream-kstream-leftjoin the *after=0* part does not seem to work.
> When using _stream1.leftjoin(stream2, joinWindow)_ with 
> {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on 
> stream 1 that can not be joined with any messages on stream2 should be joined 
> with a null-record after the _joinWindow.after_ has ended and a new message 
> has arrived on stream1.
> It does not.
> Only if the new message arrives after the value of _joinWindow.before_ the 
> previous message will be joined with a null-record.
>  
> Attached you can find two files with a TopologyTestDriver Unit test to 
> reproduce.
> topology:   stream1.leftjoin( stream2, joiner, joinwindow)
> joinWindow has before=5000ms and after=0
> message1(key1) ->  stream1
> after 4000ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 4900ms message2(key2) -> stream1  ->  NO null-record join was made, but 
> the after period was expired.
> after 5000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
> after 6000ms message2(key2) -> stream1  ->  A null-record join was made,  
> before period was expired.
>  
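
The expectation in the report can be written down as plain timestamp
arithmetic. The sketch below does not use the Kafka Streams API; the class and
method names are hypothetical, and the grace period is ignored for simplicity.
It assumes, following the JoinWindows javadoc quoted above, that a stream2
record joins a stream1 record when its timestamp lies between
`left - before` and `left + after`.

```java
class JoinWindowSketch {
    // True if a stream2 record at rightTs falls inside the join window of a
    // stream1 record at leftTs.
    static boolean withinWindow(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    // Stream time after which no further stream2 record can join the
    // stream1 record (grace period not modelled).
    static long windowCloseTime(long leftTs, long afterMs) {
        return leftTs + afterMs;
    }

    public static void main(String[] args) {
        long before = 5000L;
        long after = 0L;
        long message1Ts = 10_000L;
        System.out.println(withinWindow(message1Ts, 6_000L, before, after));  // inside the window
        System.out.println(withinWindow(message1Ts, 10_001L, before, after)); // later than leftTs + after
        System.out.println(windowCloseTime(message1Ts, after));
    }
}
```

Under these assumptions the window of message1 closes at its own timestamp
when after=0, so the reporter expects the null-record join as soon as a later
record advances stream time, not only after the `before` interval has passed.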





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


mjsax commented on PR #14426:
URL: https://github.com/apache/kafka/pull/14426#issuecomment-1979898854

   Thanks for the fix! Merged to `trunk`.
   
   Really appreciate that you pushed this through. It was more complicated than 
expected and took way too long to get finished.





Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


mjsax merged PR #14426:
URL: https://github.com/apache/kafka/pull/14426





Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-05 Thread via GitHub


showuon commented on PR #15444:
URL: https://github.com/apache/kafka/pull/15444#issuecomment-1979894426

   Thanks for the fix!





[jira] [Resolved] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8

2024-03-05 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16209.
---
Fix Version/s: 3.8.0
   3.7.1
   Resolution: Fixed

> fetchSnapshot might return null if topic is created before v2.8
> ---
>
> Key: KAFKA-16209
> URL: https://issues.apache.org/jira/browse/KAFKA-16209
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.1
>Reporter: Luke Chen
>Assignee: Arpit Goyal
>Priority: Major
>  Labels: newbie, newbie++
> Fix For: 3.8.0, 3.7.1
>
>
> Remote log manager will fetch snapshot via ProducerStateManager 
> [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608],
>  but the snapshot map might get nothing if the topic has no snapshot created, 
> ex: topics before v2.8. Need to fix it to avoid NPE.
> old PR: https://github.com/apache/kafka/pull/14615/





Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]

2024-03-05 Thread via GitHub


showuon merged PR #15444:
URL: https://github.com/apache/kafka/pull/15444





[jira] [Commented] (KAFKA-16344) Internal topic mm2-offset-syncsinternal created with single partition is putting more load on the broker

2024-03-05 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823817#comment-17823817
 ] 

Greg Harris commented on KAFKA-16344:
-

Hi [~janardhanag], thanks for the ticket.

At the current time, the offset syncs topic cannot have more than 1 partition. 
If more than one partition is present, the MirrorSourceTask will only write to 
partition 0, and the MirrorCheckpointTask will only read from partition 0. 
Changes to both of these would be necessary to support partitioning that topic, 
and would require a KIP.

For a workaround, are you able to increase your `offset.lag.max` from the 
default 100? This will make the offset translation less accurate, but can 
decrease the throughput on that topic.

Do you have some more statistics that confirm that this topic is too active? It 
should have just a fraction of the load of the other topics, so I'd be 
interested to know more about the scale you're working at.

Thanks,
Greg

> Internal topic mm2-offset-syncsinternal created with single 
> partition is putting more load on the broker
> -
>
> Key: KAFKA-16344
> URL: https://issues.apache.org/jira/browse/KAFKA-16344
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1
>Reporter: Janardhana Gopalachar
>Priority: Major
>
> We are using Kafka 3.5.1. We see that the internal topic created by 
> MirrorMaker, mm2-offset-syncsinternal, is created with a single partition. 
> As a result, the CPU load on the broker that leads this partition is higher 
> than on the other brokers. Can multiple partitions be created for the topic 
> so that the CPU load is distributed?
>  
> Topic: mm2-offset-syncscluster-ainternal    TopicId: XRvTDbogT8ytNhqX2YTyrA   
>  PartitionCount: 1ReplicationFactor: 3    Configs: 
> min.insync.replicas=2,cleanup.policy=compact,message.format.version=3.0-IV1
>     Topic: mm2-offset-syncscluster-ainternal    Partition: 0    Leader: 2    
> Replicas: 2,1,0    Isr: 2,1,0





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1513659746


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   I'm not sure what _weird test_ you mean here. I've refactored it by using 
`serverProperties` to provide custom server properties for specific tests, if 
that's what you meant by _weird_. Thanks.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   Right now I don't think this config is useful because we are not doing the 
client-side pagination. The config only makes sense if one batch of partitions 
is large enough to cause client-side OOM.
   Maybe we should add this config in the future? What do you think?






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513646133


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   DescribeTopicPartitions does not provide the node info as the Metadata API 
does. The TopicPartitionInfo constructor requires the node info, but the node 
info is useless in the describeTopic scenario.



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -537,6 +544,18 @@ public Map 
listAllReassignments(Set

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   The client can only know if the server-side limit is greater when the result 
is received. 
   Actually, I think this config is only useful for testing. I am not sure 
whether any user will bother to change this config. What do you think?






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641333


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) {
 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
 excludeInternalTopicOpt = parser.accepts("exclude-internal",
 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+partitionSizeLimitPerResponseOpt = 
parser.accepts("partition-size-limit-per-response",

Review Comment:
   user-describe-topics-api is removed.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641009


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Do you mean the server returns an invalid cursor or the client sends one?






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640718


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) {
 return new HashMap<>(topicFutures);
 }
 
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),

Review Comment:
   Done.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640577


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.
+public DescribeTopicsOptions partitionSizeLimitPerResponse(int 
partitionSizeLimitPerResponse) {
+this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
+return this;
+}
+
 public boolean includeAuthorizedOperations() {
 return includeAuthorizedOperations;
 }
 
+public boolean useDescribeTopicPartitionsApi() {

Review Comment:
   removed.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640434


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Removed useDescribeTopicPartitionsApi. It was only used in the 
UnsupportedVersionException retry, but now I figured out a way to retry in the 
Call framework.






Re: [PR] MINOR: Remove unused controlPlaneRequestProcessor in BrokerServer. [kafka]

2024-03-05 Thread via GitHub


appchemist commented on PR #15245:
URL: https://github.com/apache/kafka/pull/15245#issuecomment-1979765706

   @chia7712 sorry for the late check.
   
   I also ran the failed tests locally:
   ```sh
   ./gradlew cleanTest \
     connect:mirror:test --tests MirrorConnectorsIntegrationExactlyOnceTest \
     core:test --tests PlaintextAdminIntegrationTest --tests SaslPlainPlaintextConsumerTest \
       --tests ZkMigrationIntegrationTest --tests SaslSslConsumerTest \
       --tests SaslPlaintextConsumerTest --tests FetchRequestTestDowngrade \
       --tests ProduceRequestTest --tests DynamicBrokerReconfigurationTest \
     tools:test --tests MetadataQuorumCommandTest --tests TopicCommandIntegrationTest \
     storage:test --tests TransactionsWithTieredStoreTest \
     server:test --tests ClientMetricsManagerTest \
     metadata:test --tests QuorumControllerTest
   ```
   They all pass too.
   Thank you





Re: [PR] MINOR: Add read/write all operation [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15462:
URL: https://github.com/apache/kafka/pull/15462#discussion_r1513587296


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -498,29 +497,17 @@ public CompletableFuture 
listGroups(
 );
 }
 
-final Set existingPartitionSet = runtime.partitions();
-
-if (existingPartitionSet.isEmpty()) {
-return CompletableFuture.completedFuture(new 
ListGroupsResponseData());
-}
-
-final 
List>> futures =
-new ArrayList<>();
-
-for (TopicPartition tp : existingPartitionSet) {
-futures.add(runtime.scheduleReadOperation(
-"list-groups",
-tp,
-(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), request.typesFilter(), 
lastCommittedOffset)
-).exceptionally(exception -> {
-exception = Errors.maybeUnwrapException(exception);
-if (exception instanceof NotCoordinatorException) {
-return Collections.emptyList();
-} else {
-throw new CompletionException(exception);
-}
-}));
-}
+final 
List>> futures = 
runtime.scheduleReadAllOperation(
+"list-groups",
+(coordinator, lastCommittedOffset) -> 
coordinator.listGroups(request.statesFilter(), request.typesFilter(), 
lastCommittedOffset)
+).stream().map(future -> future.exceptionally(exception -> {

Review Comment:
   How about passing the exception-handling function to 
`scheduleReadAllOperation`? That brings two benefits:
   
   1. it simplifies the callers' code
   2. it avoids creating the list repeatedly
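
A minimal sketch of that suggestion, using only `java.util.concurrent` types. The name `scheduleReadAll`, the integer partition ids, and the `onError` parameter are illustrative assumptions, not the actual runtime API. The helper attaches the handler to each future before returning, so the caller no longer has to stream over the list and call `exceptionally` itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for the coordinator runtime; not the real Kafka API.
final class ReadAllSketch {

    // Schedules `read` once per partition and attaches `onError` to every
    // resulting future, so callers receive futures that already recover.
    static <T> List<CompletableFuture<T>> scheduleReadAll(
            List<Integer> partitions,
            Function<Integer, CompletableFuture<T>> read,
            Function<Throwable, T> onError) {
        List<CompletableFuture<T>> futures = new ArrayList<>(partitions.size());
        for (Integer partition : partitions) {
            futures.add(read.apply(partition).exceptionally(onError));
        }
        return futures;
    }
}
```

With this shape the caller would pass something like "return an empty list on a not-coordinator error, rethrow otherwise" as `onError`, and the result list is built exactly once.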






Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-05 Thread via GitHub


kirktrue commented on code in PR #15479:
URL: https://github.com/apache/kafka/pull/15479#discussion_r1513571996


##
clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java:
##
@@ -79,15 +79,15 @@ public static SimpleAdminApiFuture newFuture(
 @Override
 public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, 
Set keys) {
 Map 
deletionsForTopic = new HashMap<>();
-for (Map.Entry entry: 
recordsToDelete.entrySet()) {
-TopicPartition topicPartition = entry.getKey();
+for (TopicPartition topicPartition : keys) {
+RecordsToDelete toDelete = recordsToDelete.get(topicPartition);
 DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = 
deletionsForTopic.computeIfAbsent(
 topicPartition.topic(),
 key -> new 
DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic())
 );
 deleteRecords.partitions().add(new 
DeleteRecordsRequestData.DeleteRecordsPartition()
 .setPartitionIndex(topicPartition.partition())
-.setOffset(entry.getValue().beforeOffset()));
+.setOffset(toDelete.beforeOffset()));

Review Comment:
   Would it be unduly paranoid to check for `null` before accessing `toDelete`? 
The call stack up to this point is pretty twisty. I'm assuming that `keys` 
_shouldn't_ contain any `TopicPartition`s that aren't keys in 
`recordsToDelete`, but... 🤷‍♂️ 
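
If such a check were wanted, one possible shape is sketched below. String keys and `Long` offsets stand in for `TopicPartition` and `RecordsToDelete`, and the method name `offsetsFor` is hypothetical. The point is only that a key with no entry fails fast with a descriptive message instead of a bare `NullPointerException`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the loop in buildBatchedRequest; not the real handler.
final class NullCheckSketch {

    // Looks up the offset for every key assigned to this node and fails
    // with a clear message if a key has no matching entry.
    static Map<String, Long> offsetsFor(Set<String> keys, Map<String, Long> recordsToDelete) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String key : keys) {
            Long beforeOffset = recordsToDelete.get(key);
            if (beforeOffset == null) {
                throw new IllegalStateException("No records-to-delete entry for " + key);
            }
            result.put(key, beforeOffset);
        }
        return result;
    }
}
```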






[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2024-03-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15402:
--
Labels: consumer-threading-refactor  (was: )

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Benoit Delbosc
>Priority: Major
>  Labels: consumer-threading-refactor
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2024-03-05 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15402:
--
Component/s: clients






Re: [PR] KAFKA-16100: Add timeout to all the CompletableApplicationEvents [kafka]

2024-03-05 Thread via GitHub


kirktrue commented on code in PR #15455:
URL: https://github.com/apache/kafka/pull/15455#discussion_r1513515432


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java:
##
@@ -29,10 +30,19 @@ public abstract class CommitEvent extends 
CompletableApplicationEvent {
  */
 private final Map offsets;
 
-protected CommitEvent(final Type type, final Map offsets) {
-super(type);
+protected CommitEvent(final Type type, final Map offsets, final Timer timer) {
+super(type, timer);
 this.offsets = Collections.unmodifiableMap(offsets);
+validate(this.offsets);
+}
+
+protected CommitEvent(final Type type, final Map offsets, final long timer) {

Review Comment:
   Thanks for catching that, @cadonna! I renamed it to `deadlineMs` for 
consistency with the rest of the variable/parameter names.






Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on PR #15365:
URL: https://github.com/apache/kafka/pull/15365#issuecomment-1979651000

   @nizhikov I feel this PR is ready, so please check (or list) the failed 
tests. If they are unrelated to this PR, I will merge it.





Re: [PR] MINOR: Remove unused controlPlaneRequestProcessor in BrokerServer. [kafka]

2024-03-05 Thread via GitHub


chia7712 merged PR #15245:
URL: https://github.com/apache/kafka/pull/15245





Re: [PR] MINOR: Remove unused controlPlaneRequestProcessor in BrokerServer. [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on PR #15245:
URL: https://github.com/apache/kafka/pull/15245#issuecomment-1979648508

   I ran the failed tests locally:
   ```sh
   ./gradlew cleanTest \
     core:test --tests FetchRequestTestDowngrade --tests ProduceRequestTest \
       --tests DynamicBrokerReconfigurationTest \
     tools:test --tests MetadataQuorumCommandTest
   ```
   All pass. I will merge it.





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


kirktrue commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513390603


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   Is there a warning logged in the case where the client sends a limit greater 
than what's allowed on the broker?



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -537,6 +544,18 @@ public Map 
listAllReassignments(Set(topicFutures);
 }
 
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),

Review Comment:
   AFAICT, the `callName` used within the `Call` object is only used for 
logging. That said, there's no point in confusing the user who's looking 
through the logs.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   Just for my own understanding, why do we favor creating _fake_ nodes instead 
of looking up the _real_ nodes from the metadata or something?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Sorry for being daft, but when would the user know to set this one way or 
the other? Is this something that can be handled under the covers?






Re: [PR] MINOR: remove test constructor for PartitionAssignment [kafka]

2024-03-05 Thread via GitHub


cmccabe merged PR #15435:
URL: https://github.com/apache/kafka/pull/15435





[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers

2024-03-05 Thread AlexeyASF (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823757#comment-17823757
 ] 

AlexeyASF commented on KAFKA-16319:
---

Great news, thank you very much for the quick reaction! (y)

> Wrong broker destinations for DeleteRecords requests when more than one topic 
> is involved and the topics/partitions are led by different brokers
> 
>
> Key: KAFKA-16319
> URL: https://issues.apache.org/jira/browse/KAFKA-16319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0, 3.7.0, 3.6.1
>Reporter: AlexeyASF
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 3.8.0
>
>
> h2. Context
> Kafka Streams applications send {{DeleteRecords}} requests from time to time 
> via the 
> {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
>  method. Such requests may involve more than one topic (or partition), and 
> they are supposed to be sent to the leader brokers of those partitions.
>  
> h2. Observed behaviour
> When a {{DeleteRecords}} request includes more than one topic (say two, 
> {{topic1}} and {{topic2}}), and these topics are led by different brokers 
> (say {{broker1}} and {{broker2}} respectively), the request is sent to only 
> one broker (say {{broker1}}), leading to partial not_leader_or_follower 
> errors. Because the request was not wholly successful ({{topic1}} is fine, 
> but {{topic2}} is not), it is retried with the _same_ arguments to the _same_ 
> broker ({{broker1}}), so the response is partially faulty again and again. 
> It also may (and does) happen that there is a “mirrored” half-faulty request, 
> in this case to {{broker2}}, where the {{topic2}} operation succeeds but the 
> {{topic1}} operation fails.
> Here’s an anonymised log example from a production system (“direct” and 
> “mirrored” requests, one after another):
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
> ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
> correlationId=42003907, timeoutMs=3
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
>   ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
> correlationId=42003906, timeoutMs=3 {code}
> Such a request results in the following response (shown here only for the 
> "direct" request):
> {code:java}
> [AdminClient clientId=worker-admin]
> Call(
>   callName=deleteRecords(api=DELETE_RECORDS),
>   deadlineMs=...,
>   tries=..., // Can be hundreds
>   nextAllowedTryMs=...)
> got response DeleteRecordsResponseData(
>   throttleTimeMs=0,
>   topics=[
> DeleteRecordsTopicResult(
>   name='topic2',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the 
> errorCode 6, which is not_leader_or_follower
> DeleteRecordsTopicResult(
>   name='topic1',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the 
> errorCode 0, which means the operation was successful
>   ]
> ) {code}
> h2. Expected behaviour
> {{DeleteRecords}} requests are sent to the corresponding partition leader 
> brokers when more than one topic/partition is involved and they are led by 
> different brokers.
> h2. Notes
>  * _presumably_, introduced in 3.6.1 via 
> [https://github.com/apache/kafka/pull/13760] .





[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers

2024-03-05 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823755#comment-17823755
 ] 

Andrew Schofield commented on KAFKA-16319:
--

No worries. My initial assessment was incorrect. The code was broken in 3.6.0 
and was still broken in trunk. I've submitted a fix.

Essentially, every broker was being sent every topic-partition, even ones that 
it didn't lead. So, the kafka-delete-records.sh tool was working because 
overall the KafkaAdmin.deleteRecords request was working, but the bad error 
codes were being masked/ignored.






[PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]

2024-03-05 Thread via GitHub


AndrewJSchofield opened a new pull request, #15479:
URL: https://github.com/apache/kafka/pull/15479

   PR https://github.com/apache/kafka/pull/13760 introduced a problem with 
KafkaAdmin.deleteRecords. If the request acted on a set of topic-partitions 
which spanned multiple leader brokers, the requests for all of the 
topic-partitions were sent to all brokers. While this technically worked, it 
did mean that every broker handled its own topic-partitions and failed all of 
the ones that it didn't lead. As a result, every topic-partition was acted 
on, but at the cost of a lot of failed subrequests.
   
   The code was not paying attention to the mapping from node to 
topic-partition passed into `DeleteRecordsHandler.buildBatchedRequest`. This PR 
filters the subrequests for each node based on the mapping passed into that 
method.
   
   One of the existing unit tests actually codified the wrong behavior, so that 
has been fixed. Then a more complicated unit test that generates a mapping and 
then checks that it is correctly filtered has been added.
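
The filtering described above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the class `DeleteRecordsBatching`, the method `buildForNode`, and the use of `Map.Entry<String, Integer>` as a stand-in for a topic-partition are all assumptions made for the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class DeleteRecordsBatching {
    /**
     * Builds the per-topic payload (topic -> partition -> offset) for ONE broker.
     * A topic-partition is modelled as Map.Entry<topic, partitionIndex>
     * purely for illustration (hypothetical representation).
     */
    static Map<String, Map<Integer, Long>> buildForNode(
            Map<Map.Entry<String, Integer>, Long> requestedOffsets,
            Set<Map.Entry<String, Integer>> partitionsLedByNode) {
        Map<String, Map<Integer, Long>> byTopic = new TreeMap<>();
        for (Map.Entry<Map.Entry<String, Integer>, Long> e : requestedOffsets.entrySet()) {
            Map.Entry<String, Integer> tp = e.getKey();
            // Skip partitions this broker does not lead; including them is what
            // would produce the failed subrequests described above.
            if (!partitionsLedByNode.contains(tp)) {
                continue;
            }
            byTopic.computeIfAbsent(tp.getKey(), t -> new TreeMap<>())
                   .put(tp.getValue(), e.getValue());
        }
        return byTopic;
    }
}
```

Calling `buildForNode` once per broker, each time with only the partitions mapped to that broker, yields one request body per leader instead of the same full body for every broker.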
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] MINOR; Log reason for deleting a kraft snapshot [kafka]

2024-03-05 Thread via GitHub


jsancio opened a new pull request, #15478:
URL: https://github.com/apache/kafka/pull/15478

   There are three reasons why KRaft would delete a snapshot. One, it is older 
than the retention time. Two, the total number of bytes between the log and the 
snapshot exceeds the configured limit. Three, the latest snapshot is newer than 
the log.
   
   This change allows KRaft to log the exact reason why a snapshot is getting 
deleted.
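
As a rough sketch of how the three reasons could be made explicit so that each can be logged, consider the following. The enum, the method name, the parameter names, and the order in which the conditions are checked are all assumptions for illustration; they are not taken from the KRaft code.

```java
import java.util.Optional;

final class SnapshotRetention {
    /** Hypothetical names for the three reasons listed above. */
    enum DeletionReason {
        RETENTION_TIME_EXCEEDED,
        RETENTION_SIZE_EXCEEDED,
        SNAPSHOT_NEWER_THAN_LOG
    }

    /** Returns the first matching reason, or empty if the snapshot should be kept. */
    static Optional<DeletionReason> reasonToDelete(
            long snapshotAgeMs, long retentionMs,
            long logAndSnapshotBytes, long retentionBytes,
            long latestSnapshotEndOffset, long logEndOffset) {
        if (snapshotAgeMs > retentionMs) {
            return Optional.of(DeletionReason.RETENTION_TIME_EXCEEDED);
        }
        if (logAndSnapshotBytes > retentionBytes) {
            return Optional.of(DeletionReason.RETENTION_SIZE_EXCEEDED);
        }
        if (latestSnapshotEndOffset > logEndOffset) {
            return Optional.of(DeletionReason.SNAPSHOT_NEWER_THAN_LOG);
        }
        return Optional.empty();
    }
}
```

A caller can then include the returned reason in the log line at the point where the snapshot is removed.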
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


dajac commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513322953


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Shouldn't the selection be automatic? I don't think users will bother about 
this. Basically, the new API should be used when available and the old one when 
not.
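
The automatic selection suggested here could look roughly like the sketch below. It is not the admin client's implementation: `ApiFallback`, `preferNewApi`, and `UnsupportedApiException` are hypothetical names, with the exception standing in for whatever error the client sees when the broker does not support the newer API.

```java
import java.util.function.Supplier;

final class ApiFallback {
    /** Stand-in for the error raised when the broker lacks the newer API. */
    static final class UnsupportedApiException extends RuntimeException {
        UnsupportedApiException(String message) {
            super(message);
        }
    }

    /** Tries the new API first and falls back to the old one only if unsupported. */
    static <T> T preferNewApi(Supplier<T> newApi, Supplier<T> oldApi) {
        try {
            return newApi.get();
        } catch (UnsupportedApiException e) {
            // Only the "not supported" signal triggers the fallback;
            // any other failure propagates to the caller unchanged.
            return oldApi.get();
        }
    }
}
```

With this shape the user never has to pick an API explicitly, which is the point of the review comment.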






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513310569


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) {
 return new HashMap<>(topicFutures);
 }
 
+@SuppressWarnings("MethodLength")
+private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection<String> topicNames, DescribeTopicsOptions options) {
+final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList<String> topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),

Review Comment:
   Should this be "describeTopicPartitions"?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   I think you need a small change to KIP-966 to document these changes to the 
admin API.



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) {
 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
 excludeInternalTopicOpt = parser.accepts("exclude-internal",
 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+partitionSizeLimitPerResponseOpt = 
parser.accepts("partition-size-limit-per-response",

Review Comment:
   In other cases where a new API has been introduced, I think the principle 
followed is to try the new one without an option, and fall back to the old one 
if that turns out to be required. That would be much nicer than expecting the 
innocent user to understand what `user-describe-topics-api` means.



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Could we have a test with an invalid cursor?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.
+public DescribeTopicsOptions partitionSizeLimitPerResponse(int 
partitionSizeLimitPerResponse) {
+this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
+return this;
+}
+
 public boolean includeAuthorizedOperations() {
 return includeAuthorizedOperations;
 }
 
+public boolean useDescribeTopicPartitionsApi() {

Review Comment:
   I suggest just `useDescribeTopicPartitions()`. In the Javadoc, you can 
mention that it's using the DescribeTopicPartitions API under the covers. Most 
users of the Kafka admin client would consider `KafkaAdminClient` to be the 
API, rather than the Kafka protocol which is what is meant here.




Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]

2024-03-05 Thread via GitHub


clolov commented on PR #15261:
URL: https://github.com/apache/kafka/pull/15261#issuecomment-1979391796

   Thanks for the review @cadonna! I will provide an updated version tomorrow 
morning!





Re: [PR] MINOR: remove test constructor for PartitionAssignment [kafka]

2024-03-05 Thread via GitHub


cmccabe commented on code in PR #15435:
URL: https://github.com/apache/kafka/pull/15435#discussion_r1513283747


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -792,6 +792,43 @@ class KRaftClusterTest {
 }
   }
 
+  /**
+   * Test that setting the Confluent-specific configuration
+   * confluent.apply.create.topic.policy.to.create.partitions has the expected 
effect.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("3.7-IV0", "3.7-IV2"))
+  def testCreatePartitions(metadataVersionString: String): Unit = {

Review Comment:
   This test passes before and after the change. I just added it because I 
noticed a test gap in this area.






Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]

2024-03-05 Thread via GitHub


divijvaidya commented on code in PR #15472:
URL: https://github.com/apache/kafka/pull/15472#discussion_r1513268220


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws 
InterruptedException
 this.cancel();
 } catch (InterruptedException | RetriableException ex) {
 throw ex;
+} catch (CorruptIndexException ex) {
+logger.error("Error occurred while copying log segments. Index 
appeared to be corrupted for partition: {}  ", topicIdPartition, ex);

Review Comment:
   I am assuming that the way this error will be monitored is by creating an 
alarm on `RemoteCopyLagSegments` [1]. Is that right?
   
   Can you also please explain why we shouldn't increment the 
`failedRemoteCopyRequestRate` and `failedRemoteCopyRequestRate` metrics that 
are incremented in the exception catch block below?
   
   
   
   [1] https://kafka.apache.org/documentation.html#tiered_storage_monitoring




##
storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java:
##
@@ -75,13 +75,14 @@ public void sanityCheck() {
 TimestampOffset entry = lastEntry();
 long lastTimestamp = entry.timestamp;
 long lastOffset = entry.offset;
-if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))
-throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
-+ "non-zero size but the last timestamp is " + lastTimestamp + 
" which is less than the first timestamp "
-+ timestamp(mmap(), 0));
+
 if (entries() != 0 && lastOffset < baseOffset())
 throw new CorruptIndexException("Corrupt time index found, time 
index file (" + file().getAbsolutePath() + ") has "
 + "non-zero size but the last offset is " + lastOffset + " 
which is less than the first offset " + baseOffset());
+if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0))

Review Comment:
   I am assuming that the reason for moving this down is to run the less 
expensive validation first?






[PR] [No Review] KAFKA-14563 part 1 [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent opened a new pull request, #15477:
URL: https://github.com/apache/kafka/pull/15477

   Draft.





[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers

2024-03-05 Thread AlexeyASF (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823723#comment-17823723
 ] 

AlexeyASF commented on KAFKA-16319:
---

??How do you get it to do that? Do you have a small test program and 
instructions for setting up the cluster to make it happen? I'm sure I can fix 
it if only I can make it fail :) For one thing, I would add a test that fails 
without the fix and then succeeds when the code has been fixed so having a 
pointer would be helpful.??

That's all a great idea but, unfortunately, no, I haven't yet created a 
minimal test setup.

??I have reproduced it. Certainly fails like this on 3.6.0.??

[~schofielaj] Have you already had a chance to add a minimal test for 
this? If you have, or you are already working on it, I'd like to avoid 
duplicating the work, but if you haven't started yet, I can give it a shot in 
2-3 hours.

> Wrong broker destinations for DeleteRecords requests when more than one topic 
> is involved and the topics/partitions are led by different brokers
> 
>
> Key: KAFKA-16319
> URL: https://issues.apache.org/jira/browse/KAFKA-16319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: AlexeyASF
>Assignee: Andrew Schofield
>Priority: Major
>
> h2. Context
> Kafka streams applications send, time after time, {{DeleteRecords}} requests, 
> via 
> {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
>  method. Such requests may involve more than 1 topic (or partition), and such 
> requests are supposed to be sent to partitions' leaders brokers.
>  
> h2. Observed behaviour
> In case when {{DeleteRecords}} request includes more than one topic (let's 
> say 2 - {{topic1}} and {{{}topic2{}}}), and these topics are led by different 
> brokers (let’s say {{broker1}} and {{broker2}} respectively), the request is 
> sent to only one broker (let’s say {{{}broker1{}}}), leading to partial 
> not_leader_or_follower errors. As not the whole request was successful 
> ({{{}topic1{}}} is fine, but {{topic2}} is not), it gets retried, with the 
> _same_ arguments, to the _same_ broker ({{{}broker1{}}}), meaning the 
> response will be partially faulty again and again. It also may (and does) 
> happen that there is a “mirrored” half-faulty request - in this case, to 
> {{{}broker2{}}}, where {{topic2}} operation is successful, but {{topic1}} 
> operation fails.
> Here’s an anonymised logs example from a production system (“direct” and 
> “mirrored” requests, one after another):
> {code:java}
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
> ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
> correlationId=42003907, timeoutMs=3
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
>   ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
> correlationId=42003906, timeoutMs=3 {code}
> Such request results in the following response (in this case, only for the 
> "direct" response):
> {code:java}
> [AdminClient clientId=worker-admin]
> Call(
>   callName=deleteRecords(api=DELETE_RECORDS),
>   deadlineMs=...,
>   tries=..., // Can be hundreds
>   nextAllowedTryMs=...)
> got response DeleteRecordsResponseData(
>   throttleTimeMs=0,
>   topics=[
> DeleteRecordsTopicResult(
>   name='topic2',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the 
> errorCode 6, which is not_leader_or_follower
> DeleteRecordsTopicResult(
>   name='topic1',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the 
> errorCode 0, which means the operation was successful
>   ]
> ) {code}
> h2. Expected behaviour
> {{DeleteRecords}} requests are sent to corresponding partitions' leaders 
> brokers when more than 1 topic/partition is involved and they are led by 
> different brokers.
> h2. Notes
>  * {_}presumably{_}, introduced in 3.6.1 via 
> [https://github.com/apache/kafka/pull/

[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers

2024-03-05 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823706#comment-17823706
 ] 

Andrew Schofield commented on KAFKA-16319:
--

I have reproduced it. Certainly fails like this on 3.6.0.

> Wrong broker destinations for DeleteRecords requests when more than one topic 
> is involved and the topics/partitions are led by different brokers
> 
>
> Key: KAFKA-16319
> URL: https://issues.apache.org/jira/browse/KAFKA-16319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: AlexeyASF
>Assignee: Andrew Schofield
>Priority: Major
>
> h2. Context
> Kafka streams applications send, time after time, {{DeleteRecords}} requests, 
> via 
> {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
>  method. Such requests may involve more than 1 topic (or partition), and such 
> requests are supposed to be sent to partitions' leaders brokers.
>  
> h2. Observed behaviour
> In case when {{DeleteRecords}} request includes more than one topic (let's 
> say 2 - {{topic1}} and {{{}topic2{}}}), and these topics are led by different 
> brokers (let’s say {{broker1}} and {{broker2}} respectively), the request is 
> sent to only one broker (let’s say {{{}broker1{}}}), leading to partial 
> not_leader_or_follower errors. As not the whole request was successful 
> ({{{}topic1{}}} is fine, but {{topic2}} is not), it gets retried, with the 
> _same_ arguments, to the _same_ broker ({{{}broker1{}}}), meaning the 
> response will be partially faulty again and again. It also may (and does) 
> happen that there is a “mirrored” half-faulty request - in this case, to 
> {{{}broker2{}}}, where {{topic2}} operation is successful, but {{topic1}} 
> operation fails.
> Here’s an anonymised logs example from a production system (“direct” and 
> “mirrored” requests, one after another):
> {code:java}
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
> ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
> correlationId=42003907, timeoutMs=3
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
>   ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
> correlationId=42003906, timeoutMs=3 {code}
> Such request results in the following response (in this case, only for the 
> "direct" response):
> {code:java}
> [AdminClient clientId=worker-admin]
> Call(
>   callName=deleteRecords(api=DELETE_RECORDS),
>   deadlineMs=...,
>   tries=..., // Can be hundreds
>   nextAllowedTryMs=...)
> got response DeleteRecordsResponseData(
>   throttleTimeMs=0,
>   topics=[
> DeleteRecordsTopicResult(
>   name='topic2',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the 
> errorCode 6, which is not_leader_or_follower
> DeleteRecordsTopicResult(
>   name='topic1',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the 
> errorCode 0, which means the operation was successful
>   ]
> ) {code}
> h2. Expected behaviour
> {{DeleteRecords}} requests are sent to corresponding partitions' leaders 
> brokers when more than 1 topic/partition is involved and they are led by 
> different brokers.
> h2. Notes
>  * {_}presumably{_}, introduced in 3.6.1 via 
> [https://github.com/apache/kafka/pull/13760] .





Re: [PR] KAFKA-16322 : upgrade jline [kafka]

2024-03-05 Thread via GitHub


johnnychhsu commented on PR #15464:
URL: https://github.com/apache/kafka/pull/15464#issuecomment-1979168636

   The Jenkins pipeline (JDK 8, Scala 2.12) failed due to `Unable to connect to 
the child process`. I ran the same command locally, `./gradlew 
-PscalaVersion=2.12 clean check -x test --profile --continue 
-PxmlSpotBugsReport=true -PkeepAliveMode=session`, and it succeeded.





[jira] [Commented] (KAFKA-14048) The Next Generation of the Consumer Rebalance Protocol

2024-03-05 Thread Aratz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823698#comment-17823698
 ] 

Aratz commented on KAFKA-14048:
---

Where can I find the timeline for this work? Is there any? 

Thank you.

> The Next Generation of the Consumer Rebalance Protocol
> --
>
> Key: KAFKA-14048
> URL: https://issues.apache.org/jira/browse/KAFKA-14048
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> This Jira tracks the development of KIP-848: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol.





Re: [PR] MINOR: Add read/write all operation [kafka]

2024-03-05 Thread via GitHub


dajac commented on PR #15462:
URL: https://github.com/apache/kafka/pull/15462#issuecomment-1979123455

   @chia7712 Would you be interested in reviewing this one?





[jira] [Commented] (KAFKA-16099) Handle timeouts for AsyncKafkaConsumer.commitSync

2024-03-05 Thread Aratz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823686#comment-17823686
 ] 

Aratz commented on KAFKA-16099:
---

Where can we find the corresponding PR?

> Handle timeouts for AsyncKafkaConsumer.commitSync
> -
>
> Key: KAFKA-16099
> URL: https://issues.apache.org/jira/browse/KAFKA-16099
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients
>Reporter: Andrew Schofield
>Priority: Major
> Fix For: 3.7.0
>
>
> The handling of synchronous offset commits in the background thread does not 
> observe the caller's timeout. In the situation that a commit request needs to 
> be retried, the retries should not extend beyond the caller's timeout. The 
> CommitApplicationEvent should contain the timeout and not continue beyond 
> that time.
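
The behaviour asked for in this issue, retries that never run past the caller's timeout, can be sketched as below. This is an illustration under stated assumptions rather than the consumer's actual code: `BoundedRetry`, its `Clock` interface, and `retryUntilDeadline` are hypothetical names, and the clock is injected only so the loop can be exercised without real sleeping.

```java
import java.util.function.BooleanSupplier;

final class BoundedRetry {
    /** Hypothetical time source, injected so the loop is testable. */
    interface Clock {
        long nowMs();
        void sleep(long ms);
    }

    /**
     * Runs `attempt` until it succeeds or until another retry would no longer
     * start before the caller's deadline. Returns true on success.
     */
    static boolean retryUntilDeadline(BooleanSupplier attempt, long timeoutMs,
                                      long retryBackoffMs, Clock clock) {
        final long deadlineMs = clock.nowMs() + timeoutMs;
        while (true) {
            if (attempt.getAsBoolean()) {
                return true;
            }
            long remainingMs = deadlineMs - clock.nowMs();
            // Give up if the next attempt would start at or after the deadline.
            if (remainingMs <= retryBackoffMs) {
                return false;
            }
            clock.sleep(retryBackoffMs);
        }
    }
}
```

The deadline is computed once, up front, from the caller's timeout, so no individual retry can extend the overall wait.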





[jira] [Commented] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration

2024-03-05 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823675#comment-17823675
 ] 

PoAn Yang commented on KAFKA-16222:
---

I can reproduce the error. I will look into the code tomorrow. Thanks.

 
{noformat}
> docker run -it --rm --network mykafka-zookeeper_default bitnami/kafka:3.6 
> kafka-configs.sh --describe --all --entity-type users --bootstrap-server 
> kafka-0:9092
kafka 15:19:03.20 INFO  ==>
kafka 15:19:03.20 INFO  ==> Welcome to the Bitnami kafka container
kafka 15:19:03.20 INFO  ==> Subscribe to project updates by watching 
https://github.com/bitnami/containers
kafka 15:19:03.20 INFO  ==> Submit issues and feature requests at 
https://github.com/bitnami/containers/issues
kafka 15:19:03.20 INFO  ==>

Quota configs for user-principal 'myuser%40prod' are consumer_byte_rate=2048.0, 
request_percentage=200.0, producer_byte_rate=1024.0{noformat}

> KRaft Migration: Incorrect default user-principal quota after migration
> ---
>
> Key: KAFKA-16222
> URL: https://issues.apache.org/jira/browse/KAFKA-16222
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft, migration
>Affects Versions: 3.7.0, 3.6.1
>Reporter: Dominik
>Assignee: PoAn Yang
>Priority: Blocker
>
> We observed that our default user quota seems not to be migrated correctly.
> Before Migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for the *default user-principal* are 
> consumer_byte_rate=100.0, producer_byte_rate=100.0
> Quota configs for user-principal {color:#172b4d}'myuser{*}@{*}prod'{color} 
> are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
> After Migration:
> bin/kafka-configs.sh --describe --all --entity-type users
> Quota configs for *user-principal ''* are consumer_byte_rate=100.0, 
> producer_byte_rate=100.0
> Quota configs for user-principal {color:#172b4d}'myuser{*}%40{*}prod'{color} 
> are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8
>  
> Additional finding: Our names contain an "@", which also leads to an 
> incorrect state after migration.
>  
>  
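
The `%40` seen after migration is what percent-encoding produces for `@`. The sketch below only demonstrates that round trip with the JDK's own encoder. Whether the migration path omits a decoding step is an assumption suggested by the output above, not something this thread confirms, and `PrincipalNameCodec` is a hypothetical helper, not a Kafka class.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class PrincipalNameCodec {
    /** Percent-encodes a principal name, e.g. for use as a stored entity name. */
    static String sanitize(String name) {
        return URLEncoder.encode(name, StandardCharsets.UTF_8);
    }

    /** Reverses sanitize(); skipping this step would leave "%40" in the name. */
    static String desanitize(String sanitized) {
        return URLDecoder.decode(sanitized, StandardCharsets.UTF_8);
    }
}
```

Here `sanitize("myuser@prod")` gives `myuser%40prod`, and `desanitize` restores the original name.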





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


ijuma commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1513013595


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -52,20 +55,30 @@ public class GetOffsetShellTest {
 private final int topicCount = 4;
 private final int offsetTopicPartitionCount = 4;
 private final ClusterInstance cluster;
+private final String topicName = "topic";
 
 public GetOffsetShellTest(ClusterInstance cluster) {
 this.cluster = cluster;
 }
 
 private String getTopicName(int i) {
-return "topic" + i;
+return topicName + i;
 }
 
-public void setUp() {
+@BeforeEach
+public void before(TestInfo testInfo) {

Review Comment:
   Hmm, this is a weird test to verify this behavior. @hachikuji @jolshan had 
identified a test that seemed to be a better candidate.






Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]

2024-03-05 Thread via GitHub


pprovenzano commented on code in PR #15409:
URL: https://github.com/apache/kafka/pull/15409#discussion_r1513010665


##
tests/kafkatest/tests/core/log_dir_failure_test.py:
##
@@ -84,20 +84,25 @@ def __init__(self, test_context):
 self.num_consumers = 1
 
 def setUp(self):
-self.zk.start()
+if self.zk:
+self.zk.start()
 
 def min_cluster_size(self):
 """Override this since we're adding services outside of the 
constructor"""
 return super(LogDirFailureTest, self).min_cluster_size() + 
self.num_producers * 2 + self.num_consumers * 2
 
-@cluster(num_nodes=9)
-@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], 
security_protocol=["PLAINTEXT"])
-def test_replication_with_disk_failure(self, bounce_broker, 
security_protocol, broker_type):
+@cluster(num_nodes=8)
+@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], 
security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk])
+@cluster(num_nodes=7)
+@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], 
security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.combined_kraft])

Review Comment:
   I agree with dropping the combined test here. I'm not sure I would ever 
suggest running JBOD in combined mode. A separate process for broker and 
controller on the same node would be better.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512985080


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -884,11 +886,13 @@ public void 
shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
 
 processor.checkAndClearProcessResult();
 
-// push one item to the first stream; this should produce one 
full-join item
+// push one item to the first stream;
+// this should produce one inner-join item;
+// and a right-joined item for a3

Review Comment:
   Removed the line in the comment that says we produce output for a3.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512983679


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##
@@ -438,13 +438,13 @@ public void testOrdering() {
 inputTopic1.pipeInput(1, "A1", 100L);
 processor.checkAndClearProcessResult();
 
-// push one item to the other window that has a join; this should 
produce non-joined records with a closed window first, then
-// the joined records
-// by the time they were produced before
+// push one item to the other window that has a join;
+// this should produce the joined record first;
+// then the not-joined record

Review Comment:
   Modified the comments according to the results produced.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512982523


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {

Review Comment:
   Correct, removed this test.






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512981646


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items

Review Comment:
   modified comment






Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2024-03-05 Thread via GitHub


VictorvandenHoven commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1512980857


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##
@@ -436,6 +436,239 @@ public void 
testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
 }
 }
 
+@Test
+public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+final StreamsBuilder builder = new StreamsBuilder();
+
+final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+final KStream stream1;
+final KStream stream2;
+final KStream joined;
+final MockApiProcessorSupplier supplier = 
new MockApiProcessorSupplier<>();
+stream1 = builder.stream(topic1, consumed);
+stream2 = builder.stream(topic2, consumed);
+
+joined = stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+StreamJoined.with(Serdes.Integer(),
+Serdes.String(),
+Serdes.String())
+);
+joined.process(supplier);
+
+final Collection> copartitionGroups =
+
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+assertEquals(1, copartitionGroups.size());
+assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
+
+try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), PROPS)) {
+final TestInputTopic inputTopic1 =
+driver.createInputTopic(topic1, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final TestInputTopic inputTopic2 =
+driver.createInputTopic(topic2, new IntegerSerializer(), 
new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+final MockApiProcessor processor = 
supplier.theCapturedProcessor();
+
+processor.init(null);
+// push four items with increasing timestamps to the primary 
stream; this should emit null-joined items
+// w1 = {}
+// w2 = {}
+// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 
3:B3 (ts: 1003) }

Review Comment:
   Changed B into A






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


KevinZTW commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512973940


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.log4j.LogManager;

Review Comment:
   Thanks! I should double-check this.






Re: [PR] MINOR: parameterize group-id in GroupMetadataManagerTestContext [kafka]

2024-03-05 Thread via GitHub


chia7712 merged PR #15467:
URL: https://github.com/apache/kafka/pull/15467





[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers

2024-03-05 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823653#comment-17823653
 ] 

Andrew Schofield commented on KAFKA-16319:
--

[~alexeyasf] How do you get it to do that? Do you have a small test program and 
instructions for setting up the cluster to make it happen? I'm sure I can fix 
it if only I can make it fail :) For one thing, I would add a test that fails 
without the fix and then succeeds when the code has been fixed so having a 
pointer would be helpful.

> Wrong broker destinations for DeleteRecords requests when more than one topic 
> is involved and the topics/partitions are led by different brokers
> 
>
> Key: KAFKA-16319
> URL: https://issues.apache.org/jira/browse/KAFKA-16319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: AlexeyASF
>Assignee: Andrew Schofield
>Priority: Major
>
> h2. Context
> Kafka Streams applications periodically send {{DeleteRecords}} requests 
> via the 
> {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}}
>  method. Such requests may involve more than one topic (or partition), and 
> they are supposed to be sent to the brokers that lead those partitions.
>  
> h2. Observed behaviour
> When a {{DeleteRecords}} request includes more than one topic (let's 
> say two, {{topic1}} and {{topic2}}), and these topics are led by different 
> brokers (let's say {{broker1}} and {{broker2}} respectively), the request is 
> sent to only one broker (let's say {{broker1}}), leading to partial 
> not_leader_or_follower errors. Because the request was only partly successful 
> ({{topic1}} is fine, but {{topic2}} is not), it gets retried with the 
> _same_ arguments to the _same_ broker ({{broker1}}), meaning the 
> response will be partially faulty again and again. It also may (and does) 
> happen that there is a "mirrored" half-faulty request, in this case to 
> {{broker2}}, where the {{topic2}} operation is successful but the {{topic1}} 
> operation fails.
> Here’s an anonymised logs example from a production system (“direct” and 
> “mirrored” requests, one after another):
> {code:java}
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
> ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1
> correlationId=42003907, timeoutMs=3
> [AdminClient clientId=worker-admin]
> Sending DeleteRecordsRequestData(topics=[
>   DeleteRecordsTopic(
> name='topic1',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)]
>   ),
>   DeleteRecordsTopic(
> name='topic2',
> partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)]
> )], timeoutMs=6)
> to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2
> correlationId=42003906, timeoutMs=3 {code}
> Such a request results in the following response (shown here only for the 
> "direct" request):
> {code:java}
> [AdminClient clientId=worker-admin]
> Call(
>   callName=deleteRecords(api=DELETE_RECORDS),
>   deadlineMs=...,
>   tries=..., // Can be hundreds
>   nextAllowedTryMs=...)
> got response DeleteRecordsResponseData(
>   throttleTimeMs=0,
>   topics=[
> DeleteRecordsTopicResult(
>   name='topic2',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the 
> errorCode 6, which is not_leader_or_follower
> DeleteRecordsTopicResult(
>   name='topic1',
>   partitions=[DeleteRecordsPartitionResult(
> partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the 
> errorCode 0, which means the operation was successful
>   ]
> ) {code}
> h2. Expected behaviour
> {{DeleteRecords}} requests are sent to the leader brokers of the corresponding 
> partitions when more than one topic/partition is involved and they are led by 
> different brokers.
> h2. Notes
>  * _presumably_, introduced in 3.6.1 via 
> [https://github.com/apache/kafka/pull/13760].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: parameterize group-id in GroupMetadataManagerTestContext [kafka]

2024-03-05 Thread via GitHub


dongnuo123 commented on code in PR #15467:
URL: https://github.com/apache/kafka/pull/15467#discussion_r1512942687


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##
@@ -901,7 +901,7 @@ public RebalanceResult staticMembersJoinAndRebalance(
 public PendingMemberGroupResult setupGroupWithPendingMember(ClassicGroup 
group) throws Exception {
 // Add the first member
 JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
-.withGroupId("group-id")
+.withGroupId(group.groupId())

Review Comment:
   The group id is not a parameter of the method, so I guess it's fine to use 
`"group-id"`






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512938465


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   > And I think it's because their classpath is set to 
`sourceSets.main.runtimeClasspath`, so instead of closing the logger directly I 
would leverage this to eliminate the unexpected log info
   
   Pardon me, why does this work?






[jira] [Commented] (KAFKA-14679) Add new __consumer_offsets records

2024-03-05 Thread Aratz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823645#comment-17823645
 ] 

Aratz commented on KAFKA-14679:
---

Okay, found it. I think it is this one: 
https://github.com/apache/kafka/pull/13203

> Add new __consumer_offsets records
> --
>
> Key: KAFKA-14679
> URL: https://issues.apache.org/jira/browse/KAFKA-14679
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.5.0
>
>






[jira] [Commented] (KAFKA-14679) Add new __consumer_offsets records

2024-03-05 Thread Aratz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823644#comment-17823644
 ] 

Aratz commented on KAFKA-14679:
---

Hello,

Is there a GitHub PR related to this *resolved* work?

> Add new __consumer_offsets records
> --
>
> Key: KAFKA-14679
> URL: https://issues.apache.org/jira/browse/KAFKA-14679
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.5.0
>
>






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512899702


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -30,6 +30,7 @@
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.log4j.LogManager;

Review Comment:
   this change is unnecessary, right?






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


KevinZTW commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512897988


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   After executing those tasks, I think they don't have a similar issue: the 
logger implementation couldn't be loaded
   ```
   SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
   SLF4J: Defaulting to no-operation (NOP) logger implementation
   ```
   And I think it's because their classpath is set to 
`sourceSets.main.runtimeClasspath`, so instead of closing the logger directly I 
would leverage this to eliminate the unexpected log info.
   






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


KevinZTW commented on PR #15473:
URL: https://github.com/apache/kafka/pull/15473#issuecomment-1978869931

   > Could you run the website with the change and paste the screenshot? Thanks.
   
   Sure!
   
   ## Previous Version
   
![image](https://github.com/apache/kafka/assets/38662781/6b390918-0d42-4b54-85f2-083620d2398e)
   
![image](https://github.com/apache/kafka/assets/38662781/08f57b13-3fc5-461c-aa89-26ab6aab7afa)
   
![image](https://github.com/apache/kafka/assets/38662781/e97b9050-b9d0-4380-94f6-82ca5618d2d0)
   
   
   ## Now
   
   
![image](https://github.com/apache/kafka/assets/38662781/ec6e2ff9-fe26-4d36-afa1-493b35102c40)
   
![image](https://github.com/apache/kafka/assets/38662781/6f3f5348-9e14-4c75-a592-c409781718d3)
   
![image](https://github.com/apache/kafka/assets/38662781/dcf2ae81-9a97-4ac5-81da-ec736186f554)
   
   





Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-05 Thread via GitHub


johnnychhsu commented on PR #15476:
URL: https://github.com/apache/kafka/pull/15476#issuecomment-1978864287

   Thanks for the quick review @chia7712, sure, let me address that.





Re: [PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on PR #15476:
URL: https://github.com/apache/kafka/pull/15476#issuecomment-1978858961

   The fix is perfect. Please rewrite the test according to #15474. Putting all 
test cases together is more readable.





[PR] KAFKA-16341: fix the LogValidator for non-compressed type [kafka]

2024-03-05 Thread via GitHub


johnnychhsu opened a new pull request, #15476:
URL: https://github.com/apache/kafka/pull/15476

   ## Context
   Previously in the LogValidator, the value of `offsetOfMaxTimestamp` depended 
on two checks:
   1. timestampType
   2. batch.toMagic
   If the `timestampType` is `LOG_APPEND_TIME` and `toMagic` is greater than or 
equal to `MAGIC_VALUE_V2`, we assign `offsetCounter.value - 1` to it; 
otherwise we keep the initial value.
   
   However, 
[KAFKA-14477](https://issues.apache.org/jira/browse/KAFKA-14477) changed this 
so that `offsetOfMaxTimestamp` is modified regardless of whether the type is 
`LOG_APPEND_TIME`, with only `toMagic` being verified, which led to this 
error.
   
   More details can be found in 
[KAFKA-16310](https://issues.apache.org/jira/browse/KAFKA-16310)
   
   ## Solution
   Fix the verification logic so that it considers both the `timestampType` and 
the magic version. 
   
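
The intended check can be illustrated with a small standalone sketch. This is only an illustration of the rule stated in the Context section, not the LogValidator code: the class, the `resolve` method, its parameters, and the local `TimestampType` enum are hypothetical stand-ins, and `nextOffset` plays the role of `offsetCounter.value`.

```java
public class OffsetOfMaxTimestampSketch {

    // Local stand-in for Kafka's timestamp type; only the two values matter here.
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Stand-in for RecordBatch.MAGIC_VALUE_V2.
    static final byte MAGIC_VALUE_V2 = 2;

    // Overrides the offset only when BOTH conditions hold: log-append time and
    // a target magic of at least v2. Otherwise the initial value is kept.
    static long resolve(TimestampType timestampType,
                        byte toMagic,
                        long initialOffsetOfMaxTimestamp,
                        long nextOffset) {
        if (timestampType == TimestampType.LOG_APPEND_TIME && toMagic >= MAGIC_VALUE_V2) {
            return nextOffset - 1;
        }
        return initialOffsetOfMaxTimestamp;
    }

    public static void main(String[] args) {
        // Create time with magic v2: the initial offset must survive.
        System.out.println(resolve(TimestampType.CREATE_TIME, (byte) 2, 3L, 10L));
        // Log-append time with magic v2: the last assigned offset is used.
        System.out.println(resolve(TimestampType.LOG_APPEND_TIME, (byte) 2, 3L, 10L));
    }
}
```

Checking only `toMagic`, as described for the regression, would return `nextOffset - 1` in the first call as well.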
   ## Test
   run `./gradlew clean core:test --tests 
integration.kafka.api.OffsetOfMaxTimestampTest.testWithNoCompression` and it 
passed
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512790426


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping

Review Comment:
   You're right! Tests added. Thanks.






Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]

2024-03-05 Thread via GitHub


soarez commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1512748010


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##
@@ -289,13 +289,10 @@ class BrokerMetadataPublisher(
 try {
   // Start log manager, which will perform (potentially lengthy)
   // recovery-from-unclean-shutdown if required.
-  logManager.startup(metadataCache.getAllTopics())
-
-  // Delete partition directories which we're not supposed to have. We have
-  // to do this before starting ReplicaManager, so that the stray replicas
-  // don't block creation of new ones with different IDs but the same 
names.
-  // See KAFKA-14616 for details.
-  logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+  logManager.startup(
+metadataCache.getAllTopics(),
+shouldBeStrayKraftLog = log => 
LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)

Review Comment:
   A suggestion, due to the following concerns:
   
   * LogManager shouldn't handle metadata records, these types shouldn't depend 
on each other. The metadata records should be handled here instead.
   * The argument name looks a bit strange, namely the 'should' and 'kraft' 
parts.
   
   ```suggestion
   isStray = (topicId, partition) => 
Option(newImage.topics().getPartition(topicId.getOrElse{
 throw new RuntimeException(s"Partition $partition does not have a 
topic ID, " +
   "which is not allowed when running in KRaft mode.")
   }, partition.partition())).exists(_.replicas.contains(brokerId))
   ```
   
   Perhaps LogManager should declare a type for this argument since it's 
propagated down the call stack several levels?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File],
 } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
   addStrayLog(topicPartition, log)
   warn(s"Loaded stray log: $logDir")
+} else if (shouldBeStrayKraftLog(log)) {
+  // Mark the partition directories we're not supposed to have as stray. 
We have to do this
+  // during log load because topics may have been recreated with the same 
name while a disk
+  // was offline.
+  // See KAFKA-16234, KAFKA-16157 and KAFKA-14616 for details.

Review Comment:
   Perhaps it could help a future reader to clarify that kraft mode (as opposed 
to zk) does not track deleted topics nor prevent them from being re-created 
with the same name before every replica has been deleted, and so there's no way 
for a broker with a to-be-deleted replica in an offline directory to detect 
this earlier.
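
To make the stray check concrete for a future reader, here is a minimal sketch under stated assumptions. It is not the LogManager code: the class, the `isStray` helper, and the `"topicId:partition"` string keys are hypothetical, and the map stands in for the topics image. The assumption is that a log is stray when the current metadata has no partition for its topic ID, or when this broker is not among that partition's replicas.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class StrayReplicaSketch {

    // replicasByPartition maps a hypothetical "topicId:partition" key to replica broker ids.
    static boolean isStray(Map<String, int[]> replicasByPartition,
                           String topicIdAndPartition,
                           int brokerId) {
        int[] replicas = replicasByPartition.get(topicIdAndPartition);
        if (replicas == null) {
            // The topic was deleted (and possibly re-created under a new topic ID).
            return true;
        }
        // The partition still exists but is no longer assigned to this broker.
        return Arrays.stream(replicas).noneMatch(id -> id == brokerId);
    }

    public static void main(String[] args) {
        Map<String, int[]> image = new HashMap<>();
        image.put("new-topic-id:0", new int[] {1, 2, 3});

        System.out.println(isStray(image, "old-topic-id:0", 1)); // old topic ID is gone
        System.out.println(isStray(image, "new-topic-id:0", 1)); // assigned replica
        System.out.println(isStray(image, "new-topic-id:0", 4)); // not a replica
    }
}
```

Keying on the topic ID rather than the topic name is what lets a log left over from a deleted topic be told apart from a re-created topic with the same name.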






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512747215


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   Nice catch @KevinZTW !






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512687959


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   Do other doc tasks have a similar issue? For example: 
`genSourceConnectorConfigDocs`, `genSinkConnectorConfigDocs`, etc.






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


KevinZTW commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512669605


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   Since the log information is generated by the `Metrics` class, another way I 
found is to add `log4j.logger.org.apache.kafka.common.metrics.Metrics=WARN` to 
`/kafka/connect/runtime/src/test/resources/log4j.properties`. 
   
   However, this also affects the logging behavior during test execution, so 
I think closing the logger only in the main function is the better way.
   






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


KevinZTW commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512665815


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   In the current design, we generate our documentation file by executing the 
`main()` shown above (around line 434) and writing everything it prints to 
stdout into a file. 
   
   ```gradle
   task genConsumerMetricsDocs(type: JavaExec) {
     classpath = sourceSets.test.runtimeClasspath
     mainClass = 'org.apache.kafka.clients.consumer.internals.ConsumerMetrics'
     if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
     standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream()
   }
   ```
   As a result, all log output is also added to our docs, as you can see 
here:
   
   https://kafka.apache.org/documentation.html#connect_monitoring
   
![image](https://github.com/apache/kafka/assets/38662781/d9ec6023-6b0c-4165-b6f8-59cb039669d8)
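
The effect described above can be reproduced with a minimal sketch. It is an analogy only: it uses `java.util.logging` from the JDK rather than log4j, and the class name `StdoutDocsSketch`, the method `generateDocs`, the logger name and the log message are all hypothetical. It shows that when a logger writes to the same stream that is captured as the docs file, the log line lands in the generated output unless logging is silenced first.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import java.util.logging.StreamHandler;

public class StdoutDocsSketch {
    // Hypothetical stand-in for a docs-generating main(): returns everything
    // that reached the captured "stdout" stream.
    static String generateDocs(boolean silenceLogging) {
        ByteArrayOutputStream captured = new ByteArrayOutputStream();
        PrintStream stdout = new PrintStream(captured, true);

        // Assumption: the logger writes to the same stream as the docs,
        // similar to a console appender that targets stdout.
        Logger logger = Logger.getLogger("sketch.metrics");
        logger.setUseParentHandlers(false);
        StreamHandler handler = new StreamHandler(stdout, new SimpleFormatter());
        logger.addHandler(handler);
        try {
            // Silencing the logger plays the role of shutting logging down
            // before the docs are printed.
            logger.setLevel(silenceLogging ? Level.OFF : Level.INFO);

            logger.info("Metrics reporters closed");
            handler.flush();

            // The actual "documentation" output.
            stdout.println("<table class=\"data-table\"></table>");
            stdout.flush();
            return captured.toString();
        } finally {
            // Restore the shared logger so repeated calls start clean.
            logger.removeHandler(handler);
            logger.setLevel(null);
            logger.setUseParentHandlers(true);
        }
    }
}
```

With `silenceLogging` set to `false` the returned text contains the log line ahead of the table; with `true` it contains only the table.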
   






Re: [PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]

2024-03-05 Thread via GitHub


bachmanity1 commented on PR #15475:
URL: https://github.com/apache/kafka/pull/15475#issuecomment-1978544182

   @kirktrue @mimaison can you have a look, please? Thanks!





[PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]

2024-03-05 Thread via GitHub


bachmanity1 opened a new pull request, #15475:
URL: https://github.com/apache/kafka/pull/15475

   When a client communicates with an OIDC provider to retrieve an access token, 
RFC 6749 says that clientId and clientSecret must be urlencoded in the 
authorization header (see https://tools.ietf.org/html/rfc6749#section-2.3.1). 
However, it seems that in practice some OIDC providers do not enforce this, so 
I'm proposing to introduce a new configuration parameter that will optionally 
urlencode clientId & clientSecret in the authorization header. 
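
A minimal sketch of what such an option could look like, assuming HTTP Basic authentication as described in RFC 6749 section 2.3.1. This is not the PR's code: the class name, the method name and the boolean `urlencode` flag are hypothetical placeholders for whatever the new configuration parameter ends up controlling.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeaderSketch {
    // Builds "Basic base64(clientId:clientSecret)". When urlencode is true,
    // both values are form-urlencoded before being joined and Base64-encoded.
    static String authorizationHeader(String clientId, String clientSecret, boolean urlencode) {
        String id = urlencode ? URLEncoder.encode(clientId, StandardCharsets.UTF_8) : clientId;
        String secret = urlencode ? URLEncoder.encode(clientSecret, StandardCharsets.UTF_8) : clientSecret;
        String credentials = id + ":" + secret;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }
}
```

The difference only shows up for credentials containing reserved characters: a secret such as `p@ss:word` is sent as `p%40ss%3Aword` when the flag is on, and unchanged when it is off.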
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512657775


##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;
-else
-shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping

Review Comment:
   It seems to me this also fixes the path for the nonexistent magic code 
(`convertAndAssignOffsetsNonCompressed`). Is it possible to add a test for that 
case?






[jira] [Created] (KAFKA-16345) Optionally allow urlencoding clientId and clientSecret in authorization header

2024-03-05 Thread Nelson B. (Jira)
Nelson B. created KAFKA-16345:
-

 Summary: Optionally allow urlencoding clientId and clientSecret in 
authorization header
 Key: KAFKA-16345
 URL: https://issues.apache.org/jira/browse/KAFKA-16345
 Project: Kafka
  Issue Type: Bug
Reporter: Nelson B.


When a client communicates with an OIDC provider to retrieve an access token, 
RFC 6749 says that clientId and clientSecret must be urlencoded in the 
authorization header (see https://tools.ietf.org/html/rfc6749#section-2.3.1). 
However, it seems that in practice some OIDC providers do not enforce this, so 
I was thinking about introducing a new configuration parameter that will 
optionally urlencode clientId & clientSecret in the authorization header. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16345) Optionally allow urlencoding clientId and clientSecret in authorization header

2024-03-05 Thread Nelson B. (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nelson B. reassigned KAFKA-16345:
-

Assignee: Nelson B.

> Optionally allow urlencoding clientId and clientSecret in authorization header
> --
>
> Key: KAFKA-16345
> URL: https://issues.apache.org/jira/browse/KAFKA-16345
> Project: Kafka
>  Issue Type: Bug
>Reporter: Nelson B.
>Assignee: Nelson B.
>Priority: Minor
>
> When a client communicates with an OIDC provider to retrieve an access token, 
> RFC 6749 says that clientId and clientSecret must be urlencoded in the 
> authorization header (see 
> https://tools.ietf.org/html/rfc6749#section-2.3.1). However, it seems that 
> in practice some OIDC providers do not enforce this, so I was thinking about 
> introducing a new configuration parameter that will optionally urlencode 
> clientId & clientSecret in the authorization header. 





[jira] [Assigned] (KAFKA-16341) Fix un-compressed records

2024-03-05 Thread Johnny Hsu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Johnny Hsu reassigned KAFKA-16341:
--

Assignee: Johnny Hsu

> Fix un-compressed records
> -
>
> Key: KAFKA-16341
> URL: https://issues.apache.org/jira/browse/KAFKA-16341
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Johnny Hsu
>Priority: Major
>






Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15473:
URL: https://github.com/apache/kafka/pull/15473#discussion_r1512617796


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java:
##
@@ -431,6 +432,7 @@ public interface LiteralSupplier {
  * @param args the arguments
  */
 public static void main(String[] args) {
+LogManager.shutdown();

Review Comment:
   Why do we need this change?






Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-05 Thread via GitHub


nizhikov commented on code in PR #15365:
URL: https://github.com/apache/kafka/pull/15365#discussion_r1512610273


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java:
##
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer.group;
+
+import kafka.admin.ConsumerGroupCommand;
+import kafka.api.AbstractSaslTest;
+import kafka.api.Both$;
+import kafka.utils.JaasTestUtils;
+import kafka.utils.TestUtils;
+import kafka.zk.ConfigEntityChangeNotificationZNode;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.function.Executable;
+import scala.Option;
+import scala.Some$;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+import scala.collection.immutable.Map$;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest.seq;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
+private static final String TOPIC = "topic";
+public static final int NUM_PARTITIONS = 1;
+public static final int BROKER_COUNT = 1;
+public static final String KAFKA_CLIENT_SASL_MECHANISM = "SCRAM-SHA-256";
+private static final Seq KAFKA_SERVER_SASL_MECHANISMS = 
seq(Collections.singletonList(KAFKA_CLIENT_SASL_MECHANISM));
+
+@SuppressWarnings({"deprecation"})
+private Consumer createConsumer() {
+return createConsumer(
+new ByteArrayDeserializer(),
+new ByteArrayDeserializer(),
+new Properties(),
+JavaConverters.asScalaSet(Collections.emptySet()).toList()
+);
+}
+
+@Override
+public SecurityProtocol securityProtocol() {
+return SecurityProtocol.SASL_PLAINTEXT;
+}
+
+@Override
+public Option serverSaslProperties() {
+return 
Some$.MODULE$.apply(kafkaServerSaslProperties(KAFKA_SERVER_SASL_MECHANISMS, 
KAFKA_CLIENT_SASL_MECHANISM));
+}
+
+@Override
+public Option clientSaslProperties() {
+return 
Some$.MODULE$.apply(kafkaClientSaslProperties(KAFKA_CLIENT_SASL_MECHANISM, 
false));
+}
+
+@Override
+public int brokerCount() {
+return 1;
+}
+
+@Override
+public void configureSecurityBeforeServersStart(TestInfo testInfo) {
+super.configureSecurityBeforeServersStart(testInfo);
+
zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path());
+// Create broker credentials before starting brokers
+createScramCredentials(zkConnect(), JaasTestUtils.KafkaScramAdmin(), 
JaasTestUtils.KafkaScramAdminPassword());
+}
+
+@Override
+public Admin createPrivilegedAdminClient() {
+return createAdminClient(bootstrapServers(listenerName()), 
securityProtocol(), trustStoreFile(), clientSaslProperties(),
+KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KafkaScramAdmin(), 
JaasTestUtils.KafkaScramAdminPassword());
+}
+
+@BeforeEach
+@Override
+public void setUp(TestInfo testInfo) {
+startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, 
Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$,
+JaasTestUtils.KafkaServerContextName()));
+super.setUp(testInfo);
+createTopic(
+TOPIC,
+NUM_PARTITIONS,
+BROKER_COUNT,
+new Properties(),
+listenerName(),
+new Properties());

Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-05 Thread via GitHub


nizhikov commented on code in PR #15365:
URL: https://github.com/apache/kafka/pull/15365#discussion_r1512609597


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java:
##

Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512607158


##
tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java:
##
@@ -333,7 +382,7 @@ private void assertExitCodeIsOne(String... args) {
 }
 
 private List expectedOffsetsWithInternal() {
-List consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1)
+List consOffsets = IntStream.range(0, offsetTopicPartitionCount)

Review Comment:
   This is a side fix for this test. Before this PR, 
`offsetTopicPartitionCount` was not applied to the cluster.






Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on PR #15474:
URL: https://github.com/apache/kafka/pull/15474#issuecomment-1978460395

   @chia7712 @ijuma @hachikuji , please take a look. Thanks.





Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512604852


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##
@@ -379,8 +381,11 @@ public ValidationResult 
validateMessagesAndAssignOffsetsCompressed(LongRef offse
 && batch.magic() > RecordBatch.MAGIC_VALUE_V0
 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
 
-if (record.timestamp() > maxTimestamp)
+if (record.timestamp() > maxTimestamp) {
 maxTimestamp = record.timestamp();
+// The offset is only increased when it is a valid record
+offsetOfMaxTimestamp = initialOffset + validatedRecords.size();

Review Comment:
   Also set the correct offset of the max timestamp while traversing the records. 



##
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##
@@ -263,13 +262,8 @@ public RecordsInfo info() {
 } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
 return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
 } else {
-long shallowOffsetOfMaxTimestamp;
-// Use the last offset when dealing with record batches
-if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-shallowOffsetOfMaxTimestamp = lastOffset;

Review Comment:
   I don't understand why we should always use the last offset here. This 
will fail the getOffsetByMaxTimestamp test. Is that expected? Maybe @ijuma 
could answer this? 






Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]

2024-03-05 Thread via GitHub


chia7712 commented on code in PR #15365:
URL: https://github.com/apache/kafka/pull/15365#discussion_r1512597183


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java:
##

[PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]

2024-03-05 Thread via GitHub


showuon opened a new pull request, #15474:
URL: https://github.com/apache/kafka/pull/15474

   Fix `getOffsetByMaxTimestamp` for compressed records.
   
   This PR:
   1. For the inPlaceAssignment case, computes the correct offset for maxTimestamp 
while traversing the batch records and sets it in `ValidationResult` at the end, 
instead of always using the last offset.
   2. For the non-inPlaceAssignment case, sets offsetOfMaxTimestamp for the log 
create time, as in the non-compressed and inPlaceAssignment cases, instead of 
always using the last offset.
   3. Adds tests to verify the fix.
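
The traversal in item 1 can be illustrated with a small sketch. It is a simplified model, not the `LogValidator` code: records are reduced to an array of timestamps that receive consecutive offsets starting at `initialOffset`, and the class and method names are hypothetical.

```java
public class MaxTimestampOffsetSketch {
    // Same sentinel value as RecordBatch.NO_TIMESTAMP.
    static final long NO_TIMESTAMP = -1L;

    // Returns {maxTimestamp, offsetOfMaxTimestamp} for records that are
    // assigned consecutive offsets starting at initialOffset.
    static long[] maxTimestampAndOffset(long initialOffset, long[] timestamps) {
        long maxTimestamp = NO_TIMESTAMP;
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < timestamps.length; i++) {
            // Strict comparison keeps the first record that carries the max timestamp.
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                offsetOfMaxTimestamp = initialOffset + i;
            }
        }
        return new long[] {maxTimestamp, offsetOfMaxTimestamp};
    }
}
```

For timestamps `{10, 30, 20}` and an initial offset of 100, this returns max timestamp 30 at offset 101, whereas always reporting the last offset would give 102.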
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




