Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
showuon commented on PR #15444: URL: https://github.com/apache/kafka/pull/15444#issuecomment-1980240072 Ah, you're right! @iit2009060 , I missed that! Will you open another PR to fix it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15561: Client support for new SubscriptionPattern based subscription [kafka]
Phuc-Hong-Tran commented on code in PR #15188: URL: https://github.com/apache/kafka/pull/15188#discussion_r1513877668 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java: ## @@ -84,6 +85,9 @@ private enum SubscriptionType { /* the pattern user has requested */ private Pattern subscribedPattern; +/* RE2J compatible regex */ +private SubscriptionPattern subscriptionPattern; Review Comment: I think it is a bit overkill to have an abstraction like this; Pattern and SubscriptionPattern are already abstractions over the underlying regex string. What do you guys think @lianetm @dajac?
Re: [PR] KAFKA-16152: Fix PlaintextConsumerTest.testStaticConsumerDetectsNewPartitionCreatedAfterRestart [kafka]
dajac commented on code in PR #15419: URL: https://github.com/apache/kafka/pull/15419#discussion_r1513841644 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -546,8 +546,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() { data.setMemberEpoch(membershipManager.memberEpoch()); // InstanceId - only sent if has changed since the last heartbeat Review Comment: Right. This part is clearly wrong in the doc. It cannot change within the lifetime of the consumer.
Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
iit2009060 commented on PR #15444: URL: https://github.com/apache/kafka/pull/15444#issuecomment-1980040479 @showuon @chiacyu This has not fixed the overall problem. It just moves the NullPointerException into the RemoteLogManager instead of the ProducerStateManager. https://github.com/iit2009060/kafka/blob/trunk/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L744
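For context on the class of fix being discussed, here is a minimal null-safety sketch; the class and method names are illustrative stand-ins, not Kafka's actual API:

```java
import java.util.Optional;

// Illustrative stand-in for the null-snapshot problem: a fetch that may find
// no producer-state snapshot (e.g. for topics created before v2.8) returns an
// Optional, so the caller must handle the empty case instead of hitting an NPE.
public class SnapshotGuard {

    // Stand-in for a fetchSnapshot-style lookup; not Kafka's actual signature.
    static Optional<String> fetchSnapshot(long offset, boolean snapshotExists) {
        return snapshotExists ? Optional.of("snapshot-" + offset) : Optional.empty();
    }

    // Caller (analogous to the RemoteLogManager call site) handles "no snapshot" explicitly.
    static String describeSnapshot(long offset, boolean snapshotExists) {
        return fetchSnapshot(offset, snapshotExists)
                .orElse("no snapshot for offset " + offset);
    }

    public static void main(String[] args) {
        System.out.println(describeSnapshot(42L, true));   // snapshot-42
        System.out.println(describeSnapshot(42L, false));  // no snapshot for offset 42
    }
}
```

The point of the thread is that guarding only one call site merely relocates the NPE; every consumer of the snapshot lookup needs to handle the empty case.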
Re: [PR] [WIP]KAFKA-15444: Native docker image [kafka]
github-actions[bot] commented on PR #14556: URL: https://github.com/apache/kafka/pull/14556#issuecomment-1980016226 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]
bachmanity1 commented on PR #15475: URL: https://github.com/apache/kafka/pull/15475#issuecomment-1980014397 Hi @kirktrue, thanks for the review! I've created a new KIP here https://cwiki.apache.org/confluence/display/KAFKA/KIP-1025%3A+Optionally+URL-encode+clientID+and+clientSecret+in+authorization+header
[PR] MINOR: Upgrade zookeeper 3.8.3 -> 3.8.4 [kafka]
KevinZTW opened a new pull request, #15480: URL: https://github.com/apache/kafka/pull/15480 Upgrade ZooKeeper from 3.8.3 to 3.8.4. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16346) Fix flaky MetricsTest.testMetrics
[ https://issues.apache.org/jira/browse/KAFKA-16346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823831#comment-17823831 ] Chia-Ping Tsai commented on KAFKA-16346: The count is increased even though the value is zero, so using the count is meaningless currently. Maybe we should update `messageConversionsTimeHist` only if the conversion does happen. > Fix flaky MetricsTest.testMetrics > > > Key: KAFKA-16346 > URL: https://issues.apache.org/jira/browse/KAFKA-16346 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > {code} > Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > > testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false > FAILED > org.opentest4j.AssertionFailedError: Broker metric not recorded correctly > for > kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce > value 0.0 ==> expected: <true> but was: <false> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at > app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at > app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) > at > app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314) > at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110) > {code} > The value used to update metrics is calculated by Math.round, so it could be > zero if you have a good machine :) > We should verify the `count` instead of `value`, since it is convincing and > more stable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16346) Fix flaky MetricsTest.testMetrics
[ https://issues.apache.org/jira/browse/KAFKA-16346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang reassigned KAFKA-16346: - Assignee: PoAn Yang > Fix flaky MetricsTest.testMetrics > > > Key: KAFKA-16346 > URL: https://issues.apache.org/jira/browse/KAFKA-16346 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > {code} > Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > > testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false > FAILED > org.opentest4j.AssertionFailedError: Broker metric not recorded correctly > for > kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce > value 0.0 ==> expected: <true> but was: <false> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at > app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at > app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at > app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) > at > app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314) > at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110) > {code} > The value used to update metrics is calculated by Math.round, so it could be > zero if you have a good machine :) > We should verify the `count` instead of `value`, since it is convincing and > more stable.
[jira] [Created] (KAFKA-16346) Fix flaky MetricsTest.testMetrics
Chia-Ping Tsai created KAFKA-16346: -- Summary: Fix flaky MetricsTest.testMetrics Key: KAFKA-16346 URL: https://issues.apache.org/jira/browse/KAFKA-16346 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai {code} Gradle Test Run :core:test > Gradle Test Executor 1119 > MetricsTest > testMetrics(boolean) > testMetrics with systemRemoteStorageEnabled: false FAILED org.opentest4j.AssertionFailedError: Broker metric not recorded correctly for kafka.network:type=RequestMetrics,name=MessageConversionsTimeMs,request=Produce value 0.0 ==> expected: <true> but was: <false> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at app//kafka.api.MetricsTest.verifyBrokerMessageConversionMetrics(MetricsTest.scala:314) at app//kafka.api.MetricsTest.testMetrics(MetricsTest.scala:110) {code} The value used to update metrics is calculated by Math.round, so it could be zero if you have a good machine :) We should verify the `count` instead of `value`, since it is convincing and more stable.
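The flakiness described in this ticket can be reproduced in miniature. The sketch below is a simplified stand-in for the histogram update, not Kafka's actual metrics code: the recorded value is Math.round of an elapsed time, which rounds to zero on a fast machine, while the sample count still increments.

```java
// Simplified stand-in (not Kafka's metrics code) for why asserting on the
// histogram *value* is flaky while asserting on the *count* is stable.
public class HistogramDemo {
    long count;
    long lastValue;

    void update(double elapsedMs) {
        lastValue = Math.round(elapsedMs); // rounds to 0 when the conversion took < 0.5 ms
        count++;
    }

    public static void main(String[] args) {
        HistogramDemo hist = new HistogramDemo();
        hist.update(0.3); // a "good machine": sub-millisecond message conversion
        // Asserting lastValue > 0 is flaky; asserting count > 0 is stable.
        System.out.println("value=" + hist.lastValue + " count=" + hist.count); // value=0 count=1
    }
}
```

This also illustrates the comment above: if the count increments even when no real conversion happened, the count is only meaningful once the histogram is updated exclusively on actual conversions.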
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
chia7712 commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1513702636 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: Thanks for sharing. That is interesting.
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
KevinZTW commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1513701081 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: Sorry it took a while to check this. I think it is because, in our project, the `StaticLoggerBinder` is usually provided by `slf4j-reload4j`, and in the Gradle build file the `:connect:runtime` project adds it as a test runtime dependency only. Hence, if we specify the `classpath` as the `main` runtime, the path to `slf4j-reload4j`'s jar is not included, which causes SLF4J to fall back to the NOP logger implementation.
Re: [PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]
kirktrue commented on code in PR #15475: URL: https://github.com/apache/kafka/pull/15475#discussion_r1513681366 ## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ## @@ -192,6 +192,12 @@ public class SaslConfigs { + " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. If there is no" + " match, the broker will reject the JWT and authentication will fail."; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = "sasl.oauthbearer.header.urlencode.enable"; +public static final boolean DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = false; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE_DOC = "The (optional) setting to enable oauthbearer client to urlencode client_id and client_secret in the authorization header" ++ " in accordance with RFC6749, see https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1 for more detail. The default value is set to 'false' for backward compatibility"; + + Review Comment: Sorry, another nit about extra whitespace. ```suggestion ``` ## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java: ## @@ -346,10 +350,17 @@ static String parseAccessToken(String responseBody) throws IOException { return sanitizeString("the token endpoint response's access_token JSON attribute", accessTokenNode.textValue()); } -static String formatAuthorizationHeader(String clientId, String clientSecret) { +static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) throws Review Comment: Oh, I see, this is a `static` method. 
I don't remember why that was necessary 🤷‍♂️ ## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ## @@ -192,6 +192,12 @@ public class SaslConfigs { + " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. If there is no" + " match, the broker will reject the JWT and authentication will fail."; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = "sasl.oauthbearer.header.urlencode.enable"; Review Comment: Is there a KIP associated with this Jira ticket? Unfortunately, additions of configuration options require a KIP 😞 ## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/HttpAccessTokenRetriever.java: ## @@ -346,10 +350,17 @@ static String parseAccessToken(String responseBody) throws IOException { return sanitizeString("the token endpoint response's access_token JSON attribute", accessTokenNode.textValue()); } -static String formatAuthorizationHeader(String clientId, String clientSecret) { +static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) throws Review Comment: Instead of passing in the `urlencode` parameter here, can we use the `urlencodeHeader` directly? ## clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java: ## @@ -192,6 +192,12 @@ public class SaslConfigs { + " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. 
If there is no" + " match, the broker will reject the JWT and authentication will fail."; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = "sasl.oauthbearer.header.urlencode.enable"; +public static final boolean DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE = false; +public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE_DOC = "The (optional) setting to enable oauthbearer client to urlencode client_id and client_secret in the authorization header" Review Comment: Nitpick: ```suggestion public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_ENABLE_DOC = "The (optional) setting to enable the OAuth client to URL-encode the client_id and client_secret in the authorization header" ```
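A hedged sketch of the behavior this PR discusses: optionally form-urlencoding client_id and client_secret before building the HTTP Basic authorization header, per RFC 6749 section 2.3.1. The class and method below are illustrative, not Kafka's actual HttpAccessTokenRetriever code:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative sketch (not Kafka's actual implementation): optionally
// form-urlencode the OAuth client credentials before Base64-encoding them
// into a Basic authorization header, as RFC 6749 section 2.3.1 requires.
public class AuthHeader {

    static String formatAuthorizationHeader(String clientId, String clientSecret, boolean urlencode) {
        if (urlencode) {
            // application/x-www-form-urlencoded encoding, per the RFC
            clientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8);
            clientSecret = URLEncoder.encode(clientSecret, StandardCharsets.UTF_8);
        }
        String credentials = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Encoding only matters when credentials contain reserved characters;
        // a secret containing '+' yields a different header once encoded.
        System.out.println(formatAuthorizationHeader("my-client", "p+ss", false));
        System.out.println(formatAuthorizationHeader("my-client", "p+ss", true));
    }
}
```

Defaulting the flag to false, as the PR does, preserves the old (non-encoding) behavior for existing deployments.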
Re: [PR] KAFKA-16123: KStreamKStreamJoinProcessor does not drop late records. [kafka]
mjsax commented on PR #15189: URL: https://github.com/apache/kafka/pull/15189#issuecomment-1979900538 @florin-akermann -- I finally merged https://github.com/apache/kafka/pull/14426 -- can you rebase this PR and fixup tests so we can move forward with this PR?
[jira] [Resolved] (KAFKA-15417) JoinWindow does not seem to work properly with a KStream - KStream - LeftJoin()
[ https://issues.apache.org/jira/browse/KAFKA-15417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-15417. - Fix Version/s: 3.8.0 Resolution: Fixed > JoinWindow does not seem to work properly with a KStream - KStream - > LeftJoin() > > > Key: KAFKA-15417 > URL: https://issues.apache.org/jira/browse/KAFKA-15417 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.4.0 >Reporter: Victor van den Hoven >Assignee: Victor van den Hoven >Priority: Major > Fix For: 3.8.0 > > Attachments: Afbeelding 1-1.png, Afbeelding 1.png, > SimpleStreamTopology.java, SimpleStreamTopologyTest.java > > > In Kafka-streams 3.4.0 : > According to the javadoc of the Joinwindow: > _There are three different window configuration supported:_ > * _before = after = time-difference_ > * _before = 0 and after = time-difference_ > * _*before = time-difference and after = 0*_ > > However if I use a joinWindow with *before = time-difference and after = 0* > on a kstream-kstream-leftjoin the *after=0* part does not seem to work. > When using _stream1.leftjoin(stream2, joinWindow)_ with > {_}joinWindow{_}.{_}after=0 and joinWindow.before=30s{_}, any new message on > stream 1 that can not be joined with any messages on stream2 should be joined > with a null-record after the _joinWindow.after_ has ended and a new message > has arrived on stream1. > It does not. > Only if the new message arrives after the value of _joinWindow.before_ the > previous message will be joined with a null-record. > > Attached you can find two files with a TopologyTestDriver Unit test to > reproduce. > topology: stream1.leftjoin( stream2, joiner, joinwindow) > joinWindow has before=5000ms and after=0 > message1(key1) -> stream1 > after 4000ms message2(key2) -> stream1 -> NO null-record join was made, but > the after period was expired. > after 4900ms message2(key2) -> stream1 -> NO null-record join was made, but > the after period was expired. 
> after 5000ms message2(key2) -> stream1 -> A null-record join was made, > before period was expired. > after 6000ms message2(key2) -> stream1 -> A null-record join was made, > before period was expired.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax commented on PR #14426: URL: https://github.com/apache/kafka/pull/14426#issuecomment-1979898854 Thanks for the fix! Merged to `trunk`. Really appreciate that you pushed this through. It was more complicated than expected and took way too long to get finished.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
mjsax merged PR #14426: URL: https://github.com/apache/kafka/pull/14426
Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
showuon commented on PR #15444: URL: https://github.com/apache/kafka/pull/15444#issuecomment-1979894426 Thanks for the fix!
[jira] [Resolved] (KAFKA-16209) fetchSnapshot might return null if topic is created before v2.8
[ https://issues.apache.org/jira/browse/KAFKA-16209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16209. --- Fix Version/s: 3.8.0 3.7.1 Resolution: Fixed > fetchSnapshot might return null if topic is created before v2.8 > --- > > Key: KAFKA-16209 > URL: https://issues.apache.org/jira/browse/KAFKA-16209 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.1 >Reporter: Luke Chen >Assignee: Arpit Goyal >Priority: Major > Labels: newbie, newbie++ > Fix For: 3.8.0, 3.7.1 > > > Remote log manager will fetch snapshot via ProducerStateManager > [here|https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java#L608], > but the snapshot map might get nothing if the topic has no snapshot created, > ex: topics before v2.8. Need to fix it to avoid NPE. > old PR: https://github.com/apache/kafka/pull/14615/
Re: [PR] KAFKA-16209 : fetchSnapshot might return null if topic is created before v2.8 [kafka]
showuon merged PR #15444: URL: https://github.com/apache/kafka/pull/15444
[jira] [Commented] (KAFKA-16344) Internal topic mm2-offset-syncs.<cluster>.internal created with a single partition is putting more load on the broker
[ https://issues.apache.org/jira/browse/KAFKA-16344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823817#comment-17823817 ] Greg Harris commented on KAFKA-16344: - Hi [~janardhanag], thanks for the ticket. At the current time, the offset syncs topic cannot have more than 1 partition. If more than one partition is present, the MirrorSourceTask will only write to partition 0, and the MirrorCheckpointTask will only read from partition 0. Changes to both of these would be necessary to support partitioning that topic, and would require a KIP. For a workaround, are you able to increase your `offset.lag.max` from the default 100? This will make the offset translation less accurate, but can decrease the throughput on that topic. Do you have some more statistics that confirm that this topic is too active? It should have just a fraction of the load of the other topics, so I'd be interested to know more about the scale you're working at. Thanks, Greg > Internal topic mm2-offset-syncs.<cluster>.internal created with a single > partition is putting more load on the broker > - > > Key: KAFKA-16344 > URL: https://issues.apache.org/jira/browse/KAFKA-16344 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.5.1 >Reporter: Janardhana Gopalachar >Priority: Major > > We are using Kafka version 3.5.1; we see that the internal topic created by > MirrorMaker, > mm2-offset-syncs.<cluster>.internal, is created with a single partition, due to > which the CPU load on the broker that is the leader for this partition is > increased compared to other brokers.
Can multiple partitions be created for > the topic so that the CPU load would get distributed? > > Topic: mm2-offset-syncs.cluster-a.internal TopicId: XRvTDbogT8ytNhqX2YTyrA > PartitionCount: 1 ReplicationFactor: 3 Configs: > min.insync.replicas=2,cleanup.policy=compact,message.format.version=3.0-IV1 > Topic: mm2-offset-syncs.cluster-a.internal Partition: 0 Leader: 2 > Replicas: 2,1,0 Isr: 2,1,0
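As a sketch of the workaround Greg suggests: `offset.lag.max` is a real MirrorMaker 2 property (default 100), while the cluster aliases and the value 10000 below are illustrative.

```properties
# Illustrative mm2.properties fragment: raise offset.lag.max above its
# default of 100 to reduce the write rate to the mm2-offset-syncs topic,
# at the cost of coarser offset translation. Aliases below are examples.
clusters = cluster-a, cluster-b
cluster-a->cluster-b.enabled = true
cluster-a->cluster-b.offset.lag.max = 10000
```

A larger lag threshold means the MirrorSourceTask emits offset-sync records less often, which directly lowers traffic on the single-partition topic's leader.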
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1513659746 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +55,30 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before(TestInfo testInfo) { Review Comment: I'm not sure what _weird test_ you mean here. I've refactored it by using `serverProperties` to provide custom server properties for specific tests, if that's what you meant _weird_. Thanks.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. Review Comment: Right now I don't think this config is useful because we are not doing the client-side pagination. The config only makes sense if one batch of partitions is large enough to cause client-side OOM. Maybe we should add this config in the future? What do you think?
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513646133 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopics API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); +} Review Comment: The DescribeTopicPartitions API does not provide node info as the Metadata API does. However, the TopicPartitionInfo constructor requires node info, even though it is useless in the describeTopic scenario. ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -537,6 +544,18 @@ public Map listAllReassignments(Set
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. Review Comment: The client can only know if the server-side limit is greater when the result is received. Actually, I think this config is only useful for testing. I am not sure whether any user will bother to change this config. What do you think?
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641333 ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) { "if set when creating topics, the action will only execute if the topic does not already exist."); excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default"); +partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response", Review Comment: user-describe-topics-api is removed.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641009 ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception { } } +@SuppressWarnings("NPathComplexity") Review Comment: Do you mean the server returns an invalid cursor or the client sends one?
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640718 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } +@SuppressWarnings("MethodLength") +private Map<String, KafkaFutureImpl<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection<String> topicNames, DescribeTopicsOptions options) { +final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList<String> topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: Done.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640577 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. +public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) { +this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse; +return this; +} + public boolean includeAuthorizedOperations() { return includeAuthorizedOperations; } +public boolean useDescribeTopicPartitionsApi() { Review Comment: removed.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640434 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Removed useDescribeTopicPartitionsApi. It was only used in the UnsupportedVersionException retry, but now I figured out a way to retry in the Call framework.
Re: [PR] MINOR: Remove unused controlPlaneRequestProcessor in BrokerServer. [kafka]
appchemist commented on PR #15245: URL: https://github.com/apache/kafka/pull/15245#issuecomment-1979765706 @chia7712 Sorry, I checked it late. I ran the failed tests on my local machine too ```./gradlew cleanTest connect:mirror:test --tests MirrorConnectorsIntegrationExactlyOnceTest core:test --tests PlaintextAdminIntegrationTest --tests SaslPlainPlaintextConsumerTest --tests ZkMigrationIntegrationTest --tests SaslSslConsumerTest --tests SaslPlaintextConsumerTest --tests FetchRequestTestDowngrade --tests ProduceRequestTest --tests DynamicBrokerReconfigurationTest tools:test --tests MetadataQuorumCommandTest --tests TopicCommandIntegrationTest storage:test --tests TransactionsWithTieredStoreTest server:test --tests ClientMetricsManagerTest metadata:test --tests QuorumControllerTest``` All pass too. Thank you
Re: [PR] MINOR: Add read/write all operation [kafka]
chia7712 commented on code in PR #15462: URL: https://github.com/apache/kafka/pull/15462#discussion_r1513587296 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ## @@ -498,29 +497,17 @@ public CompletableFuture<ListGroupsResponseData> listGroups( ); } -final Set<TopicPartition> existingPartitionSet = runtime.partitions(); -if (existingPartitionSet.isEmpty()) { -return CompletableFuture.completedFuture(new ListGroupsResponseData()); -} -final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures = -new ArrayList<>(); -for (TopicPartition tp : existingPartitionSet) { -futures.add(runtime.scheduleReadOperation( -"list-groups", -tp, -(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset) -).exceptionally(exception -> { -exception = Errors.maybeUnwrapException(exception); -if (exception instanceof NotCoordinatorException) { -return Collections.emptyList(); -} else { -throw new CompletionException(exception); -} -})); -} +final List<CompletableFuture<List<ListGroupsResponseData.ListedGroup>>> futures = runtime.scheduleReadAllOperation( +"list-groups", +(coordinator, lastCommittedOffset) -> coordinator.listGroups(request.statesFilter(), request.typesFilter(), lastCommittedOffset) +).stream().map(future -> future.exceptionally(exception -> { Review Comment: How about adding the exception function to `scheduleReadAllOperation`? It brings two benefits: 1. it simplifies the code for callers 2. it avoids creating the list repeatedly
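chia7712's suggestion above — pushing the exception-mapping function into `scheduleReadAllOperation` itself — can be sketched as follows. This is a hypothetical, self-contained illustration using plain `CompletableFuture`; the method name and signature are stand-ins, not the actual coordinator runtime API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ReadAllSketch {
    // Apply the caller-supplied exception handler inside the "schedule" step,
    // so callers get back futures that already map failures (e.g. a moved
    // coordinator) to a default value instead of rebuilding the list themselves.
    public static <T> List<CompletableFuture<T>> scheduleReadAllOperation(
            List<CompletableFuture<T>> futures,
            Function<Throwable, T> exceptionHandler) {
        List<CompletableFuture<T>> handled = new ArrayList<>(futures.size());
        for (CompletableFuture<T> future : futures) {
            handled.add(future.exceptionally(exceptionHandler));
        }
        return handled;
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("not coordinator"));
        List<CompletableFuture<List<String>>> out =
                scheduleReadAllOperation(List.of(failing), t -> Collections.emptyList());
        System.out.println(out.get(0).join().size()); // prints 0: failure mapped to empty list
    }
}
```

With this shape the `listGroups` caller would pass the `NotCoordinatorException -> emptyList` mapping once, and no intermediate list of unhandled futures is created.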
Re: [PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]
kirktrue commented on code in PR #15479: URL: https://github.com/apache/kafka/pull/15479#discussion_r1513571996 ## clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteRecordsHandler.java: ## @@ -79,15 +79,15 @@ public static SimpleAdminApiFuture<TopicPartition, DeletedRecords> newFuture( @Override public DeleteRecordsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) { Map<String, DeleteRecordsRequestData.DeleteRecordsTopic> deletionsForTopic = new HashMap<>(); -for (Map.Entry<TopicPartition, RecordsToDelete> entry: recordsToDelete.entrySet()) { -TopicPartition topicPartition = entry.getKey(); +for (TopicPartition topicPartition : keys) { +RecordsToDelete toDelete = recordsToDelete.get(topicPartition); DeleteRecordsRequestData.DeleteRecordsTopic deleteRecords = deletionsForTopic.computeIfAbsent( topicPartition.topic(), key -> new DeleteRecordsRequestData.DeleteRecordsTopic().setName(topicPartition.topic()) ); deleteRecords.partitions().add(new DeleteRecordsRequestData.DeleteRecordsPartition() .setPartitionIndex(topicPartition.partition()) .setOffset(toDelete.beforeOffset())); Review Comment: Would it be unduly paranoid to check for `null` before accessing `toDelete`? The call stack up to this point is pretty twisty. I'm assuming that `keys` _shouldn't_ contain any `TopicPartition`s that aren't keys in `recordsToDelete`, but... 🤷♂️
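The defensive `null` check discussed above would look roughly like this. It is a self-contained sketch with a stand-in `TopicPartition` record and plain maps instead of the real `DeleteRecordsHandler` types; the method name is illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BatchedRequestSketch {
    // Stand-in for Kafka's TopicPartition; for illustration only.
    public record TopicPartition(String topic, int partition) {}

    // Build one broker's batch, skipping any key without an entry in
    // recordsToDelete instead of risking a NullPointerException on lookup.
    public static Map<TopicPartition, Long> buildBatch(Set<TopicPartition> keys,
                                                       Map<TopicPartition, Long> recordsToDelete) {
        Map<TopicPartition, Long> batch = new HashMap<>();
        for (TopicPartition tp : keys) {
            Long beforeOffset = recordsToDelete.get(tp);
            if (beforeOffset == null) {
                continue; // the "paranoid" guard discussed in the review comment
            }
            batch.put(tp, beforeOffset);
        }
        return batch;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Long> toDelete = Map.of(new TopicPartition("a", 0), 100L);
        Set<TopicPartition> keys = Set.of(new TopicPartition("a", 0), new TopicPartition("b", 1));
        System.out.println(buildBatch(keys, toDelete).size()); // prints 1: unknown key skipped
    }
}
```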
[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15402: -- Labels: consumer-threading-refactor (was: ) > Performance regression on close consumer after upgrading to 3.5.0 > - > > Key: KAFKA-15402 > URL: https://issues.apache.org/jira/browse/KAFKA-15402 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Benoit Delbosc >Priority: Major > Labels: consumer-threading-refactor > Attachments: image-2023-08-24-18-51-21-720.png, > image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png > > > Hi, > After upgrading to Kafka client version 3.5.0, we have observed a significant > increase in the duration of our Java unit tests. These unit tests heavily > rely on the Kafka Admin, Producer, and Consumer API. > When using Kafka server version 3.4.1, the duration of the unit tests > increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka > client 3.5.0). > Upgrading the Kafka server to 3.5.1 show similar results. > I have come across the issue KAFKA-15178, which could be the culprit. I will > attempt to test the proposed patch. > In the meantime, if you have any ideas that could help identify and address > the regression, please let me know. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15402: -- Component/s: clients
Re: [PR] KAFKA-16100: Add timeout to all the CompletableApplicationEvents [kafka]
kirktrue commented on code in PR #15455: URL: https://github.com/apache/kafka/pull/15455#discussion_r1513515432 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitEvent.java: ## @@ -29,10 +30,19 @@ public abstract class CommitEvent extends CompletableApplicationEvent { */ private final Map<TopicPartition, OffsetAndMetadata> offsets; -protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets) { -super(type); +protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final Timer timer) { +super(type, timer); this.offsets = Collections.unmodifiableMap(offsets); +validate(this.offsets); +} + +protected CommitEvent(final Type type, final Map<TopicPartition, OffsetAndMetadata> offsets, final long timer) { Review Comment: Thanks for catching that, @cadonna! I renamed it to `deadlineMs` for consistency with the rest of the variable/parameter names.
Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]
chia7712 commented on PR #15365: URL: https://github.com/apache/kafka/pull/15365#issuecomment-1979651000 @nizhikov I feel this PR is ready, so please check (or list) the failed tests. If they are unrelated to this PR, I will merge it.
Re: [PR] MINOR: Remove unused controlPlaneRequestProcessor in BrokerServer. [kafka]
chia7712 merged PR #15245: URL: https://github.com/apache/kafka/pull/15245
Re: [PR] MINOR: Remove unused controlPlaneRequestProcessor in BrokerServer. [kafka]
chia7712 commented on PR #15245: URL: https://github.com/apache/kafka/pull/15245#issuecomment-1979648508 Ran the failed tests on my local machine: ```sh ./gradlew cleanTest core:test --tests FetchRequestTestDowngrade --tests ProduceRequestTest --tests DynamicBrokerReconfigurationTest tools:test --tests MetadataQuorumCommandTest ``` All pass. Will merge it.
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
kirktrue commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513390603 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { +this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi; +return this; +} + +// Note that, partitionSizeLimitPerResponse will not be effective if it is larger than the config +// max.request.partition.size.limit on the server side. Review Comment: Is there a warning logged in the case where the client sends a limit greater than what's allowed on the broker? ## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -537,6 +544,18 @@ public Map listAllReassignments(Set(topicFutures); } +@SuppressWarnings("MethodLength") +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection topicNames, DescribeTopicsOptions options) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: AFAICT, the `callName` used within the `Call` 
object is only used for logging. That said, there's no point in confusing the user who's looking through the logs. ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } +// This is used in the describe topics path if using DescribeTopics API. +private Node replicaToFakeNode(int id) { +return new Node(id, "Dummy", 0); +} Review Comment: Just for my own understanding, why do we favor creating _fake_ nodes instead of looking up the _real_ nodes from the metadata or something? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Sorry for being daft, but when would the user know to set this one way or the other? Is this something that can be handled under the covers?
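For the `partitionSizeLimitPerResponse` discussion above, the described behavior amounts to the server clamping the client's requested page size to its own configured maximum. A hypothetical sketch of that arithmetic, with a warning as one way to surface the silent clamping (names are illustrative; whether the real server or client logs anything here is exactly the open question in the thread):

```java
public class PartitionLimitSketch {
    // The server uses min(client's requested limit, server's configured max);
    // a client-requested value above the server max is silently reduced.
    public static int effectiveLimit(int clientRequested, int serverMax) {
        int effective = Math.min(clientRequested, serverMax);
        if (effective != clientRequested) {
            System.err.println("partitionSizeLimitPerResponse " + clientRequested
                    + " exceeds the server-side maximum " + serverMax + "; using " + effective);
        }
        return effective;
    }

    public static void main(String[] args) {
        System.out.println(effectiveLimit(5000, 2000)); // prints 2000
    }
}
```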
Re: [PR] MINOR: remove test constructor for PartitionAssignment [kafka]
cmccabe merged PR #15435: URL: https://github.com/apache/kafka/pull/15435
[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
[ https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823757#comment-17823757 ] AlexeyASF commented on KAFKA-16319: --- Great news, thank you very much for quick reaction! (y) > Wrong broker destinations for DeleteRecords requests when more than one topic > is involved and the topics/partitions are led by different brokers > > > Key: KAFKA-16319 > URL: https://issues.apache.org/jira/browse/KAFKA-16319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.0, 3.7.0, 3.6.1 >Reporter: AlexeyASF >Assignee: Andrew Schofield >Priority: Major > Fix For: 3.8.0 > > > h2. Context > Kafka streams applications send, time after time, {{DeleteRecords}} requests, > via > {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}} > method. Such requests may involve more than 1 topic (or partition), and such > requests are supposed to be sent to partitions' leaders brokers. > > h2. Observed behaviour > In case when {{DeleteRecords}} request includes more than one topic (let's > say 2 - {{topic1}} and {{{}topic2{}}}), and these topics are led by different > brokers (let’s say {{broker1}} and {{broker2}} respectively), the request is > sent to only one broker (let’s say {{{}broker1{}}}), leading to partial > not_leader_or_follower errors. As not the whole request was successful > ({{{}topic1{}}} is fine, but {{topic2}} is not), it gets retried, with the > _same_ arguments, to the _same_ broker ({{{}broker1{}}}), meaning the > response will be partially faulty again and again. It also may (and does) > happen that there is a “mirrored” half-faulty request - in this case, to > {{{}broker2{}}}, where {{topic2}} operation is successful, but {{topic1}} > operation fails. 
> Here’s an anonymised logs example from a production system (“direct” and > “mirrored” requests, one after another): > {code:java} > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1 > correlationId=42003907, timeoutMs=3 > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2 > correlationId=42003906, timeoutMs=3 {code} > Such request results in the following response (in this case, only for the > "direct" response): > {code:java} > [AdminClient clientId=worker-admin] > Call( > callName=deleteRecords(api=DELETE_RECORDS), > deadlineMs=..., > tries=..., // Can be hundreds > nextAllowedTryMs=...) > got response DeleteRecordsResponseData( > throttleTimeMs=0, > topics=[ > DeleteRecordsTopicResult( > name='topic2', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the > errorCode 6, which is not_leader_or_follower > DeleteRecordsTopicResult( > name='topic1', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the > errorCode 0, which means the operation was successful > ] > ) {code} > h2. 
Expected behaviour > {{DeleteRecords}} requests are sent to corresponding partitions' leaders > brokers when more than 1 topic/partition is involved and they are led by > different brokers. > h2. Notes > * {_}presumably{_}, introduced in 3.6.1 via > [https://github.com/apache/kafka/pull/13760] .
[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
[ https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823755#comment-17823755 ] Andrew Schofield commented on KAFKA-16319: -- No worries. My initial assessment was incorrect. The code was broken in 3.6.0 and was still broken in trunk. I've submitted a fix. Essentially, every broker was being sent every topic-partition, even ones that it didn't lead. So, the kafka-delete-records.sh tool was working because overall the KafkaAdmin.deleteRecords request was working, but the bad error codes were being masked/ignored.
[PR] KAFKA-16319: Divide DeleteTopics requests by leader node [kafka]
AndrewJSchofield opened a new pull request, #15479: URL: https://github.com/apache/kafka/pull/15479 PR https://github.com/apache/kafka/pull/13760 introduced a problem with KafkaAdmin.deleteRecords. If the request acted on a set of topic-partitions which spanned multiple leader brokers, the request for all of the topic-partitions was sent to all brokers. While this technically worked, it did mean that every broker handled its own topic-partitions and failed all of the ones that it didn't lead. This meant that every topic-partition was acted on, but at the cost of a lot of failed subrequests. The code was not paying attention to the mapping from node to topic-partition passed into `DeleteRecordsHandler.buildBatchedRequest`. This PR filters the subrequests for each node based on the mapping passed into that method. One of the existing unit tests actually codified the wrong behavior, so that has been fixed. Then a more complicated unit test that generates a mapping and then checks that it is correctly filtered has been added. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
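The fix described above boils down to building each broker's subrequest only from the topic-partitions that broker leads. A minimal, self-contained sketch of that grouping idea, using a stand-in `TopicPartition` record rather than the real admin-client types:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LeaderGroupingSketch {
    // Stand-in for Kafka's TopicPartition; for illustration only.
    public record TopicPartition(String topic, int partition) {}

    // Build each broker's subrequest only from the topic-partitions mapped to
    // that broker, instead of sending every partition to every broker.
    public static Map<Integer, Set<TopicPartition>> groupByLeader(Map<TopicPartition, Integer> leaders) {
        Map<Integer, Set<TopicPartition>> byNode = new HashMap<>();
        leaders.forEach((tp, node) ->
                byNode.computeIfAbsent(node, k -> new HashSet<>()).add(tp));
        return byNode;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Integer> leaders = Map.of(
                new TopicPartition("topic1", 5), 2,  // led by broker 2
                new TopicPartition("topic2", 5), 4); // led by broker 4
        Map<Integer, Set<TopicPartition>> byNode = groupByLeader(leaders);
        System.out.println(byNode.get(2).size() + " " + byNode.get(4).size()); // prints 1 1
    }
}
```

With the KAFKA-16319 scenario from the Jira report (topic1 led by broker 2, topic2 led by broker 4), each broker would now receive only its own partition instead of both.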
[PR] MINOR; Log reason for deleting a kraft snapshot [kafka]
jsancio opened a new pull request, #15478: URL: https://github.com/apache/kafka/pull/15478 There are three reasons why KRaft would delete a snapshot. One, it is older than the retention time. Two, the total number of bytes across the log and the snapshot exceeds the configured limit. Three, the latest snapshot is newer than the log. This change allows KRaft to log the exact reason why a snapshot is getting deleted. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
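The three deletion reasons listed in the PR description can be modeled as a small enum plus a selection function. This is a hypothetical sketch for illustration only; the actual KRaft types, names, and precedence among the reasons may differ.

```java
public class SnapshotDeletionSketch {
    // Hypothetical modeling of the three reasons in the PR description.
    public enum Reason { RETENTION_TIME, MAX_BYTES, SNAPSHOT_NEWER_THAN_LOG }

    public static Reason reasonFor(long snapshotAgeMs, long retentionMs,
                                   long logPlusSnapshotBytes, long maxBytes,
                                   boolean latestSnapshotNewerThanLog) {
        if (latestSnapshotNewerThanLog) {
            return Reason.SNAPSHOT_NEWER_THAN_LOG; // the log is fully covered by a newer snapshot
        }
        if (snapshotAgeMs > retentionMs) {
            return Reason.RETENTION_TIME; // older than the retention time
        }
        if (logPlusSnapshotBytes > maxBytes) {
            return Reason.MAX_BYTES; // combined log + snapshot size exceeds the limit
        }
        return null; // nothing to delete
    }

    public static void main(String[] args) {
        System.out.println(reasonFor(10, 5, 0, 100, false)); // prints RETENTION_TIME
    }
}
```

Logging the returned enum value alongside the snapshot id gives exactly the "which reason" information the PR is adding.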
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
dajac commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513322953 ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: Shouldn't the selection be automatic? I don't think users will bother about this. Basically, the new API should be used when available and the old one when not.
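The automatic selection dajac asks for is typically implemented as a transparent fallback: try the new RPC first, and retry with the old one when the broker reports it is unsupported. A minimal sketch, using `UnsupportedOperationException` as a stand-in for the admin client's `UnsupportedVersionException` (the real retry happens inside the admin client's Call framework, as noted elsewhere in the thread):

```java
import java.util.function.Supplier;

public class ApiFallbackSketch {
    // Try the new API; if the broker is too old, fall back transparently so
    // the caller never has to choose an API version.
    public static <T> T describeWithFallback(Supplier<T> newApi, Supplier<T> oldApi) {
        try {
            return newApi.get();
        } catch (UnsupportedOperationException e) {
            return oldApi.get(); // stand-in for catching UnsupportedVersionException
        }
    }

    public static void main(String[] args) {
        String result = describeWithFallback(
                () -> { throw new UnsupportedOperationException("DescribeTopicPartitions"); },
                () -> "metadata-api");
        System.out.println(result); // prints metadata-api
    }
}
```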
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]
AndrewJSchofield commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1513310569 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } +@SuppressWarnings("MethodLength") +private Map> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final Collection topicNames, DescribeTopicsOptions options) { +final Map> topicFutures = new HashMap<>(topicNames.size()); +final ArrayList topicNamesList = new ArrayList<>(); +for (String topicName : topicNames) { +if (topicNameIsUnrepresentable(topicName)) { +KafkaFutureImpl future = new KafkaFutureImpl<>(); +future.completeExceptionally(new InvalidTopicException("The given topic name '" + +topicName + "' cannot be represented in a request.")); +topicFutures.put(topicName, future); +} else if (!topicFutures.containsKey(topicName)) { +topicFutures.put(topicName, new KafkaFutureImpl<>()); +topicNamesList.add(topicName); +} +} +final long now = time.milliseconds(); +Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), Review Comment: Should this be "describeTopicPartitions"? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: I think you need a small change to KIP-966 to document these changes to the admin API. 
## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ## @@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) { "if set when creating topics, the action will only execute if the topic does not already exist."); excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default"); +partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response", Review Comment: In other cases where a new API has been introduced, I think the principle followed is to try the new one without an option, falling back if it is detected to be required. That would be much nicer than expecting the innocent user to understand what `user-describe-topics-api` means. ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception { } } +@SuppressWarnings("NPathComplexity") Review Comment: Could we have a test with an invalid cursor? ## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java: ## @@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz return this; } +/** + * Whether to use the DescribeTopicPartitions API. It should be set to false if DescribeTopicPartitions API is + * not supported. + * + */ +public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) { Review Comment: I think you need a small change to KIP-966 to document these changes to the admin API. 
+public DescribeTopicsOptions partitionSizeLimitPerResponse(int partitionSizeLimitPerResponse) { +this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse; +return this; +} + public boolean includeAuthorizedOperations() { return includeAuthorizedOperations; } +public boolean useDescribeTopicPartitionsApi() { Review Comment: I suggest just `useDescribeTopicPartitions()`. In the Javadoc, you can mention that it's using the DescribeTopicPartitions API under the covers. Most users of the Kafka admin client would consider `KafkaAdminClient` to be the API, rather than the Kafka protocol which is what is meant here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries a
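The automatic selection both reviewers ask for is essentially a try-new-then-fall-back pattern. A minimal sketch with hypothetical names (the real AdminClient would key this off broker-reported API versions and its own `UnsupportedVersionException`):

```java
// Illustrative sketch of the fallback the reviewers describe: prefer the
// new RPC and fall back when the broker rejects it as unsupported. Class
// and method names are stand-ins, not the AdminClient internals.
import java.util.function.Supplier;

public class ApiFallbackDemo {

    // Stand-in for the real unsupported-version error.
    public static class UnsupportedVersionException extends RuntimeException { }

    public static String describeTopics(Supplier<String> newApi, Supplier<String> oldApi) {
        try {
            return newApi.get();      // try DescribeTopicPartitions first
        } catch (UnsupportedVersionException e) {
            return oldApi.get();      // broker too old: use the legacy path
        }
    }

    public static void main(String[] args) {
        // Simulate an old broker that rejects the new API.
        String result = describeTopics(
            () -> { throw new UnsupportedVersionException(); },
            () -> "described via the old API");
        System.out.println(result);   // described via the old API
    }
}
```

With this shape, no user-facing option like `useDescribeTopicPartitionsApi` is needed at all, which is the point the reviewers are making.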
Re: [PR] KAFKA-14133: Move consumer mock in TaskManagerTest to Mockito - part 2 [kafka]
clolov commented on PR #15261: URL: https://github.com/apache/kafka/pull/15261#issuecomment-1979391796 Thanks for the review @cadonna! I will provide an updated version tomorrow morning! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove test constructor for PartitionAssignment [kafka]
cmccabe commented on code in PR #15435: URL: https://github.com/apache/kafka/pull/15435#discussion_r1513283747 ## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ## @@ -792,6 +792,43 @@ class KRaftClusterTest { } } + /** + * Test that setting the Confluent-specific configuration + * confluent.apply.create.topic.policy.to.create.partitions has the expected effect. + */ + @ParameterizedTest + @ValueSource(strings = Array("3.7-IV0", "3.7-IV2")) + def testCreatePartitions(metadataVersionString: String): Unit = { Review Comment: This test passes before and after the change. I just added it because I noticed a test gap in this area. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15401 | Segment with corrupted index should not be tiered [kafka]
divijvaidya commented on code in PR #15472: URL: https://github.com/apache/kafka/pull/15472#discussion_r1513268220 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -707,6 +708,8 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException this.cancel(); } catch (InterruptedException | RetriableException ex) { throw ex; +} catch (CorruptIndexException ex) { +logger.error("Error occurred while copying log segments. Index appeared to be corrupted for partition: {} ", topicIdPartition, ex); Review Comment: I am assuming that the way this error will be monitored is by creating an alarm on `RemoteCopyLagSegments` [1]. Is that right? Can you also please explain why we shouldn't increment the `failedRemoteCopyRequestRate` metric that is being incremented in the catch block below? [1] https://kafka.apache.org/documentation.html#tiered_storage_monitoring ## storage/src/main/java/org/apache/kafka/storage/internals/log/TimeIndex.java: ## @@ -75,13 +75,14 @@ public void sanityCheck() { TimestampOffset entry = lastEntry(); long lastTimestamp = entry.timestamp; long lastOffset = entry.offset; -if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) -throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " -+ "non-zero size but the last timestamp is " + lastTimestamp + " which is less than the first timestamp " -+ timestamp(mmap(), 0)); + if (entries() != 0 && lastOffset < baseOffset()) throw new CorruptIndexException("Corrupt time index found, time index file (" + file().getAbsolutePath() + ") has " + "non-zero size but the last offset is " + lastOffset + " which is less than the first offset " + baseOffset()); +if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) Review Comment: I am assuming that the reason for moving this down is to use the less expensive validation first? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
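The reordering question in the review above is about running the cheap check before the expensive one. A generic sketch of that ordering, with illustrative names rather than the actual TimeIndex code:

```java
// Generic "cheap check first" sketch: run the plain field comparison
// before the check that reads the memory-mapped buffer. Names here are
// stand-ins, not the TimeIndex implementation under review.
import java.util.function.LongSupplier;

public class SanityCheckDemo {

    public static void sanityCheck(int entries, long lastOffset, long baseOffset,
                                   long lastTimestamp, LongSupplier firstTimestamp) {
        // Cheap: plain long comparison on already-loaded fields.
        if (entries != 0 && lastOffset < baseOffset)
            throw new IllegalStateException("last offset " + lastOffset
                + " is less than the base offset " + baseOffset);
        // More expensive: firstTimestamp reads the mapped buffer, so it runs second.
        if (entries != 0 && lastTimestamp < firstTimestamp.getAsLong())
            throw new IllegalStateException("last timestamp " + lastTimestamp
                + " is less than the first timestamp");
    }

    public static void main(String[] args) {
        final boolean[] bufferRead = {false};
        try {
            sanityCheck(1, 5L, 10L, 0L, () -> { bufferRead[0] = true; return 0L; });
        } catch (IllegalStateException expected) {
            // The cheap offset check fired before the buffer was ever touched.
            System.out.println("buffer read: " + bufferRead[0]);   // buffer read: false
        }
    }
}
```

When the first check throws, the supplier standing in for the buffer read is never invoked, which is the benefit the reviewer is asking about.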
[PR] [No Review] KAFKA-14563 part 1 [kafka]
CalvinConfluent opened a new pull request, #15477: URL: https://github.com/apache/kafka/pull/15477 Draft. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
[ https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823723#comment-17823723 ] AlexeyASF commented on KAFKA-16319: --- ??How do you get it to do that? Do you have a small test program and instructions for setting up the cluster to make it happen? I'm sure I can fix it if only I can make it fail :) For one thing, I would add a test that fails without the fix and then succeeds when the code has been fixed so having a pointer would be helpful.?? That's all a great idea, but, unfortunately, no, I haven't yet created a minimalistic test setup. ??I have reproduced it. Certainly fails like this on 3.6.0.?? [~schofielaj] Did you have a chance already to add a minimalistic test for this? If you did / you are on it already, I'd like to avoid duplicated work, but if you haven't started yet - I can give it a shot 2-3 hours later. > Wrong broker destinations for DeleteRecords requests when more than one topic > is involved and the topics/partitions are led by different brokers > > > Key: KAFKA-16319 > URL: https://issues.apache.org/jira/browse/KAFKA-16319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: AlexeyASF >Assignee: Andrew Schofield >Priority: Major > > h2. Context > Kafka streams applications send, time after time, {{DeleteRecords}} requests, > via > {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}} > method. Such requests may involve more than 1 topic (or partition), and such > requests are supposed to be sent to partitions' leaders brokers. > > h2. 
Observed behaviour > In case when {{DeleteRecords}} request includes more than one topic (let's > say 2 - {{topic1}} and {{{}topic2{}}}), and these topics are led by different > brokers (let’s say {{broker1}} and {{broker2}} respectively), the request is > sent to only one broker (let’s say {{{}broker1{}}}), leading to partial > not_leader_or_follower errors. As not the whole request was successful > ({{{}topic1{}}} is fine, but {{topic2}} is not), it gets retried, with the > _same_ arguments, to the _same_ broker ({{{}broker1{}}}), meaning the > response will be partially faulty again and again. It also may (and does) > happen that there is a “mirrored” half-faulty request - in this case, to > {{{}broker2{}}}, where {{topic2}} operation is successful, but {{topic1}} > operation fails. > Here’s an anonymised logs example from a production system (“direct” and > “mirrored” requests, one after another): > {code:java} > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1 > correlationId=42003907, timeoutMs=3 > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker2:9098 (id: 4 rack: RACK2). 
// <-- Note the broker, here it's broker2 > correlationId=42003906, timeoutMs=3 {code} > Such request results in the following response (in this case, only for the > "direct" response): > {code:java} > [AdminClient clientId=worker-admin] > Call( > callName=deleteRecords(api=DELETE_RECORDS), > deadlineMs=..., > tries=..., // Can be hundreds > nextAllowedTryMs=...) > got response DeleteRecordsResponseData( > throttleTimeMs=0, > topics=[ > DeleteRecordsTopicResult( > name='topic2', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the > errorCode 6, which is not_leader_or_follower > DeleteRecordsTopicResult( > name='topic1', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the > errorCode 0, which means the operation was successful > ] > ) {code} > h2. Expected behaviour > {{DeleteRecords}} requests are sent to corresponding partitions' leaders > brokers when more than 1 topic/partition is involved and they are led by > different brokers. > h2. Notes > * {_}presumably{_}, introduced in 3.6.1 via > [https://github.com/apache/kafka/pull/
[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
[ https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823706#comment-17823706 ] Andrew Schofield commented on KAFKA-16319: -- I have reproduced it. Certainly fails like this on 3.6.0. > Wrong broker destinations for DeleteRecords requests when more than one topic > is involved and the topics/partitions are led by different brokers > > > Key: KAFKA-16319 > URL: https://issues.apache.org/jira/browse/KAFKA-16319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: AlexeyASF >Assignee: Andrew Schofield >Priority: Major > > h2. Context > Kafka streams applications send, time after time, {{DeleteRecords}} requests, > via > {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}} > method. Such requests may involve more than 1 topic (or partition), and such > requests are supposed to be sent to partitions' leaders brokers. > > h2. Observed behaviour > In case when {{DeleteRecords}} request includes more than one topic (let's > say 2 - {{topic1}} and {{{}topic2{}}}), and these topics are led by different > brokers (let’s say {{broker1}} and {{broker2}} respectively), the request is > sent to only one broker (let’s say {{{}broker1{}}}), leading to partial > not_leader_or_follower errors. As not the whole request was successful > ({{{}topic1{}}} is fine, but {{topic2}} is not), it gets retried, with the > _same_ arguments, to the _same_ broker ({{{}broker1{}}}), meaning the > response will be partially faulty again and again. It also may (and does) > happen that there is a “mirrored” half-faulty request - in this case, to > {{{}broker2{}}}, where {{topic2}} operation is successful, but {{topic1}} > operation fails. 
> Here’s an anonymised logs example from a production system (“direct” and > “mirrored” requests, one after another): > {code:java} > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1 > correlationId=42003907, timeoutMs=3 > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2 > correlationId=42003906, timeoutMs=3 {code} > Such request results in the following response (in this case, only for the > "direct" response): > {code:java} > [AdminClient clientId=worker-admin] > Call( > callName=deleteRecords(api=DELETE_RECORDS), > deadlineMs=..., > tries=..., // Can be hundreds > nextAllowedTryMs=...) > got response DeleteRecordsResponseData( > throttleTimeMs=0, > topics=[ > DeleteRecordsTopicResult( > name='topic2', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the > errorCode 6, which is not_leader_or_follower > DeleteRecordsTopicResult( > name='topic1', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the > errorCode 0, which means the operation was successful > ] > ) {code} > h2. 
Expected behaviour > {{DeleteRecords}} requests are sent to corresponding partitions' leaders > brokers when more than 1 topic/partition is involved and they are led by > different brokers. > h2. Notes > * {_}presumably{_}, introduced in 3.6.1 via > [https://github.com/apache/kafka/pull/13760] . -- This message was sent by Atlassian Jira (v8.20.10#820010)
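The expected behaviour described in the issue amounts to grouping partitions by their leader before building per-broker requests, so each broker only receives the partitions it leads. A simplified sketch (stand-in types, not the AdminClient internals):

```java
// Sketch of the routing the issue expects: group partitions by leader and
// build one DeleteRecords request per broker. TopicPartition here is a
// simplified stand-in for the real client type.
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class DeleteRecordsRoutingDemo {

    public record TopicPartition(String topic, int partition) { }

    public static Map<Integer, List<TopicPartition>> groupByLeader(Map<TopicPartition, Integer> leaders) {
        // Each broker should only receive the partitions it currently leads.
        return leaders.entrySet().stream()
            .collect(Collectors.groupingBy(Map.Entry::getValue,
                     Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }

    public static void main(String[] args) {
        // Leader assignment from the logs above: topic1-5 on broker 2, topic2-5 on broker 4.
        Map<TopicPartition, Integer> leaders = Map.of(
            new TopicPartition("topic1", 5), 2,
            new TopicPartition("topic2", 5), 4);

        groupByLeader(leaders).forEach((broker, partitions) ->
            System.out.println("DeleteRecords to broker " + broker + ": " + partitions));
    }
}
```

Sending the whole partition set to a single broker, as the logs show, is exactly what this grouping step would prevent.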
Re: [PR] KAFKA-16322 : upgrade jline [kafka]
johnnychhsu commented on PR #15464: URL: https://github.com/apache/kafka/pull/15464#issuecomment-1979168636 The Jenkins pipeline (jdk8, scala 2.12) failed due to `Unable to connect to the child process`. I tried the same command locally, running `./gradlew -PscalaVersion=2.12 clean check -x test --profile --continue -PxmlSpotBugsReport=true -PkeepAliveMode=session`, and it succeeded. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14048) The Next Generation of the Consumer Rebalance Protocol
[ https://issues.apache.org/jira/browse/KAFKA-14048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823698#comment-17823698 ] Aratz commented on KAFKA-14048: --- Where can I find the timeline for this work? Is there any? Thank you. > The Next Generation of the Consumer Rebalance Protocol > -- > > Key: KAFKA-14048 > URL: https://issues.apache.org/jira/browse/KAFKA-14048 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > This Jira tracks the development of KIP-848: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Add read/write all operation [kafka]
dajac commented on PR #15462: URL: https://github.com/apache/kafka/pull/15462#issuecomment-1979123455 @chia7712 Would you be interested in reviewing this one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16099) Handle timeouts for AsyncKafkaConsumer.commitSync
[ https://issues.apache.org/jira/browse/KAFKA-16099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823686#comment-17823686 ] Aratz commented on KAFKA-16099: --- Where can we find the corresponding PR? > Handle timeouts for AsyncKafkaConsumer.commitSync > - > > Key: KAFKA-16099 > URL: https://issues.apache.org/jira/browse/KAFKA-16099 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Andrew Schofield >Priority: Major > Fix For: 3.7.0 > > > The handling of synchronous offset commits in the background thread does not > observe the caller's timeout. In the situation that a commit request needs to > be retried, the retries should not extend beyond the caller's timeout. The > CommitApplicationEvent should contain the timeout and not continue beyond > that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16222) KRaft Migration: Incorrect default user-principal quota after migration
[ https://issues.apache.org/jira/browse/KAFKA-16222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823675#comment-17823675 ] PoAn Yang commented on KAFKA-16222: --- I can reproduce the error. I will look into the code tomorrow. Thanks. {noformat} > docker run -it --rm --network mykafka-zookeeper_default bitnami/kafka:3.6 > kafka-configs.sh --describe --all --entity-type users --bootstrap-server > kafka-0:9092 kafka 15:19:03.20 INFO ==> kafka 15:19:03.20 INFO ==> Welcome to the Bitnami kafka container kafka 15:19:03.20 INFO ==> Subscribe to project updates by watching https://github.com/bitnami/containers kafka 15:19:03.20 INFO ==> Submit issues and feature requests at https://github.com/bitnami/containers/issues kafka 15:19:03.20 INFO ==> Quota configs for user-principal 'myuser%40prod' are consumer_byte_rate=2048.0, request_percentage=200.0, producer_byte_rate=1024.0{noformat} > KRaft Migration: Incorrect default user-principal quota after migration > --- > > Key: KAFKA-16222 > URL: https://issues.apache.org/jira/browse/KAFKA-16222 > Project: Kafka > Issue Type: Bug > Components: kraft, migration >Affects Versions: 3.7.0, 3.6.1 >Reporter: Dominik >Assignee: PoAn Yang >Priority: Blocker > > We observed that our default user quota seems not to be migrated correctly. 
> Before Migration: > bin/kafka-configs.sh --describe --all --entity-type users > Quota configs for the *default user-principal* are > consumer_byte_rate=100.0, producer_byte_rate=100.0 > Quota configs for user-principal {color:#172b4d}'myuser{*}@{*}prod'{color} > are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8 > After Migration: > bin/kafka-configs.sh --describe --all --entity-type users > Quota configs for *user-principal ''* are consumer_byte_rate=100.0, > producer_byte_rate=100.0 > Quota configs for user-principal {color:#172b4d}'myuser{*}%40{*}prod'{color} > are consumer_byte_rate=1.5E8, producer_byte_rate=1.5E8 > > Additional finding: Our names contains a "@" which also lead to incorrect > after migration state. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
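The `%40` in the post-migration output is a percent-encoded `@`. Assuming the stored quota entity names follow URL-style percent-encoding, which matches what the report shows, the symptom is the encoded form surfacing where the decoded principal should appear. A round-trip sketch of that encoding:

```java
// Round-trip sketch of the encoding in play: 'myuser@prod' percent-encodes
// to 'myuser%40prod'. This assumes URL-style encoding for entity names,
// consistent with the '%40' in the bug report; it is not Kafka's own
// sanitization code.
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class QuotaEntityNameDemo {
    public static void main(String[] args) {
        String principal = "myuser@prod";
        String stored = URLEncoder.encode(principal, StandardCharsets.UTF_8);
        String displayed = URLDecoder.decode(stored, StandardCharsets.UTF_8);
        System.out.println(stored);     // myuser%40prod
        System.out.println(displayed);  // myuser@prod
    }
}
```

Under that assumption, a fix would decode (or consistently re-encode) entity names during migration so describe output shows the original principal.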
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
ijuma commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1513013595 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -52,20 +55,30 @@ public class GetOffsetShellTest { private final int topicCount = 4; private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; +private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { -return "topic" + i; +return topicName + i; } -public void setUp() { +@BeforeEach +public void before(TestInfo testInfo) { Review Comment: Hmm, this is a weird test to verify this behavior. @hachikuji @jolshan had identified a test that seemed to be a better candidate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16293: Test log directory failure in Kraft [kafka]
pprovenzano commented on code in PR #15409: URL: https://github.com/apache/kafka/pull/15409#discussion_r1513010665 ## tests/kafkatest/tests/core/log_dir_failure_test.py: ## @@ -84,20 +84,25 @@ def __init__(self, test_context): self.num_consumers = 1 def setUp(self): -self.zk.start() +if self.zk: +self.zk.start() def min_cluster_size(self): """Override this since we're adding services outside of the constructor""" return super(LogDirFailureTest, self).min_cluster_size() + self.num_producers * 2 + self.num_consumers * 2 -@cluster(num_nodes=9) -@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"]) -def test_replication_with_disk_failure(self, bounce_broker, security_protocol, broker_type): +@cluster(num_nodes=8) +@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.zk]) +@cluster(num_nodes=7) +@matrix(bounce_broker=[False, True], broker_type=["leader", "follower"], security_protocol=["PLAINTEXT"], metadata_quorum=[quorum.combined_kraft]) Review Comment: I agree with dropping the combined test here. I'm not sure I would ever suggest running JBOD in combined mode. A separate process for broker and controller on the same node would be better. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512985080 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -884,11 +886,13 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() { processor.checkAndClearProcessResult(); -// push one item to the first stream; this should produce one full-join item +// push one item to the first stream; +// this should produce one inner-join item; +// and a right-joined item for a3 Review Comment: Removed the line in the comment that says we produce output for a3. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512983679 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java: ## @@ -438,13 +438,13 @@ public void testOrdering() { inputTopic1.pipeInput(1, "A1", 100L); processor.checkAndClearProcessResult(); -// push one item to the other window that has a join; this should produce non-joined records with a closed window first, then -// the joined records -// by the time they were produced before +// push one item to the other window that has a join; +// this should produce the joined record first; +// then the not-joined record Review Comment: Modified the comments according to the results produced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512982523 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { Review Comment: Correct, removed this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512981646 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream stream1; +final KStream stream2; +final KStream joined; +final MockApiProcessorSupplier supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor processor = supplier.theCapturedProcessor(); + +processor.init(null); +// push four items with increasing timestamps to the primary stream; this should emit null-joined items Review Comment: modified comment -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
VictorvandenHoven commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1512980857 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java: ## @@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() { } } +@Test +public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() { +final StreamsBuilder builder = new StreamsBuilder(); + +final int[] expectedKeys = new int[] {0, 1, 2, 3}; + +final KStream<Integer, String> stream1; +final KStream<Integer, String> stream2; +final KStream<Integer, String> joined; +final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>(); +stream1 = builder.stream(topic1, consumed); +stream2 = builder.stream(topic2, consumed); + +joined = stream1.leftJoin( +stream2, +MockValueJoiner.TOSTRING_JOINER, +JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO), +StreamJoined.with(Serdes.Integer(), +Serdes.String(), +Serdes.String()) +); +joined.process(supplier); + +final Collection<Set<String>> copartitionGroups = + TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); + +assertEquals(1, copartitionGroups.size()); +assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + +try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) { +final TestInputTopic<Integer, String> inputTopic1 = +driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final TestInputTopic<Integer, String> inputTopic2 = +driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO); +final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor(); + +processor.init(null); +// push four items with increasing timestamps to the primary stream; this should emit null-joined items +// w1 = {} +// w2 = {} +// --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) } Review Comment: 
Changed B into A -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
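The review comments above concern a test of `JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO)`, i.e. a window of 100ms before the left record and 0ms after it. The membership check that configuration implies can be sketched as a standalone predicate (illustrative names only, not Kafka Streams internals):

```java
// Minimal sketch of the window check implied by
// JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO):
// a right-stream record joins a left-stream record only if its timestamp
// falls in [leftTs - before, leftTs + after], here [leftTs - 100, leftTs].
public class JoinWindowSketch {
    static boolean inWindow(long leftTs, long rightTs, long beforeMs, long afterMs) {
        return rightTs >= leftTs - beforeMs && rightTs <= leftTs + afterMs;
    }

    public static void main(String[] args) {
        // Older right records within 100ms join; newer right records never do.
        System.out.println(inWindow(1000, 950, 100, 0));  // true
        System.out.println(inWindow(1000, 1001, 100, 0)); // false
    }
}
```

With `after(ZERO)`, any left record newer than all right records can be emitted as a non-joined result once the window closes, which is what the test exercises.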
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
KevinZTW commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512973940 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.log4j.LogManager; Review Comment: thanks! I should double-check this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: parameterize group-id in GroupMetadataManagerTestContext [kafka]
chia7712 merged PR #15467: URL: https://github.com/apache/kafka/pull/15467 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16319) Wrong broker destinations for DeleteRecords requests when more than one topic is involved and the topics/partitions are led by different brokers
[ https://issues.apache.org/jira/browse/KAFKA-16319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823653#comment-17823653 ] Andrew Schofield commented on KAFKA-16319: -- [~alexeyasf] How do you get it to do that? Do you have a small test program and instructions for setting up the cluster to make it happen? I'm sure I can fix it if only I can make it fail :) For one thing, I would add a test that fails without the fix and then succeeds when the code has been fixed so having a pointer would be helpful. > Wrong broker destinations for DeleteRecords requests when more than one topic > is involved and the topics/partitions are led by different brokers > > > Key: KAFKA-16319 > URL: https://issues.apache.org/jira/browse/KAFKA-16319 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: AlexeyASF >Assignee: Andrew Schofield >Priority: Major > > h2. Context > Kafka streams applications send, time after time, {{DeleteRecords}} requests, > via > {{org.apache.kafka.streams.processor.internals.TaskManager#maybePurgeCommittedRecords}} > method. Such requests may involve more than 1 topic (or partition), and such > requests are supposed to be sent to partitions' leaders brokers. > > h2. Observed behaviour > In case when {{DeleteRecords}} request includes more than one topic (let's > say 2 - {{topic1}} and {{{}topic2{}}}), and these topics are led by different > brokers (let’s say {{broker1}} and {{broker2}} respectively), the request is > sent to only one broker (let’s say {{{}broker1{}}}), leading to partial > not_leader_or_follower errors. As not the whole request was successful > ({{{}topic1{}}} is fine, but {{topic2}} is not), it gets retried, with the > _same_ arguments, to the _same_ broker ({{{}broker1{}}}), meaning the > response will be partially faulty again and again. 
It also may (and does) > happen that there is a “mirrored” half-faulty request - in this case, to > {{{}broker2{}}}, where {{topic2}} operation is successful, but {{topic1}} > operation fails. > Here’s an anonymised logs example from a production system (“direct” and > “mirrored” requests, one after another): > {code:java} > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker1:PORT (id: 2 rack: RACK1). // <-- Note the broker, it's broker1 > correlationId=42003907, timeoutMs=3 > [AdminClient clientId=worker-admin] > Sending DeleteRecordsRequestData(topics=[ > DeleteRecordsTopic( > name='topic1', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=88017574)] > ), > DeleteRecordsTopic( > name='topic2', > partitions=[DeleteRecordsPartition(partitionIndex=5, offset=243841)] > )], timeoutMs=6) > to broker2:9098 (id: 4 rack: RACK2). // <-- Note the broker, here it's broker2 > correlationId=42003906, timeoutMs=3 {code} > Such request results in the following response (in this case, only for the > "direct" response): > {code:java} > [AdminClient clientId=worker-admin] > Call( > callName=deleteRecords(api=DELETE_RECORDS), > deadlineMs=..., > tries=..., // Can be hundreds > nextAllowedTryMs=...) 
> got response DeleteRecordsResponseData( > throttleTimeMs=0, > topics=[ > DeleteRecordsTopicResult( > name='topic2', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=-1, errorCode=6)]), // <-- Note the > errorCode 6, which is not_leader_or_follower > DeleteRecordsTopicResult( > name='topic1', > partitions=[DeleteRecordsPartitionResult( > partitionIndex=5, lowWatermark=..., errorCode=0)]) // <-- Note the > errorCode 0, which means the operation was successful > ] > ) {code} > h2. Expected behaviour > {{DeleteRecords}} requests are sent to corresponding partitions' leaders > brokers when more than 1 topic/partition is involved and they are led by > different brokers. > h2. Notes > * {_}presumably{_}, introduced in 3.6.1 via > [https://github.com/apache/kafka/pull/13760] . -- This message was sent by Atlassian Jira (v8.20.10#820010)
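The expected behaviour described above amounts to grouping the partitions of a multi-topic `DeleteRecords` request by their leader broker before building per-broker requests. A minimal, hypothetical sketch of that grouping (broker and partition names are illustrative; this is not the AdminClient's actual API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: group topic-partitions by leader broker so each
// DeleteRecords request only contains partitions its destination broker leads,
// instead of sending the full combined request to a single broker.
public class LeaderRouting {
    static Map<String, List<String>> groupByLeader(Map<String, String> leaderByPartition) {
        Map<String, List<String>> byBroker = new TreeMap<>();
        for (Map.Entry<String, String> e : leaderByPartition.entrySet()) {
            byBroker.computeIfAbsent(e.getValue(), b -> new ArrayList<>()).add(e.getKey());
        }
        return byBroker;
    }

    public static void main(String[] args) {
        Map<String, String> leaders = new TreeMap<>();
        leaders.put("topic1-5", "broker1");
        leaders.put("topic2-5", "broker2");
        // One request per leader, rather than the faulty combined routing.
        System.out.println(groupByLeader(leaders));
    }
}
```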
Re: [PR] MINOR: parameterize group-id in GroupMetadataManagerTestContext [kafka]
dongnuo123 commented on code in PR #15467: URL: https://github.com/apache/kafka/pull/15467#discussion_r1512942687 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java: ## @@ -901,7 +901,7 @@ public RebalanceResult staticMembersJoinAndRebalance( public PendingMemberGroupResult setupGroupWithPendingMember(ClassicGroup group) throws Exception { // Add the first member JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() -.withGroupId("group-id") +.withGroupId(group.groupId()) Review Comment: The group id is not a parameter of the method, so I guess it's fine to use `"group-id"` ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java: ## @@ -901,7 +901,7 @@ public RebalanceResult staticMembersJoinAndRebalance( public PendingMemberGroupResult setupGroupWithPendingMember(ClassicGroup group) throws Exception { // Add the first member JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() -.withGroupId("group-id") +.withGroupId(group.groupId()) Review Comment: The group id is not a parameter of the method, so I guess it's fine to use `"group-id"` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
chia7712 commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512938465 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: > And I think that's because their classpath is set to the sourceSets.main.runtimeClasspath, so instead of closing the logger directly I would leverage that to eliminate the unexpected log info pardon me, why does this work? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14679) Add new __consumer_offsets records
[ https://issues.apache.org/jira/browse/KAFKA-14679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823645#comment-17823645 ] Aratz commented on KAFKA-14679: --- Okay, found it. I think it is this one: [https://github.com/apache/kafka/pull/13203] > Add new __consumer_offsets records > -- > > Key: KAFKA-14679 > URL: https://issues.apache.org/jira/browse/KAFKA-14679 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14679) Add new __consumer_offsets records
[ https://issues.apache.org/jira/browse/KAFKA-14679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17823644#comment-17823644 ] Aratz commented on KAFKA-14679: --- Hello, Is there any Github PR related to this *resolved* work? > Add new __consumer_offsets records > -- > > Key: KAFKA-14679 > URL: https://issues.apache.org/jira/browse/KAFKA-14679 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
chia7712 commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512899702 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -30,6 +30,7 @@ import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; +import org.apache.log4j.LogManager; Review Comment: this change is unnecessary, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
KevinZTW commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512897988 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: After executing those tasks, I think they don't have a similar issue and the logger implementation couldn't be loaded ``` SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation ``` And I think that's because their classpath is set to the `sourceSets.main.runtimeClasspath`, so instead of closing the logger directly I would leverage that to eliminate the unexpected log info -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
KevinZTW commented on PR #15473: URL: https://github.com/apache/kafka/pull/15473#issuecomment-1978869931 > Could you run the website with the change and paste the screenshot? Thanks. Sure! ## Previous Version ![image](https://github.com/apache/kafka/assets/38662781/6b390918-0d42-4b54-85f2-083620d2398e) ![image](https://github.com/apache/kafka/assets/38662781/08f57b13-3fc5-461c-aa89-26ab6aab7afa) ![image](https://github.com/apache/kafka/assets/38662781/e97b9050-b9d0-4380-94f6-82ca5618d2d0) ## Now ![image](https://github.com/apache/kafka/assets/38662781/ec6e2ff9-fe26-4d36-afa1-493b35102c40) ![image](https://github.com/apache/kafka/assets/38662781/6f3f5348-9e14-4c75-a592-c409781718d3) ![image](https://github.com/apache/kafka/assets/38662781/dcf2ae81-9a97-4ac5-81da-ec736186f554) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16341: fix the LogValidator for non-compredded type [kafka]
johnnychhsu commented on PR #15476: URL: https://github.com/apache/kafka/pull/15476#issuecomment-1978864287 thanks for the quick review @chia7712 , sure let me address that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16341: fix the LogValidator for non-compredded type [kafka]
chia7712 commented on PR #15476: URL: https://github.com/apache/kafka/pull/15476#issuecomment-1978858961 The fix is perfect. Please rewrite the test according to #15474. Putting all test cases together is more readable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16341: fix the LogValidator for non-compredded type [kafka]
johnnychhsu opened a new pull request, #15476: URL: https://github.com/apache/kafka/pull/15476 ## Context Previously in the LogValidator, the `offsetOfMaxTimestamp` depended on two parameter checks: 1. timestampType 2. batch.toMagic If the `timestampType` was `LOG_APPEND_TIME` and the `toMagic` was greater than or equal to `MAGIC_VALUE_V2`, then we assigned `offsetCounter.value - 1` to it; otherwise we kept the initial value. However, in [KAFKA-14477](https://issues.apache.org/jira/browse/KAFKA-14477) this was changed to modify `offsetOfMaxTimestamp` regardless of whether the timestamp type is `LOG_APPEND_TIME`, verifying only the `toMagic`, which led to this error. More details can be found in [KAFKA-16310](https://issues.apache.org/jira/browse/KAFKA-16310) ## Solution Fix the validation logic to consider both the `timestampType` and the magic version. ## Test Ran `./gradlew clean core:test --tests integration.kafka.api.OffsetOfMaxTimestampTest.testWithNoCompression` and it passed ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
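The two-parameter decision the PR description restores can be modeled as pure logic. This is a simplified sketch, not the actual LogValidator code; names and the magic constant are illustrative:

```java
// Simplified model of the offset-of-max-timestamp choice: only LOG_APPEND_TIME
// batches at magic >= 2 use the last offset (every record then carries the same
// append timestamp), while CREATE_TIME keeps the offset of the record that
// actually holds the max timestamp, preserving the time -> offset mapping.
public class OffsetOfMaxTimestampSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    static long shallowOffsetOfMaxTimestamp(TimestampType type, byte toMagic,
                                            long lastOffset, long offsetOfMaxTimestamp) {
        if (type == TimestampType.LOG_APPEND_TIME && toMagic >= 2) {
            return lastOffset;
        }
        return offsetOfMaxTimestamp;
    }

    public static void main(String[] args) {
        // CREATE_TIME keeps the real max-timestamp offset even on magic v2.
        System.out.println(shallowOffsetOfMaxTimestamp(TimestampType.CREATE_TIME, (byte) 2, 9, 3));  // 3
        // LOG_APPEND_TIME on magic v2 uses the last offset.
        System.out.println(shallowOffsetOfMaxTimestamp(TimestampType.LOG_APPEND_TIME, (byte) 2, 9, 3)); // 9
    }
}
```

The regression described in KAFKA-16310 corresponds to dropping the `type` half of the condition, so CREATE_TIME batches wrongly fell into the `lastOffset` branch.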
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1512790426 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; -return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); +// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping Review Comment: You're right! Tests added. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16234: Log directory failure re-creates partitions in another logdir automatically [kafka]
soarez commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1512748010 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala: ## @@ -289,13 +289,10 @@ class BrokerMetadataPublisher( try { // Start log manager, which will perform (potentially lengthy) // recovery-from-unclean-shutdown if required. - logManager.startup(metadataCache.getAllTopics()) - - // Delete partition directories which we're not supposed to have. We have - // to do this before starting ReplicaManager, so that the stray replicas - // don't block creation of new ones with different IDs but the same names. - // See KAFKA-14616 for details. - logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics()) + logManager.startup( +metadataCache.getAllTopics(), +shouldBeStrayKraftLog = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log) Review Comment: A suggestion, due to the following concerns: * LogManager shouldn't handle metadata records, these types shouldn't depend on each other. The metadata records should be handled here instead. * The argument name looks a bit strange, namely the 'should' and 'kraft' parts. ```suggestion isStray = (topicId, partition) => Option(newImage.topics().getPartition(topicId.getOrElse{ throw new RuntimeException(s"Partition $partition does not have a topic ID, " + "which is not allowed when running in KRaft mode.") }, partition.partition())).exists(_.replicas.contains(brokerId)) ``` Perhaps LogManager should declare a type for this argument since it's propagated down the call stack several levels? ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File], } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) { addStrayLog(topicPartition, log) warn(s"Loaded stray log: $logDir") +} else if (shouldBeStrayKraftLog(log)) { + // Mark the partition directories we're not supposed to have as stray. 
We have to do this + // during log load because topics may have been recreated with the same name while a disk + // was offline. + // See KAFKA-16234, KAFKA-16157 and KAFKA-14616 for details. Review Comment: Perhaps it could help a future reader to clarify that kraft mode (as opposed to zk) does not track deleted topics nor prevent them from being re-created with the same name before every replica has been deleted, and so there's no way for a broker with a to-be-deleted replica in an offline directory to detect this earlier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
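The stray-replica check debated above can be reduced to a simple membership test against the metadata image. A hypothetical model (names are illustrative, not LogManager's real signature):

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of the stray-log check: a local log is stray when the
// current metadata image no longer assigns its partition to this broker,
// e.g. because the topic was deleted (or deleted and recreated under the
// same name) while the log's disk was offline.
public class StrayReplicaSketch {
    static boolean isStray(int brokerId, Map<String, Set<Integer>> replicasByPartition,
                           String topicPartition) {
        Set<Integer> replicas = replicasByPartition.get(topicPartition);
        // Unknown partition, or a partition this broker no longer replicates.
        return replicas == null || !replicas.contains(brokerId);
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> image = Map.of("t-0", Set.of(1, 2));
        System.out.println(isStray(1, image, "t-0"));       // still assigned
        System.out.println(isStray(1, image, "deleted-0")); // stray
    }
}
```

Performing this check at log load, as the PR does, covers the case where the deletion happened while the directory was offline and no earlier hook could observe it.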
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
showuon commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512747215 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: Nice catch @KevinZTW ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
chia7712 commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512687959 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: Do other doc tasks have similar issue? for example: `genSourceConnectorConfigDocs`, `genSinkConnectorConfigDocs`, etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
KevinZTW commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512669605 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: Since the log information is generated by the Metric class, another way I found is to add `log4j.logger.org.apache.kafka.common.metrics.Metrics=WARN` to `/kafka/connect/runtime/src/test/resources/log4j.properties`. However, this also affects the log behavior during test execution, so I think shutting down the logger in the main function is the better way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
KevinZTW commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512665815 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: In the current design, the way we generate our documentation file is to execute the `main()` above (line 434) and capture everything from stdout into a file. ```gradle task genConsumerMetricsDocs(type: JavaExec) { classpath = sourceSets.test.runtimeClasspath mainClass = 'org.apache.kafka.clients.consumer.internals.ConsumerMetrics' if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "consumer_metrics.html").newOutputStream() } ``` This causes all log information to also be added into our docs, as you can see here https://kafka.apache.org/documentation.html#connect_monitoring ![image](https://github.com/apache/kafka/assets/38662781/d9ec6023-6b0c-4165-b6f8-59cb039669d8) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]
bachmanity1 commented on PR #15475: URL: https://github.com/apache/kafka/pull/15475#issuecomment-1978544182 @kirktrue @mimaison can you have a look, please? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16345: optionally urlencode clientId and clientSecret in authorization header [kafka]
bachmanity1 opened a new pull request, #15475: URL: https://github.com/apache/kafka/pull/15475 When a client communicates with an OIDC provider to retrieve an access token, RFC 6749 says that the clientId and clientSecret must be urlencoded in the authorization header (see https://tools.ietf.org/html/rfc6749#section-2.3.1). However, it seems that in practice some OIDC providers do not enforce this, so I'm proposing to introduce a new configuration parameter that will optionally urlencode the clientId & clientSecret in the authorization header. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
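The header construction the PR describes follows RFC 6749 section 2.3.1: the client id and secret are form-urlencoded before being combined for HTTP Basic auth. A minimal sketch; the boolean flag mirrors the proposed opt-in behavior, and its name here is illustrative, not the final Kafka configuration key:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of building the Authorization header per RFC 6749 section 2.3.1,
// with the urlencoding step made optional for providers that don't enforce it.
public class BasicAuthHeaderSketch {
    static String build(String clientId, String clientSecret, boolean urlencode) {
        String id = urlencode ? URLEncoder.encode(clientId, StandardCharsets.UTF_8) : clientId;
        String secret = urlencode ? URLEncoder.encode(clientSecret, StandardCharsets.UTF_8) : clientSecret;
        byte[] raw = (id + ":" + secret).getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(raw);
    }

    public static void main(String[] args) {
        // For purely alphanumeric credentials both modes produce the same header;
        // they diverge only when the id or secret contains reserved characters.
        System.out.println(BasicAuthHeaderSketch.build("my+id", "s3cret", true));
        System.out.println(BasicAuthHeaderSketch.build("my+id", "s3cret", false));
    }
}
```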
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
chia7712 commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1512657775 ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; -else -shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp; -return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp); +// For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping Review Comment: It seems to me this also fix the path of nonexistent magic code (`convertAndAssignOffsetsNonCompressed`). Is it possible to add test for that case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16345) Optionally allow urlencoding clientId and clientSecret in authorization header
Nelson B. created KAFKA-16345: - Summary: Optionally allow urlencoding clientId and clientSecret in authorization header Key: KAFKA-16345 URL: https://issues.apache.org/jira/browse/KAFKA-16345 Project: Kafka Issue Type: Bug Reporter: Nelson B. When a client communicates with an OIDC provider to retrieve an access token, RFC 6749 says that the clientId and clientSecret must be urlencoded in the authorization header (see [https://tools.ietf.org/html/rfc6749#section-2.3.1]). However, it seems that in practice some OIDC providers do not enforce this, so I was thinking about introducing a new configuration parameter that will optionally urlencode clientId & clientSecret in the authorization header. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16345) Optionally allow urlencoding clientId and clientSecret in authorization header
[ https://issues.apache.org/jira/browse/KAFKA-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nelson B. reassigned KAFKA-16345: - Assignee: Nelson B. > Optionally allow urlencoding clientId and clientSecret in authorization header > -- > > Key: KAFKA-16345 > URL: https://issues.apache.org/jira/browse/KAFKA-16345 > Project: Kafka > Issue Type: Bug >Reporter: Nelson B. >Assignee: Nelson B. >Priority: Minor > > When a client communicates with an OIDC provider to retrieve an access token > RFC 6749 says that clientID and clientSecret must be urlencoded in the > authorization header. (see > [https://tools.ietf.org/html/rfc6749#section-2.3.1]) However, it seems that > in practice some OIDC providers do not enforce this, so I was thinking about > introducing a new configuration parameter that will optionally urlencode > clientId & clientSecret in the authorization header. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16341) Fix un-compressed records
[ https://issues.apache.org/jira/browse/KAFKA-16341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Johnny Hsu reassigned KAFKA-16341: -- Assignee: Johnny Hsu > Fix un-compressed records > - > > Key: KAFKA-16341 > URL: https://issues.apache.org/jira/browse/KAFKA-16341 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Johnny Hsu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16252: Fix the documentation and adjust the format [kafka]
chia7712 commented on code in PR #15473: URL: https://github.com/apache/kafka/pull/15473#discussion_r1512617796 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java: ## @@ -431,6 +432,7 @@ public interface LiteralSupplier { * @param args the arguments */ public static void main(String[] args) { +LogManager.shutdown(); Review Comment: Why do we need this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on code in PR #15365: URL: https://github.com/apache/kafka/pull/15365#discussion_r1512610273 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer.group; + +import kafka.admin.ConsumerGroupCommand; +import kafka.api.AbstractSaslTest; +import kafka.api.Both$; +import kafka.utils.JaasTestUtils; +import kafka.utils.TestUtils; +import kafka.zk.ConfigEntityChangeNotificationZNode; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.function.Executable; +import scala.Option; +import scala.Some$; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.immutable.Map$; + +import java.io.File; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest.seq; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { +private static final String TOPIC = "topic"; +public static final int NUM_PARTITIONS = 1; +public static final int BROKER_COUNT = 1; +public static final String KAFKA_CLIENT_SASL_MECHANISM = "SCRAM-SHA-256"; +private static final Seq KAFKA_SERVER_SASL_MECHANISMS = seq(Collections.singletonList(KAFKA_CLIENT_SASL_MECHANISM)); + +@SuppressWarnings({"deprecation"}) +private Consumer createConsumer() { +return createConsumer( +new ByteArrayDeserializer(), +new ByteArrayDeserializer(), +new Properties(), 
+JavaConverters.asScalaSet(Collections.emptySet()).toList() +); +} + +@Override +public SecurityProtocol securityProtocol() { +return SecurityProtocol.SASL_PLAINTEXT; +} + +@Override +public Option serverSaslProperties() { +return Some$.MODULE$.apply(kafkaServerSaslProperties(KAFKA_SERVER_SASL_MECHANISMS, KAFKA_CLIENT_SASL_MECHANISM)); +} + +@Override +public Option clientSaslProperties() { +return Some$.MODULE$.apply(kafkaClientSaslProperties(KAFKA_CLIENT_SASL_MECHANISM, false)); +} + +@Override +public int brokerCount() { +return 1; +} + +@Override +public void configureSecurityBeforeServersStart(TestInfo testInfo) { +super.configureSecurityBeforeServersStart(testInfo); + zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path()); +// Create broker credentials before starting brokers +createScramCredentials(zkConnect(), JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); +} + +@Override +public Admin createPrivilegedAdminClient() { +return createAdminClient(bootstrapServers(listenerName()), securityProtocol(), trustStoreFile(), clientSaslProperties(), +KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); +} + +@BeforeEach +@Override +public void setUp(TestInfo testInfo) { +startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$, +JaasTestUtils.KafkaServerContextName())); +super.setUp(testInfo); +createTopic( +TOPIC, +NUM_PARTITIONS, +BROKER_COUNT, +new Properties(), +listenerName(), +new Properties());
Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on code in PR #15365: URL: https://github.com/apache/kafka/pull/15365#discussion_r1512609597 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer.group; + +import kafka.admin.ConsumerGroupCommand; +import kafka.api.AbstractSaslTest; +import kafka.api.Both$; +import kafka.utils.JaasTestUtils; +import kafka.utils.TestUtils; +import kafka.zk.ConfigEntityChangeNotificationZNode; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.function.Executable; +import scala.Option; +import scala.Some$; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.immutable.Map$; + +import java.io.File; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest.seq; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { +private static final String TOPIC = "topic"; +public static final int NUM_PARTITIONS = 1; +public static final int BROKER_COUNT = 1; +public static final String KAFKA_CLIENT_SASL_MECHANISM = "SCRAM-SHA-256"; +private static final Seq KAFKA_SERVER_SASL_MECHANISMS = seq(Collections.singletonList(KAFKA_CLIENT_SASL_MECHANISM)); + +@SuppressWarnings({"deprecation"}) +private Consumer createConsumer() { +return createConsumer( +new ByteArrayDeserializer(), +new ByteArrayDeserializer(), +new Properties(), 
+JavaConverters.asScalaSet(Collections.emptySet()).toList() +); +} + +@Override +public SecurityProtocol securityProtocol() { +return SecurityProtocol.SASL_PLAINTEXT; +} + +@Override +public Option serverSaslProperties() { +return Some$.MODULE$.apply(kafkaServerSaslProperties(KAFKA_SERVER_SASL_MECHANISMS, KAFKA_CLIENT_SASL_MECHANISM)); +} + +@Override +public Option clientSaslProperties() { +return Some$.MODULE$.apply(kafkaClientSaslProperties(KAFKA_CLIENT_SASL_MECHANISM, false)); +} + +@Override +public int brokerCount() { +return 1; +} + +@Override +public void configureSecurityBeforeServersStart(TestInfo testInfo) { +super.configureSecurityBeforeServersStart(testInfo); + zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path()); +// Create broker credentials before starting brokers +createScramCredentials(zkConnect(), JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); +} + +@Override +public Admin createPrivilegedAdminClient() { +return createAdminClient(bootstrapServers(listenerName()), securityProtocol(), trustStoreFile(), clientSaslProperties(), +KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); +} + +@BeforeEach +@Override +public void setUp(TestInfo testInfo) { +startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$, +JaasTestUtils.KafkaServerContextName())); +super.setUp(testInfo); +createTopic( +TOPIC, +NUM_PARTITIONS, +BROKER_COUNT, +new Properties(), +listenerName(), +new Properties());
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1512607158 ## tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java: ## @@ -333,7 +382,7 @@ private void assertExitCodeIsOne(String... args) { } private List expectedOffsetsWithInternal() { -List consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1) +List consOffsets = IntStream.range(0, offsetTopicPartitionCount) Review Comment: This is a side fix for this test. Before this PR, `offsetTopicPartitionCount` was not fed into the cluster.
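For readers puzzling over the one-token change quoted above: `IntStream.range` is half-open — the upper bound is exclusive — so `range(0, offsetTopicPartitionCount)` already yields exactly `offsetTopicPartitionCount` partition indices, and the old `+ 1` produced one extra. A standalone illustration (the count value is assumed for the example):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class HalfOpenRangeDemo {
    public static void main(String[] args) {
        int offsetTopicPartitionCount = 3; // assumed value, for illustration only

        // Half-open interval [0, 3): exactly offsetTopicPartitionCount elements
        List<Integer> fixed = IntStream.range(0, offsetTopicPartitionCount)
                .boxed()
                .collect(Collectors.toList());

        // The pre-fix version produced one element too many
        List<Integer> old = IntStream.range(0, offsetTopicPartitionCount + 1)
                .boxed()
                .collect(Collectors.toList());

        System.out.println(fixed); // [0, 1, 2]
        System.out.println(old);   // [0, 1, 2, 3]
    }
}
```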
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on PR #15474: URL: https://github.com/apache/kafka/pull/15474#issuecomment-1978460395 @chia7712 @ijuma @hachikuji, please take a look. Thanks.
Re: [PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1512604852 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java: ## @@ -379,8 +381,11 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse && batch.magic() > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) { -if (record.timestamp() > maxTimestamp) { maxTimestamp = record.timestamp(); +if (record.timestamp() > maxTimestamp) { maxTimestamp = record.timestamp(); +// The offset is only increased when it is a valid record +offsetOfMaxTimestamp = initialOffset + validatedRecords.size(); Review Comment: Also set the correct offset of the max timestamp while traversing the records. ## clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java: ## @@ -263,13 +262,8 @@ public RecordsInfo info() { } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) { return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset); } else { -long shallowOffsetOfMaxTimestamp; -// Use the last offset when dealing with record batches -if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) -shallowOffsetOfMaxTimestamp = lastOffset; Review Comment: I don't understand why we should always set this to the last offset here. This will fail the getOffsetByMaxTimestamp test. Is that expected? Maybe @ijuma could answer this?
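The idea in the LogValidator diff above can be sketched in isolation — this is not the actual LogValidator code, and the helper name and sample values are made up; the point is that while validating records you remember the offset of the record carrying the max timestamp, instead of unconditionally reporting the batch's last offset:

```java
public class OffsetOfMaxTimestampSketch {

    // Returns {maxTimestamp, offsetOfMaxTimestamp}. Mirrors the patch:
    // `offsetOfMaxTimestamp = initialOffset + validatedRecords.size()` — i records
    // have been validated so far, so record i is assigned offset initialOffset + i.
    static long[] maxTimestampAndOffset(long[] recordTimestamps, long initialOffset) {
        long maxTimestamp = -1L;          // stand-in for RecordBatch.NO_TIMESTAMP
        long offsetOfMaxTimestamp = -1L;
        for (int i = 0; i < recordTimestamps.length; i++) {
            if (recordTimestamps[i] > maxTimestamp) {
                maxTimestamp = recordTimestamps[i];
                offsetOfMaxTimestamp = initialOffset + i;
            }
        }
        return new long[]{maxTimestamp, offsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        long[] result = maxTimestampAndOffset(new long[]{5L, 42L, 17L, 42L}, 100L);
        // The strict '>' keeps the offset of the FIRST record with the max timestamp
        System.out.println(result[0] + " at offset " + result[1]); // 42 at offset 101
    }
}
```

Note how this differs from the MemoryRecordsBuilder branch being questioned in the second comment, which discards this tracked offset and substitutes the last offset for compressed or v2+ batches.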
Re: [PR] KAFKA-14589 [3/4] Tests of ConsoleGroupCommand rewritten in java [kafka]
chia7712 commented on code in PR #15365: URL: https://github.com/apache/kafka/pull/15365#discussion_r1512597183 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/SaslClientsWithInvalidCredentialsTest.java: ## @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools.consumer.group; + +import kafka.admin.ConsumerGroupCommand; +import kafka.api.AbstractSaslTest; +import kafka.api.Both$; +import kafka.utils.JaasTestUtils; +import kafka.utils.TestUtils; +import kafka.zk.ConfigEntityChangeNotificationZNode; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.errors.SaslAuthenticationException; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.function.Executable; +import scala.Option; +import scala.Some$; +import scala.collection.JavaConverters; +import scala.collection.Seq; +import scala.collection.immutable.Map$; + +import java.io.File; +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTest.seq; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest { +private static final String TOPIC = "topic"; +public static final int NUM_PARTITIONS = 1; +public static final int BROKER_COUNT = 1; +public static final String KAFKA_CLIENT_SASL_MECHANISM = "SCRAM-SHA-256"; +private static final Seq KAFKA_SERVER_SASL_MECHANISMS = seq(Collections.singletonList(KAFKA_CLIENT_SASL_MECHANISM)); + +@SuppressWarnings({"deprecation"}) +private Consumer createConsumer() { +return createConsumer( +new ByteArrayDeserializer(), +new ByteArrayDeserializer(), +new Properties(), 
+JavaConverters.asScalaSet(Collections.emptySet()).toList() +); +} + +@Override +public SecurityProtocol securityProtocol() { +return SecurityProtocol.SASL_PLAINTEXT; +} + +@Override +public Option serverSaslProperties() { +return Some$.MODULE$.apply(kafkaServerSaslProperties(KAFKA_SERVER_SASL_MECHANISMS, KAFKA_CLIENT_SASL_MECHANISM)); +} + +@Override +public Option clientSaslProperties() { +return Some$.MODULE$.apply(kafkaClientSaslProperties(KAFKA_CLIENT_SASL_MECHANISM, false)); +} + +@Override +public int brokerCount() { +return 1; +} + +@Override +public void configureSecurityBeforeServersStart(TestInfo testInfo) { +super.configureSecurityBeforeServersStart(testInfo); + zkClient().makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path()); +// Create broker credentials before starting brokers +createScramCredentials(zkConnect(), JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); +} + +@Override +public Admin createPrivilegedAdminClient() { +return createAdminClient(bootstrapServers(listenerName()), securityProtocol(), trustStoreFile(), clientSaslProperties(), +KAFKA_CLIENT_SASL_MECHANISM, JaasTestUtils.KafkaScramAdmin(), JaasTestUtils.KafkaScramAdminPassword()); +} + +@BeforeEach +@Override +public void setUp(TestInfo testInfo) { +startSasl(jaasSections(KAFKA_SERVER_SASL_MECHANISMS, Some$.MODULE$.apply(KAFKA_CLIENT_SASL_MECHANISM), Both$.MODULE$, +JaasTestUtils.KafkaServerContextName())); +super.setUp(testInfo); +createTopic( +TOPIC, +NUM_PARTITIONS, +BROKER_COUNT, +new Properties(), +listenerName(), +new Properties());
[PR] KAFKA-16342: fix getOffsetByMaxTimestamp for compressed records [kafka]
showuon opened a new pull request, #15474: URL: https://github.com/apache/kafka/pull/15474 Fix `getOffsetByMaxTimestamp` for compressed records. This PR adds: 1. For the inPlaceAssignment case, compute the correct offset for maxTimestamp while traversing the batch records, and set it on the `ValidationResult` at the end, instead of always setting the last offset. 2. For the non-inPlaceAssignment case, set the offsetOfMaxTimestamp for the log create time, as in the non-compressed and inPlaceAssignment cases, instead of always setting the last offset. 3. Tests to verify the fix. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)