[GitHub] [kafka] showuon commented on pull request #13100: MINOR: add size check for tagged fields
showuon commented on PR #13100: URL: https://github.com/apache/kafka/pull/13100#issuecomment-1376856241 @ijuma @mimaison , please take a look. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request, #13100: MINOR: add size check for tagged fields
showuon opened a new pull request, #13100: URL: https://github.com/apache/kafka/pull/13100 Add size check for taggedFields of a tag, and add tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-14279) Add 3.3.1 to broker/client and stream upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-14279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-14279. - Resolution: Fixed > Add 3.3.1 to broker/client and stream upgrade/compatibility tests > - > > Key: KAFKA-14279 > URL: https://issues.apache.org/jira/browse/KAFKA-14279 > Project: Kafka > Issue Type: Task > Components: clients, core, streams, system tests >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Blocker > Fix For: 3.4.0 > > > Per the penultimate bullet on the [release > checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses], > Kafka v3.3.0 is released. We should add this version to the system tests. > Example PRs: > * Broker and clients: [https://github.com/apache/kafka/pull/6794] > * Streams: [https://github.com/apache/kafka/pull/6597/files] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14279) Add 3.3.1 to broker/client and stream upgrade/compatibility tests
[ https://issues.apache.org/jira/browse/KAFKA-14279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-14279: Fix Version/s: 3.4.0 (was: 3.5.0) > Add 3.3.1 to broker/client and stream upgrade/compatibility tests > - > > Key: KAFKA-14279 > URL: https://issues.apache.org/jira/browse/KAFKA-14279 > Project: Kafka > Issue Type: Task > Components: clients, core, streams, system tests >Reporter: José Armando García Sancio >Assignee: José Armando García Sancio >Priority: Blocker > Fix For: 3.4.0 > > > Per the penultimate bullet on the [release > checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses], > Kafka v3.3.0 is released. We should add this version to the system tests. > Example PRs: > * Broker and clients: [https://github.com/apache/kafka/pull/6794] > * Streams: [https://github.com/apache/kafka/pull/6597/files]
[GitHub] [kafka] mjsax commented on pull request #13077: KAFKA-14279; Add 3.3.x streams system tests
mjsax commented on PR #13077: URL: https://github.com/apache/kafka/pull/13077#issuecomment-1376846757 Thanks for the PR. Merged to `trunk` and cherry-picked to `3.4` branch.
[GitHub] [kafka] mjsax merged pull request #13077: KAFKA-14279; Add 3.3.x streams system tests
mjsax merged PR #13077: URL: https://github.com/apache/kafka/pull/13077
[jira] [Commented] (KAFKA-14126) Convert remaining DynamicBrokerReconfigurationTest tests to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-14126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656475#comment-17656475 ] Matthias J. Sax commented on KAFKA-14126: - Saw this failing 2x on this PR: https://github.com/apache/kafka/pull/13077 > Convert remaining DynamicBrokerReconfigurationTest tests to KRaft > - > > Key: KAFKA-14126 > URL: https://issues.apache.org/jira/browse/KAFKA-14126 > Project: Kafka > Issue Type: Test >Reporter: David Arthur >Assignee: David Arthur >Priority: Major > > After the initial conversion in https://github.com/apache/kafka/pull/12455, > three tests still need to be converted. > * testKeyStoreAlter > * testTrustStoreAlter > * testThreadPoolResize
[GitHub] [kafka] kamalcph commented on pull request #13060: KAFKA-14559: Fix JMX tool to handle the object names with wild cards and optional attributes
kamalcph commented on PR #13060: URL: https://github.com/apache/kafka/pull/13060#issuecomment-1376779303 @showuon @mimaison ping for the review!
[GitHub] [kafka] kamalcph commented on pull request #13059: MINOR: KafkaConfig should not expose internal config when queried for non-internal values
kamalcph commented on PR #13059: URL: https://github.com/apache/kafka/pull/13059#issuecomment-1376779075 @showuon @mimaison ping for the review!
[GitHub] [kafka] showuon commented on pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation
showuon commented on PR #13099: URL: https://github.com/apache/kafka/pull/13099#issuecomment-1376733175 @rondagostino , please take a look. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation
showuon commented on code in PR #13099: URL: https://github.com/apache/kafka/pull/13099#discussion_r1065331903 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1496,4 +1496,32 @@ public static String replaceSuffix(String str, String oldSuffix, String newSuffi throw new IllegalArgumentException("Expected string to end with " + oldSuffix + " but string is " + str); return str.substring(0, str.length() - oldSuffix.length()) + newSuffix; } + + public static long zeroIfNegative(long value) { + return Math.max(0L, value); + } + + // returns the sum of a and b unless it would overflow, in which case it returns Long.MAX_VALUE + public static long saturatedAdd(long a, long b) { Review Comment: The method name is copied from guava's [LongMath class](https://guava.dev/releases/20.0/api/docs/com/google/common/math/LongMath.html#saturatedAdd-long-long-).
[GitHub] [kafka] showuon opened a new pull request, #13099: KAFKA-14604: avoid SASL session expiration time overflowed when calculation
showuon opened a new pull request, #13099: URL: https://github.com/apache/kafka/pull/13099 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-14604) SASL session expiration time will be overflowed when calculation
[ https://issues.apache.org/jira/browse/KAFKA-14604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-14604: -- Description: When the SASL server or client sets a large expiration time, the timeout value might overflow, causing the session to time out immediately. [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s the SASL server timeout calculation. [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s the SASL client timeout calculation. Something like this: {code:java} sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code} So, if the configured or returned sessionLifetimeMs is a large number, after the calculation `sessionExpirationTimeNanos` will overflow to a negative value, causing the session to time out on each check. was: When sasl server of client set a large expiration time, the timeout value might be overflowed, and cause the session timeout immediately. [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s the sasl server timeout's calculation [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s the sasl client timeout's calculation something like this: {code:java} sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMs; {code} So, if the configured or returned sessionLifetimeMs is a large number, after the calculation, the `sessionExpirationTimeNanos` will be a negative value, and cause the session timeout each check. 
> SASL session expiration time will be overflowed when calculation > > > Key: KAFKA-14604 > URL: https://issues.apache.org/jira/browse/KAFKA-14604 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > When sasl server of client set a large expiration time, the timeout value > might be overflowed, and cause the session timeout immediately. > > [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java#L694]'s > the sasl server timeout's calculation > [Here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java#L692]'s > the sasl client timeout's calculation > > something like this: > {code:java} > sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * > sessionLifetimeMs; {code} > So, if the configured or returned sessionLifetimeMs is a large number, after > the calculation, the `sessionExpirationTimeNanos` will be a negative value, > and cause the session timeout on each check.
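The overflow in the quoted calculation, and the saturating-add fix discussed in PR #13099, can be sketched as follows. This is an illustrative reimplementation mirroring the semantics of Guava's `LongMath.saturatedAdd`, not the actual patch:

```java
public class SaturatedAddDemo {
    // Returns a + b, clamping to Long.MAX_VALUE / Long.MIN_VALUE instead of
    // wrapping on overflow (same semantics as Guava's LongMath.saturatedAdd).
    static long saturatedAdd(long a, long b) {
        long naive = a + b;
        // Overflow occurred iff a and b share a sign that naive does not.
        if (((a ^ naive) & (b ^ naive)) < 0) {
            return a > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
        return naive;
    }

    public static void main(String[] args) {
        long authenticationEndNanos = 1_000_000_000L;  // hypothetical timestamp
        long lifetimeNanos = Long.MAX_VALUE - 5;       // huge session lifetime in ns

        // The naive sum wraps around to a negative value, so every expiration
        // check sees an already-expired session.
        long naive = authenticationEndNanos + lifetimeNanos;
        // The saturating sum pins the result at Long.MAX_VALUE instead.
        long safe = saturatedAdd(authenticationEndNanos, lifetimeNanos);

        System.out.println(naive < 0); // true: overflowed
        System.out.println(safe);      // 9223372036854775807 (Long.MAX_VALUE)
    }
}
```

The sign-bit check is the same trick the JDK uses internally for overflow detection in `Math.addExact`; here the result saturates rather than throwing.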
[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14608: - Description: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] As a result it's possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs from ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we get to be under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I don't think this is a severe issue. Either we were already under min ISR so the reassignment does not actually *cause* under min ISR. Or we're decreasing the replication factor which isn't a common scenario. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. was: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] 
As a result its possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs to ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we are under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I dont think this is a severe issue. Either we were already under min ISR so the reassignment does not *cause* under min ISR. Or we're decreasing the replication factor which isnt a common scenario. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. > Look into when reassignment should be completed in KRaft mode > - > > Key: KAFKA-14608 > URL: https://issues.apache.org/jira/browse/KAFKA-14608 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In KRaft mode we complete reassignments when the adding replicas have been > added to the ISR - see > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] > As a result its possible for the partition to be under min ISR if the number > of adding replicas is less than the topic's under min ISR config and some > other target replicas are not in the ISR for whatever reason. 
> This behavior differs to ZK mode where we require all target replicas to be > in the ISR for the reassignment to complete - see > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. > > I thought more about it and I only think the reassignment can be completed > *and* we get to be under min ISR if either 1) we were already under min ISR > in the first place or 2) the replication factor decreases. So in practice I > don't think this is a severe issue. Either we were already under min ISR so > the reassignment does not actually *cause* under min ISR. Or we're decreasing > the replication factor which isn't a common scenario. > It seems there are two options. One is to match the ZK behavior and only > complete reassignments when all target replicas are in the ISR. The second is > to com
[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14608: - Description: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] As a result it's possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs from ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we get to be under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I don't think this is a severe issue. Either we were already under min ISR so the reassignment does not actually *cause* under min ISR. Or we're decreasing the replication factor which isn't a common scenario. It seems there are three options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. The third is to leave the current behavior. was: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] 
As a result its possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs to ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we get to be under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I don't think this is a severe issue. Either we were already under min ISR so the reassignment does not actually *cause* under min ISR. Or we're decreasing the replication factor which isn't a common scenario. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. > Look into when reassignment should be completed in KRaft mode > - > > Key: KAFKA-14608 > URL: https://issues.apache.org/jira/browse/KAFKA-14608 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In KRaft mode we complete reassignments when the adding replicas have been > added to the ISR - see > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] > As a result its possible for the partition to be under min ISR if the number > of adding replicas is less than the topic's under min ISR config and some > other target replicas are not in the ISR for whatever reason. 
> This behavior differs to ZK mode where we require all target replicas to be > in the ISR for the reassignment to complete - see > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. > > I thought more about it and I only think the reassignment can be completed > *and* we get to be under min ISR if either 1) we were already under min ISR > in the first place or 2) the replication factor decreases. So in practice I > don't think this is a severe issue. Either we were already under min ISR so > the reassignment does not actually *cause* under min ISR. Or we're decreasing > the replication factor which isn't a common scenario. > It seems there are three options. One is to match the ZK behavior and only > complete reassignments
[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14608: - Description: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] As a result it's possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs from ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we are under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I don't think this is a severe issue. Either we were already under min ISR so the reassignment does not *cause* under min ISR. Or we're decreasing the replication factor which isn't a common scenario. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. was: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] 
As a result its possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs to ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we are under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I dont think this is a severe issue. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. > Look into when reassignment should be completed in KRaft mode > - > > Key: KAFKA-14608 > URL: https://issues.apache.org/jira/browse/KAFKA-14608 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In KRaft mode we complete reassignments when the adding replicas have been > added to the ISR - see > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] > As a result its possible for the partition to be under min ISR if the number > of adding replicas is less than the topic's under min ISR config and some > other target replicas are not in the ISR for whatever reason. 
> This behavior differs to ZK mode where we require all target replicas to be > in the ISR for the reassignment to complete - see > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. > > I thought more about it and I only think the reassignment can be completed > *and* we are under min ISR if either 1) we were already under min ISR in the > first place or 2) the replication factor decreases. So in practice I dont > think this is a severe issue. Either we were already under min ISR so the > reassignment does not *cause* under min ISR. Or we're decreasing the > replication factor which isnt a common scenario. > It seems there are two options. One is to match the ZK behavior and only > complete reassignments when all target replicas are in the ISR. The second is > to complete reassignments when enough target replicas are in the ISR such > that we're above under min ISR. So if the under min ISR config for a topic is > two, then we would complete reassignments when
[jira] [Updated] (KAFKA-14608) Make sure reassignment does not cause under min ISR
[ https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14608: - Description: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] As a result it's possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. This behavior differs from ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. I thought more about it and I only think the reassignment can be completed *and* we are under min ISR if either 1) we were already under min ISR in the first place or 2) the replication factor decreases. So in practice I don't think this is a severe issue. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. was: In KRaft mode we complete reassignments when the adding replicas have been added to the ISR - see [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] As a result its possible for the partition to be under min ISR if the number of adding replicas is less than the topic's under min ISR config and some other target replicas are not in the ISR for whatever reason. 
This behavior differs to ZK mode where we require all target replicas to be in the ISR for the reassignment to complete - see [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. It seems there are two options. One is to match the ZK behavior and only complete reassignments when all target replicas are in the ISR. The second is to complete reassignments when enough target replicas are in the ISR such that we're above under min ISR. So if the under min ISR config for a topic is two, then we would complete reassignments when at least two target replicas are in the ISR. > Make sure reassignment does not cause under min ISR > --- > > Key: KAFKA-14608 > URL: https://issues.apache.org/jira/browse/KAFKA-14608 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In KRaft mode we complete reassignments when the adding replicas have been > added to the ISR - see > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] > As a result its possible for the partition to be under min ISR if the number > of adding replicas is less than the topic's under min ISR config and some > other target replicas are not in the ISR for whatever reason. > This behavior differs to ZK mode where we require all target replicas to be > in the ISR for the reassignment to complete - see > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. > > I thought more about it and I only think the reassignment can be completed > *and* we are under min ISR if either 1) we were already under min ISR in the > first place or 2) the replication factor decreases. So in practice I dont > think this is a severe issue. > It seems there are two options. One is to match the ZK behavior and only > complete reassignments when all target replicas are in the ISR. 
The second is > to complete reassignments when enough target replicas are in the ISR such > that we're above under min ISR. So if the under min ISR config for a topic is > two, then we would complete reassignments when at least two target replicas > are in the ISR.
[jira] [Updated] (KAFKA-14608) Look into when reassignment should be completed in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-14608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Grant updated KAFKA-14608: - Summary: Look into when reassignment should be completed in KRaft mode (was: Make sure reassignment does not cause under min ISR) > Look into when reassignment should be completed in KRaft mode > - > > Key: KAFKA-14608 > URL: https://issues.apache.org/jira/browse/KAFKA-14608 > Project: Kafka > Issue Type: Improvement >Reporter: Andrew Grant >Assignee: Andrew Grant >Priority: Major > > In KRaft mode we complete reassignments when the adding replicas have been > added to the ISR - see > [https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java#L288.] > As a result its possible for the partition to be under min ISR if the number > of adding replicas is less than the topic's under min ISR config and some > other target replicas are not in the ISR for whatever reason. > This behavior differs to ZK mode where we require all target replicas to be > in the ISR for the reassignment to complete - see > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/KafkaController.scala#L1003]. > > I thought more about it and I only think the reassignment can be completed > *and* we are under min ISR if either 1) we were already under min ISR in the > first place or 2) the replication factor decreases. So in practice I dont > think this is a severe issue. > It seems there are two options. One is to match the ZK behavior and only > complete reassignments when all target replicas are in the ISR. The second is > to complete reassignments when enough target replicas are in the ISR such > that we're above under min ISR. So if the under min ISR config for a topic is > two, then we would complete reassignments when at least two target replicas > are in the ISR.
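The completion policies debated in this ticket can be sketched as simple predicates over the ISR and the target replica set. These helpers are hypothetical and for illustration only; they are not the controller's actual code:

```java
import java.util.Set;

public class ReassignmentCompletion {
    // Current KRaft behavior: complete once all *adding* replicas are in the ISR.
    static boolean kraftCurrent(Set<Integer> isr, Set<Integer> adding) {
        return isr.containsAll(adding);
    }

    // ZK behavior: complete only once *all target* replicas are in the ISR.
    static boolean zkBehavior(Set<Integer> isr, Set<Integer> target) {
        return isr.containsAll(target);
    }

    // Min-ISR-aware option: complete once at least minIsr target replicas are in the ISR.
    static boolean minIsrAware(Set<Integer> isr, Set<Integer> target, int minIsr) {
        return target.stream().filter(isr::contains).count() >= minIsr;
    }

    public static void main(String[] args) {
        Set<Integer> isr = Set.of(1, 2);        // broker 3 not yet caught up
        Set<Integer> target = Set.of(1, 2, 3);  // desired replica assignment
        Set<Integer> adding = Set.of(2);        // replicas being added

        System.out.println(kraftCurrent(isr, adding));    // true: completes early
        System.out.println(zkBehavior(isr, target));      // false: waits for broker 3
        System.out.println(minIsrAware(isr, target, 2));  // true: 2 >= min ISR of 2
    }
}
```

The example shows the scenario from the ticket: with only one adding replica, the current KRaft predicate can fire while the partition is still below the full target set.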
[GitHub] [kafka] rondagostino commented on a diff in pull request #13058: KAFKA-14557; Lock metadata log dir
rondagostino commented on code in PR #13058: URL: https://github.com/apache/kafka/pull/13058#discussion_r1065242904 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -127,18 +127,18 @@ class RaftManagerTest { } @ParameterizedTest - @ValueSource(strings = Array("metadata", "log", "metadata,log")) + @ValueSource(strings = Array("metadata-only", "log-only", "both")) def testLogDirLockWhenControllerOnly(dirType: String): Unit = { -val logDir = if (dirType.contains("metadata")) { - Some(TestUtils.tempDir().toPath) -} else { +val logDir = if (dirType.equals("metadata-only")) { None +} else { + Some(TestUtils.tempDir().toPath) } -val metadataDir = if (dirType.contains("log")) { - Some(TestUtils.tempDir().toPath) -} else { +val metadataDir = if (dirType.contains("log-only")) { Review Comment: `s/contains/equals/` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #13077: KAFKA-14279; Add 3.3.x streams system tests
jsancio commented on PR #13077: URL: https://github.com/apache/kafka/pull/13077#issuecomment-1376537132 @mjsax I got a chance to work on this today. I updated the PR. Here is the result from running the test locally: ```bash TC_PATHS="tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces" _DUCKTAPE_OPTIONS='--parameters '\''{"from_version":"3.3.1","to_version":"3.5.0-SNAPSHOT"}'\' tests/docker/run_tests.sh > Configure project : Starting build with version 3.5.0-SNAPSHOT (commit id a0090d24) using Gradle 7.6, Java 1.8 and Scala 2.13.10 Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0 BUILD SUCCESSFUL in 1s 168 actionable tasks: 168 up-to-date docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file /opt/kafka-dev/tests/docker/build/cluster.json ./tests/kafkatest/tests/streams/streams_upgrade_test.py::StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces --parameters '{"from_version":"3.3.1","to_version":"3.5.0-SNAPSHOT"}'" /usr/local/lib/python3.9/dist-packages/paramiko/transport.py:236: CryptographyDeprecationWarning: Blowfish has been deprecated "class": algorithms.Blowfish, [INFO:2023-01-09 16:14:29,112]: starting test run with session id 2023-01-09--003... [INFO:2023-01-09 16:14:29,112]: running 1 tests... [INFO:2023-01-09 16:14:29,112]: Triggering test 1 of 1... 
[INFO:2023-01-09 16:14:29,117]: RunnerClient: Loading test {'directory': '/opt/kafka-dev/tests/kafkatest/tests/streams', 'file_name': 'streams_upgrade_test.py', 'cls_name': 'StreamsUpgradeTest', 'method_name': 'test_rolling_upgrade_with_2_bounces', 'injected_args': {'from_version': '3.3.1', 'to_version': '3.5.0-SNAPSHOT'}} [INFO:2023-01-09 16:14:29,118]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT: on run 1/1 [INFO:2023-01-09 16:14:29,119]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT: Setting up... [INFO:2023-01-09 16:14:29,119]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT: Running... [INFO:2023-01-09 16:18:09,123]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT: Tearing down... [INFO:2023-01-09 16:18:28,193]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT: PASS [INFO:2023-01-09 16:18:28,193]: RunnerClient: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT: Data: None SESSION REPORT (ALL TESTS) ducktape version: 0.11.1 session_id: 2023-01-09--003 run time: 3 minutes 59.087 seconds tests run:1 passed: 1 flaky:0 failed: 0 ignored: 0 test_id: kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest.test_rolling_upgrade_with_2_bounces.from_version=3.3.1.to_version=3.5.0-SNAPSHOT status: PASS run time: 3 minutes 59.075 seconds ``` -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] ableegoldman merged pull request #13064: MINOR: bump streams quickstart pom versions and add to list in gradle.properties
ableegoldman merged PR #13064: URL: https://github.com/apache/kafka/pull/13064
[GitHub] [kafka] ableegoldman merged pull request #13063: MINOR: Update KRaft cluster upgrade documentation for 3.4
ableegoldman merged PR #13063: URL: https://github.com/apache/kafka/pull/13063
[GitHub] [kafka] jolshan commented on pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jolshan commented on PR #12870: URL: https://github.com/apache/kafka/pull/12870#issuecomment-1376482387 Looks fairly reasonable. I'm going to rebuild though to see if some of the tests look flaky.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
jeffkbkim commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1065176140 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java: ## @@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs, this.error = null; } +public OffsetFetchResponse(List groups, short version) { +super(ApiKeys.OFFSET_FETCH); +data = new OffsetFetchResponseData(); + +if (version >= 8) { +data.setGroups(groups); +error = null; + +for (OffsetFetchResponseGroup group : data.groups()) { +this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode())); +} +} else { +if (groups.size() != 1) { +throw new UnsupportedVersionException( +"Version " + version + " of OffsetFetchResponse only support one group." +); +} + +OffsetFetchResponseGroup group = groups.get(0); +data.setErrorCode(group.errorCode()); +error = Errors.forCode(group.errorCode()); + +group.topics().forEach(topic -> { +OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name()); +data.topics().add(newTopic); + +topic.partitions().forEach(partition -> { +OffsetFetchResponsePartition newPartition; + +if (version < 2 && group.errorCode() != Errors.NONE.code()) { +// Versions prior to version 2 does not support a top level error. Therefore +// we put it at the partition level. +newPartition = new OffsetFetchResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +.setErrorCode(group.errorCode()); +} else { Review Comment: that makes sense. thanks for the clarification
[GitHub] [kafka] rishiraj88 commented on pull request #13097: Draft: only wipe state store under EOS regardless of state
rishiraj88 commented on PR #13097: URL: https://github.com/apache/kafka/pull/13097#issuecomment-1376384331 @lqxshay , thanks for the helpful checklist.
[GitHub] [kafka] rishiraj88 commented on pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
rishiraj88 commented on PR #12998: URL: https://github.com/apache/kafka/pull/12998#issuecomment-1376375860 Thanks
[GitHub] [kafka] hachikuji commented on pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
hachikuji commented on PR #12972: URL: https://github.com/apache/kafka/pull/12972#issuecomment-1376323364 Would it be possible to make the release status part of the JSON spec? For example: ```json "apiStability": "evolving|stable" ```
[GitHub] [kafka] cmccabe merged pull request #12998: KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller.
cmccabe merged PR #12998: URL: https://github.com/apache/kafka/pull/12998
[GitHub] [kafka] lqxshay opened a new pull request, #13097: Draft: only wipe state store under EOS regardless of state
lqxshay opened a new pull request, #13097: URL: https://github.com/apache/kafka/pull/13097 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jolshan commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on PR #13078: URL: https://github.com/apache/kafka/pull/13078#issuecomment-1376065254 [clolov](https://github.com/clolov) I see your point about people not knowing to use those methods -- maybe we can add some documentation. However, I'm wondering if changing the map will affect performance for the reading and writing of the map. I don't think it makes sense to take a hit there. We will also be reading the size very frequently for the metric.
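One way to expose the count without changing the map type is to have the metric read the existing map's size through a supplier, leaving the hot read/write paths untouched. This is an illustrative sketch only, with placeholder types, and is not the PR's actual code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntSupplier;

public class ProducerCountGauge {
    // Placeholder map: the PR's actual key and value types differ.
    private final ConcurrentHashMap<Long, Object> producers = new ConcurrentHashMap<>();

    void track(long producerId) {
        producers.put(producerId, new Object());
    }

    // ConcurrentHashMap.size() is a cheap, weakly consistent read, which is
    // usually acceptable for a gauge that is sampled periodically.
    IntSupplier producerCountGauge() {
        return producers::size;
    }

    public static void main(String[] args) {
        ProducerCountGauge metrics = new ProducerCountGauge();
        metrics.track(1L);
        metrics.track(2L);
        System.out.println(metrics.producerCountGauge().getAsInt()); // prints 2
    }
}
```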
[GitHub] [kafka] jolshan commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
jolshan commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1064952018 ## docs/ops.html: ## @@ -1604,6 +1604,11 @@
[GitHub] [kafka] philipnee commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
philipnee commented on code in PR #13021: URL: https://github.com/apache/kafka/pull/13021#discussion_r1064949778 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.stream.Collectors; + +public class CommitRequestManager implements RequestManager { +private final Queue stagedCommits; +// TODO: We will need to refactor the subscriptionState +private final SubscriptionState subscriptionState; +private final Logger log; +private final Optional autoCommitState; +private final Optional coordinatorRequestManager; +private final GroupStateManager groupState; + +public CommitRequestManager( +final Time time, +final LogContext logContext, +final SubscriptionState subscriptionState, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final GroupStateManager groupState) { +this.log = logContext.logger(getClass()); +this.stagedCommits = new LinkedList<>(); +if (config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { +final long autoCommitInterval = + Integer.toUnsignedLong(config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG)); +this.autoCommitState = Optional.of(new AutoCommitState(time, autoCommitInterval)); 
+} else { +this.autoCommitState = Optional.empty(); +} +this.coordinatorRequestManager = Optional.ofNullable(coordinatorRequestManager); +this.groupState = groupState; +this.subscriptionState = subscriptionState; +} + +// Visible for testing +CommitRequestManager( +final Time time, +final LogContext logContext, +final SubscriptionState subscriptionState, +final ConsumerConfig config, +final CoordinatorRequestManager coordinatorRequestManager, +final GroupStateManager groupState, +final AutoCommitState autoCommitState) { +this.log = logContext.logger(getClass()); +this.subscriptionState = subscriptionState; +this.coordinatorRequestManager = Optional.ofNullable(coordinatorRequestManager); +this.groupState = groupState; +this.autoCommitState = Optional.ofNullable(autoCommitState); +this.stagedCommits = new LinkedList<>(); +} + +/** + * Poll for the commit request if there's any. The function will also try to autocommit, if enabled. + * + * @param currentTimeMs + * @return + */ +@Override +public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { +if (!coordinatorRequestManager.isPresent()) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()); +} + +maybeAutoCommit(currentTimeMs); + +if (stagedCommits.isEmpty()) { +return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<>()); +} + +List unsentCommitRequests = + stagedCommits.stream().map(StagedCommit::toUnsent
[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
ijuma commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1376021193 We already depend on core when it comes to the tools test module, so we don't necessarily have to move things for that.
[jira] [Commented] (KAFKA-14609) Kafka Streams Processor API cannot use state stores
[ https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656218#comment-17656218 ] Bill Bejeck commented on KAFKA-14609: - It just received approval on the dev list, so I'd say within a week. > Kafka Streams Processor API cannot use state stores > --- > > Key: KAFKA-14609 > URL: https://issues.apache.org/jira/browse/KAFKA-14609 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0 >Reporter: Philipp Schirmer >Priority: Major > > The recently introduced Kafka Streams Processor API (since 3.3, > https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with > regards to using state stores. The > [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-] > method returns null, even though the store has been registered according to > the docs. The old transformer API still works. I created a small project that > demonstrates the behavior. It uses both methods to register a store for the > transformer, as well as the processor API: > https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java
[jira] [Commented] (KAFKA-14404) Fix & update docs on client configs controlled by Streams
[ https://issues.apache.org/jira/browse/KAFKA-14404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656213#comment-17656213 ] Sujay Hegde commented on KAFKA-14404: - [~ableegoldman] I need some clarification about the description. I will go through the code + docs and get back. Thanks > Fix & update docs on client configs controlled by Streams > - > > Key: KAFKA-14404 > URL: https://issues.apache.org/jira/browse/KAFKA-14404 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sujay Hegde >Priority: Major > Labels: docs, newbie > > There are a handful of client configs that can't be set by Streams users for > various reasons, such as the group id, but we seem to have missed a few of > them in the documentation > [here|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26]: > the partitioner assignor (Consumer) and partitioner (Producer). > This section of the docs also just needs to be cleaned up in general as there > is overlap between the [Default > Values|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#default-values] > and [Parameters controlled by Kafka > Streams|https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#id26] > sections, and the table of contents is messed up presumably due to an issue > with the section headers. > We should separate these with one section covering (only) configs where > Streams sets a different default but this can still be overridden by the > user, and the other section covering the configs that Streams hardcodes and > users can never override.
[GitHub] [kafka] fvaleri commented on pull request #13080: KAFKA-14575: Move ClusterTool to tools module
fvaleri commented on PR #13080: URL: https://github.com/apache/kafka/pull/13080#issuecomment-1375959364 LGTM. Thanks.
[GitHub] [kafka] gharris1727 commented on pull request #13084: KAFKA-14598: Fix flaky ConnectRestApiTest
gharris1727 commented on PR #13084: URL: https://github.com/apache/kafka/pull/13084#issuecomment-1375950088 Ah, I recall working on this before. I was the one that added that STARTUP_MODE_JOIN override you linked: https://github.com/apache/kafka/pull/9040 In the description, I mentioned how JOIN was a superset of LISTEN, and I think that's still the case. The jetty server starts: https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java#L207 before the herder does: https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java#L53-L54 However, as you've noticed, the registration of the resources occurs _after_ the server begins listening, and _after_ the herder joins the group. So neither LISTEN nor JOIN is sufficient to ensure that the resources are registered. But changing from JOIN to LISTEN is going to have the opposite effect that you're intending, as the LISTEN condition is true even earlier than JOIN is.
[GitHub] [kafka] satishd commented on pull request #13046: KAFKA-14551 Move/Rewrite LeaderEpochFileCache and its dependencies to the storage module.
satishd commented on PR #13046: URL: https://github.com/apache/kafka/pull/13046#issuecomment-1375939273 @ijuma I rebased with trunk and resolved conflicts. Please review it when you get some time.
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks
vamossagar12 commented on code in PR #12802: URL: https://github.com/apache/kafka/pull/12802#discussion_r1064852750 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -780,6 +774,14 @@ protected void stopServices() { } } +// Timeout for herderExecutor to gracefully terminate is set to a value to accommodate +// reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s +private long herderExecutorTimeoutMs() { +return this.workerSyncTimeoutMs + +config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG) + Review Comment: Nope that wasn't right. Changed it now. Good catch.
[GitHub] [kafka] mimaison commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
mimaison commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375933240 Yes I can review this. I started looking at [KAFKA-14525](https://issues.apache.org/jira/browse/KAFKA-14525) because we were stepping on each other's toes in [KAFKA-14470](https://issues.apache.org/jira/browse/KAFKA-14470), but we should finish that first. Many of the tests for these commands start full clusters and all that test logic is currently in core. We should be able to move it to server-common but I'm not quite sure if we want to drag many ZooKeeper bits there.
[GitHub] [kafka] divijvaidya opened a new pull request, #13096: MINOR: Multiple clean ups associated with scala collection
divijvaidya opened a new pull request, #13096: URL: https://github.com/apache/kafka/pull/13096 Clean-ups of different aspects of the Scala code. Some are potential performance improvements and some are readability improvements. Examples of the types of changes are: 1. Merge consecutive filter calls to avoid creation of an intermediate collection. 2. Don't resort to pattern matching to check value existence; the simplified expression works faster. 3. For `Option`, don't emulate `exists` and other monadic functions using pattern matching. See: https://pavelfatin.com/scala-collections-tips-and-tricks/#options-processing 4. Don't use `map` when the result is ignored; use `foreach` instead. 5. Using `.lengthCompare(n) > 0` instead of `.length` reduces the complexity from O(length) to O(length min n).
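Tips 2 and 3 above have a close Java analogue in `Optional`; the following rough illustration shows the same idea (this is not code from the PR, and the values are invented):

```java
import java.util.Optional;

public class OptionTips {
    public static void main(String[] args) {
        Optional<String> topic = Optional.of("kafka-events");

        // Emulating an existence check by unwrapping manually...
        boolean prefixed1 = topic.isPresent() && topic.get().startsWith("kafka-");

        // ...versus the built-in combinator, the Java analogue of Scala's
        // option.exists(predicate):
        boolean prefixed2 = topic.filter(s -> s.startsWith("kafka-")).isPresent();

        System.out.println(prefixed1 && prefixed2); // prints true
    }
}
```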
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375924994 Actually I pinged too soon :) Before getting it reviewed, I would test on my local and also add a couple of tests. Thanks.
[GitHub] [kafka] C0urante commented on a diff in pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…
C0urante commented on code in PR #11818: URL: https://github.com/apache/kafka/pull/11818#discussion_r1064843478 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ## @@ -69,14 +69,19 @@ public MirrorSourceTask() {} // for testing MirrorSourceTask(KafkaConsumer consumer, MirrorSourceMetrics metrics, String sourceClusterAlias, - ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer producer) { + ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer producer, + Semaphore outstandingOffsetSyncs, Map partitionStates, Review Comment: One alternative is to hardcode the instantiation of the `outstandingOffsetSyncs` semaphore in the testing-only constructor, without relying on a constructor parameter (and the same can be done for the `partitionStates` field). But that has its own problems of duplicating instantiation logic for those fields and leading to possible divergence in behavior between task instances depending on whether they're brought up for testing or not, if we're not careful about keeping that instantiation logic in sync. One way that that issue can be addressed is by pulling out any non-trivial instantiation logic into a separate static method; e.g.: ```java private static Semaphore newOutstandingOffsetSyncsSemaphore() { return new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS); } ``` But I don't think that this is so much better than what you have right now that I'd block the PR on adopting this approach; even if it has some advantages, the additional complexity makes it debatable whether it'd really be worth the tradeoff.
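The factory-method variant discussed above can be sketched a little more fully. In this sketch the class name, permit count, and accessor are all illustrative and are not MirrorSourceTask's actual code:

```java
import java.util.concurrent.Semaphore;

public class TaskSketch {
    private static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10; // assumed value

    private final Semaphore outstandingOffsetSyncs;

    TaskSketch() {
        // Production path: always goes through the shared factory.
        this(newOutstandingOffsetSyncsSemaphore());
    }

    // Testing-only constructor: a test may inject its own semaphore, or pass
    // the factory's result to match production behavior exactly.
    TaskSketch(Semaphore outstandingOffsetSyncs) {
        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
    }

    // Shared factory keeps the two construction paths from diverging.
    private static Semaphore newOutstandingOffsetSyncsSemaphore() {
        return new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
    }

    int availablePermits() {
        return outstandingOffsetSyncs.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(new TaskSketch().availablePermits()); // prints 10
    }
}
```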
[GitHub] [kafka] mimaison commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module
mimaison commented on code in PR #13080: URL: https://github.com/apache/kafka/pull/13080#discussion_r1064840727 ## tools/src/main/java/org/apache/kafka/tools/ClusterTool.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class ClusterTool { + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { Review Comment: Yes. I was thinking of doing that after the Scala to Java conversion. 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
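The `main()`/`mainNoExit()` split discussed in this review is a common testable-CLI pattern. Below is a minimal, self-contained sketch of that pattern (illustrative names, not Kafka's actual code — Kafka routes termination through its `Exit` utility so tests can intercept exit calls; `System.exit` stands in here): `main()` is the only place that terminates the JVM, so tests can call `mainNoExit()` directly and assert on the returned status code.

```java
// Sketch of the testable-CLI pattern: main() exits, mainNoExit() returns a code.
public class DemoTool {

    public static void main(String... args) {
        System.exit(mainNoExit(args));
    }

    // Returns a process status code instead of exiting, keeping the command
    // logic unit-testable.
    static int mainNoExit(String... args) {
        try {
            if (args.length == 0)
                throw new IllegalArgumentException("exactly one subcommand is required");
            run(args[0]);
            return 0;
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
            return 1; // usage error
        } catch (Exception e) {
            e.printStackTrace();
            return 2; // unexpected failure
        }
    }

    static void run(String subcommand) {
        // Dispatch to subcommand handlers here (e.g. cluster-id, unregister).
    }
}
```

A test can then assert `mainNoExit()` returns `1` for missing arguments and `0` on success, without the JVM exiting mid-suite.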
[GitHub] [kafka] C0urante commented on a diff in pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…
C0urante commented on code in PR #11818: URL: https://github.com/apache/kafka/pull/11818#discussion_r1064822714 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ## @@ -256,39 +277,76 @@ public void testPartitionStateMutation() { partitionStates.put(sourceTopicPartition, partitionState); RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); -when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); +doAnswer(new Answer() { +@Override +public Object answer(final InvocationOnMock invocation) { +final Callback callback = invocation.getArgument(1); +callback.onCompletion(null, null); +return null; +} +}).when(producer).send(any(), any()); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); -assertFalse(partitionState.shouldSyncOffsets); +assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); +verify(producer, times(1)).send(any(), any()); -int newRecordOffset = 2; -int newMetadataOffset = 102; -recordMetadata = new RecordMetadata(sourceTopicPartition, newMetadataOffset, 0, 0, 0, recordPartition); +recordOffset = 2; +metadataOffset = 102; +recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, -newRecordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, +recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, headers, Optional.empty())); 
-when(outstandingOffsetSyncs.tryAcquire()).thenReturn(false); +// Do not release outstanding sync semaphore +doAnswer(new Answer() { +@Override +public Object answer(final InvocationOnMock invocation) { +return null; +} +}).when(producer).send(any(), any()); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); -// Expect partition state to be updated -assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); -assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); -assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "sync offsets"); -assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); -assertTrue(partitionState.shouldSyncOffsets); -verify(producer, times(1)).send(any(), any()); -when(outstandingOffsetSyncs.tryAcquire()).thenReturn(true); +assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); +assertEquals(metadataOffset, partitionState.lastSyncDownstreamOffset, "sync offsets"); +assertEquals(recordOffset, partitionState.previousUpstreamOffset, "sync offsets"); +assertEquals(metadataOffset, partitionState.previousDownstreamOffset, "sync offsets"); +assertFalse(partitionState.shouldSyncOffsets, "partition state reset"); +verify(producer, times(2)).send(any(), any()); + +recordOffset = 4; +metadataOffset = 104; +recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); +sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition, +recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, +recordValue.length, recordKey, recordValue, headers, Optional.empty())); + mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); -assertEquals(newRecordOffset, partitionState.lastSyncUpstreamOffset, "partition state is synced"); -assertEquals(newMetadataOffset, partitionState.lastSyncDownstreamOffset, "partition state is 
synced"); -assertEquals(newRecordOffset, partitionState.previousUpstreamOffset, "partition state is synced"); -assertEquals(newMetadataOffset, partitionState.previousDownstreamOffset, "partition state is synced"); -assertFalse(partitionState.shouldSyncOffsets); +assertEquals(recordOffset, partitionState.lastSyncUpstreamOffset, "sync offsets"); +
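The diff above replaces stubbing of `outstandingOffsetSyncs.tryAcquire()` with Mockito `doAnswer()` stubs on `producer.send()`: one stub completes the send callback immediately (an ack'd send), the other withholds it (a send that is never ack'd, so the semaphore permit is never released). A dependency-free stand-in with hypothetical names illustrates the two stubbed behaviors:

```java
import java.util.function.BiConsumer;

// Stand-in for the two doAnswer() stubs in the diff: a fake send() that either
// acks immediately by completing the callback, or drops the callback entirely.
class FakeProducer {
    private final boolean ackImmediately;
    int sendCount;

    FakeProducer(boolean ackImmediately) {
        this.ackImmediately = ackImmediately;
    }

    void send(Object record, BiConsumer<Object, Exception> callback) {
        sendCount++;
        if (ackImmediately)
            callback.accept(null, null); // mirrors callback.onCompletion(null, null)
        // else: never complete the callback, simulating a send that is never ack'd
    }
}
```

As a side note, with Mockito 2+ the anonymous `Answer` classes in the diff could also be written as lambdas, e.g. `doAnswer(invocation -> { ... return null; })`, which reads more compactly.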
[GitHub] [kafka] dajac commented on pull request #12901: KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface
dajac commented on PR #12901: URL: https://github.com/apache/kafka/pull/12901#issuecomment-1376856241 @jolshan I have updated the PR.
[jira] [Commented] (KAFKA-14609) Kafka Streams Processor API cannot use state stores
[ https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656173#comment-17656173 ] Philipp Schirmer commented on KAFKA-14609: -- Thanks, I didn't find that issue. Do you know when 3.3.2 will be released? > Kafka Streams Processor API cannot use state stores > --- > > Key: KAFKA-14609 > URL: https://issues.apache.org/jira/browse/KAFKA-14609 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.0 >Reporter: Philipp Schirmer >Priority: Major > > The recently introduced Kafka Streams Processor API (since 3.3, > https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with > regards to using state stores. The > [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-] > method returns null, even though the store has been registered according to > the docs. The old transformer API still works. I created a small project that > demonstrates the behavior. It uses both methods to register a store for the > transformer, as well as the processor API: > https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14610) Publish Mirror Maker 2 offset syncs in task commit method
Chris Egerton created KAFKA-14610: - Summary: Publish Mirror Maker 2 offset syncs in task commit method Key: KAFKA-14610 URL: https://issues.apache.org/jira/browse/KAFKA-14610 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Chris Egerton Mirror Maker 2 periodically publishes offset sync messages to a Kafka topic that contains the corresponding upstream and downstream offsets for a replicated topic partition. Currently, this publishing takes place inside the [commitRecord method|https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L192], which is invoked by the Kafka Connect framework after a source record has been successfully sent by its producer (i.e., ack'd by the requested number of brokers). Mirror Maker 2 also limits the number of in-flight offset sync messages: once ten messages that have not yet been ack'd by the requested number of brokers have been dispatched to the producer used for offset syncs (a separate producer from the one the Kafka Connect framework uses to send records returned from the [poll method|https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L134]), Mirror Maker 2 begins to skip sending offset sync messages, and only resumes once the number of in-flight offset syncs drops below ten and new calls to the {{commitRecord}} method take place. When bursts of throughput occur in replicated topic partitions, this can cause offset syncs to be dropped for long periods of time: if an offset sync is skipped for some topic partition due to a high number of in-flight messages, and no further messages are then read from that same topic partition for a while, the skipped sync is never retried.
Instead, the task should cache offset syncs in its {{commitRecord}} method, and only actually send offset sync messages in its [commit method|https://github.com/apache/kafka/blob/e38526e375389868664c8977c7a2125e5da2388c/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L108], which is invoked periodically by the Kafka Connect framework. Any offset syncs that are skipped because too many messages are in flight will then be retried automatically when {{commit}} is next invoked, regardless of whether any more records are read from the corresponding topic partition.
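The restructuring proposed in KAFKA-14610 can be sketched with a small, self-contained model (hypothetical names; the real task keys syncs by `TopicPartition` and releases the in-flight permit from the producer callback once the send is ack'd — here the release happens immediately for illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;

// Sketch of the proposal: commitRecord() only caches the latest offset sync
// per partition; commit() drains the cache, bounded by the in-flight limit.
public class OffsetSyncCache {
    static final int MAX_OUTSTANDING_OFFSET_SYNCS = 10;

    private final Semaphore outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
    // Latest {upstream, downstream} offset pair per partition; newer syncs supersede older ones.
    private final Map<String, long[]> pendingSyncs = new LinkedHashMap<>();
    private final List<long[]> sent = new ArrayList<>();

    // commitRecord(): only remember the sync; sending is deferred to commit().
    public synchronized void commitRecord(String partition, long upstreamOffset, long downstreamOffset) {
        pendingSyncs.put(partition, new long[] {upstreamOffset, downstreamOffset});
    }

    // commit(): invoked periodically by the framework; retries every cached
    // sync, still respecting the in-flight limit.
    public synchronized void commit() {
        pendingSyncs.entrySet().removeIf(entry -> {
            if (!outstandingOffsetSyncs.tryAcquire())
                return false; // over the in-flight limit; keep it for the next commit()
            sendOffsetSync(entry.getValue());
            return true;
        });
    }

    // Stand-in for producing to the offset-syncs topic; the real task would
    // release the permit from the producer callback once the send is ack'd.
    private void sendOffsetSync(long[] offsets) {
        sent.add(offsets);
        outstandingOffsetSyncs.release();
    }

    public synchronized int pendingCount() { return pendingSyncs.size(); }
    public synchronized int sentCount() { return sent.size(); }
}
```

Because the cache keeps only the latest sync per partition, a skipped sync survives until a later `commit()` call rather than being dropped outright.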
[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
ijuma commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375847369 Also, I'm currently focused on completing KAFKA-14470. @mimaison since you fleshed out KAFKA-14525, do you have cycles to do these reviews?
[jira] [Resolved] (KAFKA-14609) Kafka Streams Processor API cannot use state stores
[ https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-14609. - Resolution: Fixed Fixed by https://issues.apache.org/jira/browse/KAFKA-14388
[GitHub] [kafka] ijuma commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
ijuma commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375845098 Thanks for the PR. Can we add a test in that case? We'd want to verify manually that the test matches the previous behavior.
[jira] [Commented] (KAFKA-14609) Kafka Streams Processor API cannot use state stores
[ https://issues.apache.org/jira/browse/KAFKA-14609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656166#comment-17656166 ] Bill Bejeck commented on KAFKA-14609: - Hi [~philipp94831] Thanks for reporting this issue. I believe it's been fixed with https://issues.apache.org/jira/browse/KAFKA-14388. You could pull down either the 3.4.0 branch or 3.3.2 and build from source and test it. For now, I'm going to mark this as fixed.
[GitHub] [kafka] rondagostino commented on a diff in pull request #13058: KAFKA-14557; Lock metadata log dir
rondagostino commented on code in PR #13058: URL: https://github.com/apache/kafka/pull/13058#discussion_r1064790638 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -81,25 +100,99 @@ class RaftManagerTest { ) } - @Test - def testNodeIdPresentIfBrokerRoleOnly(): Unit = { -val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1") -assertEquals(1, raftManager.client.nodeId.getAsInt) + @ParameterizedTest + @ValueSource(strings = Array("broker", "controller", "broker,controller")) + def testNodeIdPresent(processRoles: String): Unit = { +var processRolesSet = Set.empty[ProcessRole] +if (processRoles.contains("broker")) { + processRolesSet = processRolesSet ++ Set(BrokerRole) +} +if (processRoles.contains("controller")) { + processRolesSet = processRolesSet ++ Set(ControllerRole) +} + +val logDir = TestUtils.tempDir() +val nodeId = 1 +val raftManager = createRaftManager( + new TopicPartition("__raft_id_test", 0), + createConfig( +processRolesSet, +nodeId, +Some(logDir.toPath), +None + ) +) +assertEquals(nodeId, raftManager.client.nodeId.getAsInt) raftManager.shutdown() } - @Test - def testNodeIdPresentIfControllerRoleOnly(): Unit = { -val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1") -assertEquals(1, raftManager.client.nodeId.getAsInt) + @ParameterizedTest + @ValueSource(strings = Array("metadata", "log", "metadata,log")) + def testLogDirLockWhenControllerOnly(dirType: String): Unit = { +val logDir = if (dirType.contains("metadata")) { + Some(TestUtils.tempDir().toPath) +} else { + None +} + +val metadataDir = if (dirType.contains("log")) { + Some(TestUtils.tempDir().toPath) +} else { + None +} Review Comment: I think this might be clearer. 
``` @ValueSource(strings = Array("metadata-only", "log-only", "both")) def testLogDirLockWhenControllerOnly(dirType: String): Unit = { val logDir = if (!dirType.equals("metadata-only")) { Some(TestUtils.tempDir().toPath) } else { None } val metadataDir = if (!dirType.equals("log-only")) { Some(TestUtils.tempDir().toPath) } else { None } ``` ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -81,25 +100,99 @@ class RaftManagerTest { ) } - @Test - def testNodeIdPresentIfBrokerRoleOnly(): Unit = { -val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1") -assertEquals(1, raftManager.client.nodeId.getAsInt) + @ParameterizedTest + @ValueSource(strings = Array("broker", "controller", "broker,controller")) + def testNodeIdPresent(processRoles: String): Unit = { +var processRolesSet = Set.empty[ProcessRole] +if (processRoles.contains("broker")) { + processRolesSet = processRolesSet ++ Set(BrokerRole) +} +if (processRoles.contains("controller")) { + processRolesSet = processRolesSet ++ Set(ControllerRole) +} + +val logDir = TestUtils.tempDir() +val nodeId = 1 +val raftManager = createRaftManager( + new TopicPartition("__raft_id_test", 0), + createConfig( +processRolesSet, +nodeId, +Some(logDir.toPath), +None + ) +) +assertEquals(nodeId, raftManager.client.nodeId.getAsInt) raftManager.shutdown() } - @Test - def testNodeIdPresentIfControllerRoleOnly(): Unit = { -val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1") -assertEquals(1, raftManager.client.nodeId.getAsInt) + @ParameterizedTest + @ValueSource(strings = Array("metadata", "log", "metadata,log")) + def testLogDirLockWhenControllerOnly(dirType: String): Unit = { +val logDir = if (dirType.contains("metadata")) { + Some(TestUtils.tempDir().toPath) +} else { + None +} + +val metadataDir = if (dirType.contains("log")) { + Some(TestUtils.tempDir().toPath) +} else { + None +} + +val nodeId = 1 +val 
raftManager = createRaftManager( + new TopicPartition("__raft_id_test", 0), + createConfig( +Set(ControllerRole), +nodeId, +logDir, +metadataDir + ) +) + +val lockPath = metadataDir.getOrElse(logDir.get).resolve(LogManager.LockFileName) +assertTrue(fileLocked(lockPath)) + raftManager.shutdown() + +assertFalse(fileLocked(lockPath)) } @Test - def testNodeIdPresentIfColocated(): Unit = { -val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1") -assertEquals(1, raftManager.client.nodeId.getAsInt) + def testLogDirLockWhenMetadataDi
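The parameterized tests above build a typed role set from comma-separated strings like "broker,controller" with hand-written `contains()` checks. A small sketch of the same parsing with exact token matching (the enum is an illustrative stand-in for Kafka's `BrokerRole`/`ControllerRole`, not the real types):

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

// Parse a comma-separated process.roles string into a typed set by splitting
// on commas and matching whole tokens, rather than substring checks.
public class ProcessRoles {
    enum Role { BROKER, CONTROLLER }

    static Set<Role> parse(String processRoles) {
        return Arrays.stream(processRoles.split(","))
                .map(token -> Role.valueOf(token.trim().toUpperCase()))
                .collect(Collectors.toCollection(() -> EnumSet.noneOf(Role.class)));
    }
}
```

Exact token matching sidesteps the ambiguity the reviewer's "metadata-only"/"log-only"/"both" suggestion is addressing: substring checks stay correct only as long as no parameter value is a substring of another.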
[GitHub] [kafka] C0urante commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks
C0urante commented on code in PR #12802: URL: https://github.com/apache/kafka/pull/12802#discussion_r1064784201 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -780,6 +774,14 @@ protected void stopServices() { } } +// Timeout for herderExecutor to gracefully terminate is set to a value to accommodate +// reading to the end of the config topic + successfully attempting to stop all connectors and tasks and a buffer of 10s +private long herderExecutorTimeoutMs() { +return this.workerSyncTimeoutMs + +config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG) + Review Comment: Wait a minute, is this right? Why are we using the sync timeout twice here?
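The reviewer's concern is that the timeout adds the worker sync timeout twice: once via the `workerSyncTimeoutMs` field and once via the `WORKER_SYNC_TIMEOUT_MS_CONFIG` lookup, which resolve to the same value. One plausible shape of the budget the PR comment describes — sync timeout counted once, plus a task-stop allowance, plus the fixed 10s buffer — is sketched below (illustrative names and values, not Connect's actual code or the confirmed fix):

```java
// Hypothetical shutdown-timeout budget: each component counted exactly once.
public class HerderTimeouts {
    static final long SHUTDOWN_BUFFER_MS = 10_000L; // the 10s buffer from the comment

    static long herderExecutorTimeoutMs(long workerSyncTimeoutMs, long taskShutdownGracefulTimeoutMs) {
        // read-to-end of config topic + graceful connector/task stop + buffer
        return workerSyncTimeoutMs + taskShutdownGracefulTimeoutMs + SHUTDOWN_BUFFER_MS;
    }
}
```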
[jira] [Commented] (KAFKA-14535) Flaky test PlaintextEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe
[ https://issues.apache.org/jira/browse/KAFKA-14535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656151#comment-17656151 ] Proven Provenzano commented on KAFKA-14535: --- Thanks for the patch! > Flaky test > PlaintextEndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe > - > > Key: KAFKA-14535 > URL: https://issues.apache.org/jira/browse/KAFKA-14535 > Project: Kafka > Issue Type: Test >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Labels: flaky-test > > This test has failed multiple times recently: > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1446/tests/] > org.opentest4j.AssertionFailedError: expected acls: > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) > (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW) > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > but got: > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW) > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1439/tests/] > org.opentest4j.AssertionFailedError: expected acls: > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) > (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW) > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > but got: > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > > 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1436/tests/] > org.opentest4j.AssertionFailedError: expected acls: > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) > (principal=User:client, host=*, operation=DESCRIBE, permissionType=ALLOW) > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > but got: > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > The stacktrace is: > {noformat} > org.opentest4j.AssertionFailedError: expected acls: > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > (principal=User:client, host=*, operation=WRITE, permissionType=ALLOW) > (principal=User:client, host=*, operation=DESCRIBE, > permissionType=ALLOW) > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > but got: > (principal=User:client, host=*, operation=CREATE, permissionType=ALLOW) > (principal=User:client, host=*, operation=DESCRIBE, > permissionType=ALLOW) > (principal=User:client, host=*, operation=READ, permissionType=ALLOW) > at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at app//kafka.utils.TestUtils$.waitAndVerifyAcls(TestUtils.scala:1075) > at > app//kafka.api.EndToEndAuthorizationTest.$anonfun$setReadAndWriteAcls$1(EndToEndAuthorizationTest.scala:312) > at > app//kafka.api.EndToEndAuthorizationTest.$anonfun$setReadAndWriteAcls$1$adapted(EndToEndAuthorizationTest.scala:311) > at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) > at > app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) > at app//scala.collection.AbstractIterable.foreach(Iterable.scala:933) > at > 
app//kafka.api.EndToEndAuthorizationTest.setReadAndWriteAcls(EndToEndAuthorizationTest.scala:311) > at > app//kafka.api.EndToEndAuthorizationTest.testNoConsumeWithoutDescribeAclViaSubscribe(EndToEndAuthorizationTest.scala:478) > at > java.base@17.0.4.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.4.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.4.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >
[GitHub] [kafka] fvaleri commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module
fvaleri commented on code in PR #13080: URL: https://github.com/apache/kafka/pull/13080#discussion_r1064750856 ## tools/src/main/java/org/apache/kafka/tools/ClusterTool.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class ClusterTool { + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { Review Comment: I like the idea of the command interface. In that case, we should add the suffix `*Command` to all implementing classes (e.g. 
`ClusterToolCommand`).
[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 commented on PR #13095: URL: https://github.com/apache/kafka/pull/13095#issuecomment-1375755934 @ijuma , I made the changes, but I couldn't find any tests associated with the Scala class. I wanted to know how I can test this.
[GitHub] [kafka] vamossagar12 opened a new pull request, #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module
vamossagar12 opened a new pull request, #13095: URL: https://github.com/apache/kafka/pull/13095 Move EndToEndLatency to tools
[jira] [Commented] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656141#comment-17656141 ] Chris Egerton commented on KAFKA-14565: --- [~beardt] thanks for identifying this issue. Have you considered either altering the {{AbstractConfig::getConfiguredInstances}} method, or the logic in the client classes that leverage it, to invoke {{close}} on any interceptors that have already been instantiated and configured in the scenario described in this ticket? This would allow us to address the resource leak problem without altering the public interface (which requires a KIP) or requiring action on the part of developers. > Improve Interceptor Resource Leakage Prevention > --- > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Assignee: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently within the Kafka Consumer and Kafka Producer constructors, the > AbstractConfig.getConfiguredInstances() is delegated responsibility for both > creating and configuring each interceptor listed in the interceptor.classes > property and returns a configured List> > interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors where at least one interceptor's > configure method implementation creates and/or depends on objects that > create threads, connections or other resources that require clean up and > the subsequent interceptor's configure method raises a runtime exception. > This raising of the runtime exception produces a resource leakage in the > first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the > first interceptor's (and really any interceptor's) close method is never > called. > To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource clean up, I propose > defining a default open method, with no implementation and a checked exception, on > the respective Consumer/Producer interceptor interfaces. This open method > will be responsible for creating threads and/or objects that utilize > threads, connections or other resources that require clean up. > Additionally, the default open method enables implementation optionality, as > its empty default behavior means it will do nothing when unimplemented. > Additionally, the Kafka Consumer/Producer Interceptor containers will > implement a corresponding maybeOpen method which throws a checked exception. > In order to maintain backwards compatibility with earlier-developed > interceptors, the maybeOpen will check whether the interceptor's interface > contains the newer open method before calling it accordingly.
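The `open`/`maybeOpen` proposal quoted above can be sketched in a few lines (hypothetical `Demo*` types, not Kafka's API): the interface gains a no-op default `open()`, and the container's `maybeOpen()` reflectively checks whether an implementation actually overrides `open()` before calling it, so interceptors written before the hook existed keep working unchanged.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

interface DemoInterceptor extends AutoCloseable {
    default void open() throws Exception { } // proposed lifecycle hook; no-op by default
    @Override
    void close();
}

class DemoInterceptors implements AutoCloseable {
    private final List<DemoInterceptor> interceptors;

    DemoInterceptors(List<DemoInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    void maybeOpen() throws Exception {
        for (DemoInterceptor interceptor : interceptors) {
            Method open = interceptor.getClass().getMethod("open");
            // If open() is only inherited from the interface default, the
            // interceptor predates the new hook, so skip the call.
            if (!open.getDeclaringClass().isInterface())
                interceptor.open();
        }
    }

    @Override
    public void close() {
        // Always reachable once the container exists, so every interceptor
        // can release its resources.
        interceptors.forEach(DemoInterceptor::close);
    }
}
```

Note this sketch does not by itself fix the leak the ticket describes (an exception thrown during `configure` before the container is built); it only moves resource acquisition out of `configure` into a hook the container controls.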
[GitHub] [kafka] dajac commented on pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on PR #12870: URL: https://github.com/apache/kafka/pull/12870#issuecomment-1375740915 @jolshan @jeffkbkim I have updated the PR. Could you take another look?
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1064719096 ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest { } } - @Test - def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = { Review Comment: I have looked at other APIs and we are not consistent, unfortunately. I believe that my current implementation (returning a response for each provided group in the same order) is the right way, and that the previous behavior could be considered a bug. I have asked about this in the KIP-709 discuss thread as well.
[GitHub] [kafka] mimaison commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module
mimaison commented on code in PR #13080: URL: https://github.com/apache/kafka/pull/13080#discussion_r1064710964 ## tools/src/main/java/org/apache/kafka/tools/ClusterTool.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class ClusterTool { + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { Review Comment: The idea was to make it like `MetadataQuorumCommand`. 
In the future we will be able to put this method in a Command interface or utils class.
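The `mainNoExit`/`execute` error-handling pattern discussed in this review could eventually be factored out as the reviewer suggests. Below is a hypothetical sketch of such a shared interface; the name `Command` and its shape are assumptions taken from the comment, not an actual Kafka API:

```java
// Hypothetical extraction of the ClusterTool/MetadataQuorumCommand pattern into
// a reusable interface. "Command" is an illustrative name, not a Kafka type.
@FunctionalInterface
interface Command {
    void execute(String... args) throws Exception;

    // Shared wrapper: translate success/failure into a process exit code,
    // printing the error message instead of letting the exception escape.
    default int mainNoExit(String... args) {
        try {
            execute(args);
            return 0;
        } catch (Exception e) {
            System.err.println(e.getMessage());
            return 1;
        }
    }
}

public class CommandDemo {
    public static void main(String[] args) {
        Command ok = a -> { };
        Command failing = a -> { throw new RuntimeException("boom"); };
        System.out.println(ok.mainNoExit());      // 0
        System.out.println(failing.mainNoExit()); // 1
    }
}
```

Each tool's `main` would then reduce to `Exit.exit(new MyTool().mainNoExit(args))`, with only `execute` differing per tool.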
[jira] [Created] (KAFKA-14609) Kafka Streams Processor API cannot use state stores
Philipp Schirmer created KAFKA-14609: Summary: Kafka Streams Processor API cannot use state stores Key: KAFKA-14609 URL: https://issues.apache.org/jira/browse/KAFKA-14609 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.3.0 Reporter: Philipp Schirmer The recently introduced Kafka Streams Processor API (since 3.3, https://issues.apache.org/jira/browse/KAFKA-13654) likely has a bug with regards to using state stores. The [getStateStore|https://javadoc.io/static/org.apache.kafka/kafka-streams/3.3.1/org/apache/kafka/streams/processor/api/ProcessingContext.html#getStateStore-java.lang.String-] method returns null, even though the store has been registered according to the docs. The old transformer API still works. I created a small project that demonstrates the behavior. It uses both methods to register a store for the transformer, as well as the processor API: https://github.com/bakdata/kafka-streams-state-store-demo/blob/main/src/test/java/com/bakdata/kafka/StreamsStateStoreTest.java -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module
mimaison commented on code in PR #13080: URL: https://github.com/apache/kafka/pull/13080#discussion_r1064694674 ## tools/src/main/java/org/apache/kafka/tools/ClusterTool.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class ClusterTool { + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... 
args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... args) throws Exception { +ArgumentParser parser = ArgumentParsers +.newArgumentParser("kafka-cluster") +.defaultHelp(true) +.description("The Kafka cluster tool."); +Subparsers subparsers = parser.addSubparsers().dest("command"); + +Subparser clusterIdParser = subparsers.addParser("cluster-id") +.help("Get information about the ID of a cluster."); +Subparser unregisterParser = subparsers.addParser("unregister") +.help("Unregister a broker."); +for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) { +subpparser.addArgument("--bootstrap-server", "-b") +.action(store()) +.help("A list of host/port pairs to use for establishing the connection to the kafka cluster."); +subpparser.addArgument("--config", "-c") +.action(store()) +.help("A property file containing configs to passed to AdminClient."); Review Comment: Thanks, I've reworded that message
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1064619499 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel, offsetFetchResponse } requestHelper.sendResponseMaybeThrottle(request, createResponse) +CompletableFuture.completedFuture[Unit](()) } - private def handleOffsetFetchRequestBetweenV1AndV7(request: RequestChannel.Request): Unit = { -val header = request.header + private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = { val offsetFetchRequest = request.body[OffsetFetchRequest] -val groupId = offsetFetchRequest.groupId() -val (error, partitionData) = fetchOffsets(groupId, offsetFetchRequest.isAllPartitions, - offsetFetchRequest.requireStable, offsetFetchRequest.partitions, request.context) -def createResponse(requestThrottleMs: Int): AbstractResponse = { - val offsetFetchResponse = -if (error != Errors.NONE) { - offsetFetchRequest.getErrorResponse(requestThrottleMs, error) -} else { - new OffsetFetchResponse(requestThrottleMs, Errors.NONE, partitionData.asJava) -} - trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.") - offsetFetchResponse +val groups = offsetFetchRequest.groups() +val requireStable = offsetFetchRequest.requireStable() + +val futures = new mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size) +groups.forEach { groupOffsetFetch => + val isAllPartitions = groupOffsetFetch.topics == null + val future = if (isAllPartitions) { +fetchAllOffsets( + request.context, + groupOffsetFetch, + requireStable +) + } else { +fetchOffsets( + request.context, + groupOffsetFetch, + requireStable +) + } + futures += future } -requestHelper.sendResponseMaybeThrottle(request, createResponse) - } - - private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = { -val header = request.header -val offsetFetchRequest = request.body[OffsetFetchRequest] -val groupIds = offsetFetchRequest.groupIds().asScala -val groupToErrorMap = mutable.Map.empty[String, Errors] -val groupToPartitionData = mutable.Map.empty[String, util.Map[TopicPartition, PartitionData]] -val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions() -groupIds.foreach(g => { - val (error, partitionData) = fetchOffsets(g, -offsetFetchRequest.isAllPartitionsForGroup(g), -offsetFetchRequest.requireStable(), -groupToTopicPartitions.get(g), request.context) - groupToErrorMap += (g -> error) - groupToPartitionData += (g -> partitionData.asJava) -}) -def createResponse(requestThrottleMs: Int): AbstractResponse = { - val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs, -groupToErrorMap.asJava, groupToPartitionData.asJava) - trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.") - offsetFetchResponse +CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) => + val groupResponses = new ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size) + futures.foreach(future => groupResponses += future.get()) + requestHelper.sendMaybeThrottle(request, new OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion)) } - -requestHelper.sendResponseMaybeThrottle(request, createResponse) } - private def fetchOffsets(groupId: String, isAllPartitions: Boolean, requireStable: Boolean, - partitions: util.List[TopicPartition], context: RequestContext): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = { -if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) { - (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty) -} else { - if (isAllPartitions) { -val (error, allPartitionData) = groupCoordinator.handleFetchOffsets(groupId, requireStable) -if (error != Errors.NONE) { - (error, 
allPartitionData) -} else { - // clients are not allowed to see offsets for topics that are not authorized for Describe - val (authorizedPartitionData, _) = authHelper.partitionMapByAuthorized(context, -DESCRIBE, TOPIC, allPartitionData)(_.topic) - (Errors.NONE, authorizedPartitionData) -} + private def fetchAllOffsets( Review Comment: Renaming in KafkaApis is reasonable. For GroupCoordinator, the ForGroup seems a bit redundant so I would rather keep it as it is.
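The refactored `handleOffsetFetchRequestFromCoordinator` above collects one `CompletableFuture` per group, waits on all of them, and then assembles the response in the original request order. That pattern can be sketched in a self-contained way (plain strings instead of Kafka's `OffsetFetchResponseGroup`; names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfInOrder {
    // Waits for every per-group future, then collects results in the order the
    // groups were submitted -- the same shape as the KafkaApis refactor above,
    // minus the Kafka types.
    static List<String> fetchAll(List<String> groupIds) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String id : groupIds) {
            futures.add(CompletableFuture.supplyAsync(() -> "offsets-for-" + id));
        }
        // allOf completes once every future is done, so the join() calls in the
        // collection step below can no longer block.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<String> responses = new ArrayList<>();
                    futures.forEach(f -> responses.add(f.join()));
                    return responses;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(List.of("g1", "g2")));
    }
}
```

Because results are gathered from the `futures` list rather than by completion order, each group's response lands at the position of its request, which is the ordering guarantee discussed in the review.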
[GitHub] [kafka] dajac commented on a diff in pull request #12870: KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface
dajac commented on code in PR #12870: URL: https://github.com/apache/kafka/pull/12870#discussion_r1064618016 ## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java: ## @@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs, this.error = null; } +public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) { +super(ApiKeys.OFFSET_FETCH); +data = new OffsetFetchResponseData(); + +if (version >= 8) { +data.setGroups(groups); +error = null; + +for (OffsetFetchResponseGroup group : data.groups()) { +this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode())); +} +} else { +if (groups.size() != 1) { +throw new UnsupportedVersionException( +"Version " + version + " of OffsetFetchResponse only support one group." +); +} + +OffsetFetchResponseGroup group = groups.get(0); +data.setErrorCode(group.errorCode()); +error = Errors.forCode(group.errorCode()); + +group.topics().forEach(topic -> { +OffsetFetchResponseTopic newTopic = new OffsetFetchResponseTopic().setName(topic.name()); +data.topics().add(newTopic); + +topic.partitions().forEach(partition -> { +OffsetFetchResponsePartition newPartition; + +if (version < 2 && group.errorCode() != Errors.NONE.code()) { +// Versions prior to version 2 does not support a top level error. Therefore +// we put it at the partition level. +newPartition = new OffsetFetchResponsePartition() +.setPartitionIndex(partition.partitionIndex()) +.setErrorCode(group.errorCode()); +} else { Review Comment: It is still possible to have a partition level error with version >= 2 (e.g. UNSTABLE_OFFSET_COMMIT). To answer your second point, if there is an error, the offset/metadata should be correctly set at this stage so we can just copy whatever we have got here.
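The down-conversion rule in the diff above — protocol versions before 2 have no group-level error field, so a group error must be surfaced on every partition — can be modeled with simple stand-in types (these are not the real Kafka message classes):

```java
import java.util.List;
import java.util.stream.Collectors;

public class DownConvert {
    // Minimal stand-in for OffsetFetchResponsePartition: an index and an error code.
    record Partition(int index, short errorCode) { }

    // Pre-v2 responses cannot carry a top-level error, so a non-zero group error
    // is copied onto each partition, mirroring the version < 2 branch above.
    static List<Partition> applyGroupError(short groupErrorCode, List<Partition> partitions, short version) {
        if (version >= 2 || groupErrorCode == 0) {
            return partitions; // v2+ carries the error at the top level instead
        }
        return partitions.stream()
                .map(p -> new Partition(p.index(), groupErrorCode))
                .collect(Collectors.toList());
    }
}
```

As the review comment notes, individual partitions can still carry their own errors on v2+ (e.g. UNSTABLE_OFFSET_COMMIT); only the group-to-partition copying is version-gated.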
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13094: MINOR: Various cleanups in client tests
divijvaidya commented on code in PR #13094: URL: https://github.com/apache/kafka/pull/13094#discussion_r1064594474 ## clients/src/test/java/org/apache/kafka/clients/FetchSessionHandlerTest.java: ## @@ -320,7 +315,7 @@ public void testDoubleBuild() { try { builder.build(); fail("Expected calling build twice to fail."); -} catch (Throwable t) { +} catch (NullPointerException npe) { Review Comment: NPE is a weird way of enforcing idempotency for build()! While you are in this code base, could we add a javadoc to build() to clarify this expectation?
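The reviewer's point is that a second `build()` call fails only incidentally, via NPE. A hypothetical alternative (not the actual `FetchSessionHandler.Builder` code) would document the single-use contract and throw `IllegalStateException` explicitly:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-use builder illustrating the contract discussed above.
class OnceBuilder {
    private List<String> partitions = new ArrayList<>();

    OnceBuilder add(String partition) {
        if (partitions == null)
            throw new IllegalStateException("build() has already been called.");
        partitions.add(partition);
        return this;
    }

    /**
     * Builds the result. May only be called once; subsequent calls throw
     * IllegalStateException instead of an incidental NullPointerException.
     */
    List<String> build() {
        if (partitions == null)
            throw new IllegalStateException("build() has already been called.");
        List<String> result = partitions;
        partitions = null; // release the reference, matching the original's intent
        return result;
    }
}
```

The null-out trick is preserved (it lets the builder release its data), but the failure mode becomes part of the documented API rather than an implementation accident.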
[jira] [Updated] (KAFKA-14565) Improve Interceptor Resource Leakage Prevention
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard updated KAFKA-14565: Summary: Improve Interceptor Resource Leakage Prevention (was: Improving Interceptor Resource Leakage Prevention) > Improve Interceptor Resource Leakage Prevention > --- > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Assignee: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently, within the Kafka Consumer and Kafka Producer constructors, > AbstractConfig.getConfiguredInstances() is delegated responsibility for both > creating and configuring each interceptor listed in the interceptor.classes > property, and returns a configured list of interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors, where at least one interceptor's > configure method creates and/or depends on objects which create threads, > connections or other resources requiring cleanup, and a subsequent > interceptor's configure method raises a runtime exception. > Raising that runtime exception produces a resource leak in the first > interceptor, as the interceptor container, i.e. > ConsumerInterceptors/ProducerInterceptors, is never created, and therefore the > first interceptor's (and really any interceptor's) close method is never > called. > To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource cleanup, I propose > defining a default open method, with no implementation and a checked exception, on > the respective Consumer/Producer interceptor interfaces. This open method > will be responsible for creating threads and/or objects which use > threads, connections or other resources requiring cleanup. > Additionally, the default open method enables implementation optionality, as > its empty default behavior means it will do nothing when unimplemented. > The Kafka Consumer/Producer Interceptor containers will also > implement a corresponding maybeOpen method which throws a checked exception. > To maintain backwards compatibility with interceptors developed earlier, > maybeOpen will check whether the interceptor's interface > contains the newer open method before calling it accordingly.
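The lifecycle the ticket proposes can be sketched with stand-in types (the `Interceptor`/`Interceptors` names below are illustrative, not the real Kafka interfaces): a default no-op `open()` keeps existing interceptors compatible, while the container's `maybeOpen()` closes any already-opened interceptors if a later `open()` fails, preventing the leak described above.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for ConsumerInterceptor/ProducerInterceptor in the proposal.
interface Interceptor {
    default void open() throws Exception { } // default no-op: optional to implement
    void close();
}

// Stand-in for the ConsumerInterceptors/ProducerInterceptors container.
class Interceptors {
    private final List<Interceptor> interceptors;

    Interceptors(List<Interceptor> interceptors) { this.interceptors = interceptors; }

    // Opens each interceptor in order; if one open() throws, the interceptors
    // that were already opened are closed before the exception propagates.
    void maybeOpen() throws Exception {
        List<Interceptor> opened = new ArrayList<>();
        try {
            for (Interceptor interceptor : interceptors) {
                interceptor.open();
                opened.add(interceptor);
            }
        } catch (Exception e) {
            opened.forEach(Interceptor::close);
            throw e;
        }
    }
}
```

The reflection-based check for older interfaces mentioned in the ticket is omitted here; this sketch only shows the cleanup-on-failure behavior that motivates the change.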
[GitHub] [kafka] divijvaidya commented on a diff in pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)
divijvaidya commented on code in PR #13078: URL: https://github.com/apache/kafka/pull/13078#discussion_r1064581958 ## docs/ops.html: ## @@ -1604,6 +1604,11 @@
[GitHub] [kafka] mimaison opened a new pull request, #13094: MINOR: Various cleanups in client tests
mimaison opened a new pull request, #13094: URL: https://github.com/apache/kafka/pull/13094 - Simplify assertions - Remove redundant types - Use lambdas instead of anonymous classes - Remove unnecessary throws ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-13995) Does Kafka support Network File System (NFS)? Is it recommended in Production?
[ https://issues.apache.org/jira/browse/KAFKA-13995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656031#comment-17656031 ] Devarshi Shah commented on KAFKA-13995: --- Any updates guys? > Does Kafka support Network File System (NFS)? Is it recommended in Production? > -- > > Key: KAFKA-13995 > URL: https://issues.apache.org/jira/browse/KAFKA-13995 > Project: Kafka > Issue Type: Test >Affects Versions: 3.0.0 > Environment: Kubernetes Cluster >Reporter: Devarshi Shah >Priority: Blocker > > I've gone through the Apache Kafka Documentation. It does not contain > information about the support of underlying storage type, whether Kafka > supports block storage, Network File System (NFS) and/or others. On the > internet, I could find that it supports NFS, however most of them summarize > not to use NFS in Production. May we get proper information whether Kafka > recommends NFS in Production, or it doesn't support NFS to begin with?
[GitHub] [kafka] fvaleri commented on a diff in pull request #13080: KAFKA-14575: Move ClusterTool to tools module
fvaleri commented on code in PR #13080: URL: https://github.com/apache/kafka/pull/13080#discussion_r1064457544 ## tools/src/main/java/org/apache/kafka/tools/ClusterTool.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.Namespace; +import net.sourceforge.argparse4j.inf.Subparser; +import net.sourceforge.argparse4j.inf.Subparsers; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static net.sourceforge.argparse4j.impl.Arguments.store; + +public class ClusterTool { + +public static void main(String... args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... 
args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... args) throws Exception { +ArgumentParser parser = ArgumentParsers +.newArgumentParser("kafka-cluster") +.defaultHelp(true) +.description("The Kafka cluster tool."); +Subparsers subparsers = parser.addSubparsers().dest("command"); + +Subparser clusterIdParser = subparsers.addParser("cluster-id") +.help("Get information about the ID of a cluster."); +Subparser unregisterParser = subparsers.addParser("unregister") +.help("Unregister a broker."); +for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser)) { +subpparser.addArgument("--bootstrap-server", "-b") +.action(store()) +.help("A list of host/port pairs to use for establishing the connection to the kafka cluster."); +subpparser.addArgument("--config", "-c") +.action(store()) +.help("A property file containing configs to passed to AdminClient."); +} +unregisterParser.addArgument("--id", "-i") +.type(Integer.class) +.action(store()) +.required(true) +.help("The ID of the broker to unregister."); + +Namespace namespace = parser.parseArgsOrFail(args); +String command = namespace.getString("command"); +String configPath = namespace.getString("config"); +Properties properties = (configPath == null) ? 
new Properties() : Utils.loadProps(configPath); + +String bootstrapServer = namespace.getString("bootstrap_server"); +if (bootstrapServer != null) { +properties.setProperty("bootstrap.servers", bootstrapServer); +} +if (properties.getProperty("bootstrap.servers") == null) { +throw new TerseException("Please specify --bootstrap-server."); +} + +switch (command) { +case "cluster-id": { +try (Admin adminClient = Admin.create(properties)) { +clusterIdCommand(System.out, adminClient); +} +break; +} +case "unregister": { +try (Admin adminClient = Admin.create(properties)) { +unregisterCommand(System.out, adminClient, namespace.getInt("id")); +} +break; +} +default: +throw new RuntimeException("Unknown command " + command); +} +} + +static void clusterIdCommand(PrintStream stream, Admin adminClient) throws Exception { +String clusterId = adminClient.describeCluster().cl
[GitHub] [kafka] iamazy commented on pull request #13072: KAFKA-14570: Add missing closing parenthesis symbol
iamazy commented on PR #13072: URL: https://github.com/apache/kafka/pull/13072#issuecomment-1375394833 > @iamazy Thanks for fixing this. For small issues like this you don't need to open a Jira, you can just open a PR and put `MINOR` in the title, for example [6b5e9e9](https://github.com/apache/kafka/commit/6b5e9e989b7a1f8c387a79dea0117e52401853e1) @mimaison Got it, thank you!
[GitHub] [kafka] mimaison commented on pull request #13072: KAFKA-14570: Add missing closing parenthesis symbol
mimaison commented on PR #13072: URL: https://github.com/apache/kafka/pull/13072#issuecomment-1375384949 @iamazy Thanks for fixing this. For small issues like this you don't need to open a Jira, you can just open a PR and put `MINOR` in the title, for example https://github.com/apache/kafka/commit/6b5e9e989b7a1f8c387a79dea0117e52401853e1
[jira] [Resolved] (KAFKA-14570) Problem description missing closing parenthesis symbol
[ https://issues.apache.org/jira/browse/KAFKA-14570?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-14570. Fix Version/s: 3.5.0 Resolution: Fixed > Problem description missing closing parenthesis symbol > -- > > Key: KAFKA-14570 > URL: https://issues.apache.org/jira/browse/KAFKA-14570 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: iamazy >Priority: Trivial > Fix For: 3.5.0 > > > In > [verifyFullFetchResponsePartitions|https://github.com/apache/kafka/blob/ad94dc2134474c9d790fe0bb79c0d390c562846a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java#L425-L449], > the problem description is missing a closing parenthesis symbol.
[GitHub] [kafka] mimaison merged pull request #13072: KAFKA-14570: Add missing closing parenthesis symbol
mimaison merged PR #13072: URL: https://github.com/apache/kafka/pull/13072
[GitHub] [kafka] cadonna closed pull request #13093: DO NOT MERGE: PR to test GitHub API
cadonna closed pull request #13093: DO NOT MERGE: PR to test GitHub API URL: https://github.com/apache/kafka/pull/13093
[GitHub] [kafka] cadonna opened a new pull request, #13093: DO NOT MERGE: PR to test GitHub API
cadonna opened a new pull request, #13093: URL: https://github.com/apache/kafka/pull/13093 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-14602) offsetDelta in BatchMetadata is an int but the values are computed as difference of offsets which are longs.
[ https://issues.apache.org/jira/browse/KAFKA-14602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-14602: - Assignee: Luke Chen > offsetDelta in BatchMetadata is an int but the values are computed as > difference of offsets which are longs. > > > Key: KAFKA-14602 > URL: https://issues.apache.org/jira/browse/KAFKA-14602 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Satish Duggana >Assignee: Luke Chen >Priority: Major > > This is a follow-up of the discussion in > https://github.com/apache/kafka/pull/13043#discussion_r1063071578 > offsetDelta in BatchMetadata is an int. Because of this, ProducerAppendInfo > may set a value that can overflow. Ideally, this data type should be long > instead of int.
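The overflow described in KAFKA-14602 — a difference of two long offsets silently wrapping when narrowed to int — is easy to demonstrate. This is an illustrative model, not the actual BatchMetadata code; the `checkedDelta` variant with `Math.toIntExact` is one way to make the overflow loud, though the ticket suggests widening the field to long instead:

```java
public class OffsetDeltaOverflow {
    // BatchMetadata-style delta: the field is an int, but offsets are longs,
    // so the subtraction can exceed Integer.MAX_VALUE and wrap silently.
    static int unsafeDelta(long lastOffset, long firstOffset) {
        return (int) (lastOffset - firstOffset);
    }

    // Guarded variant: Math.toIntExact throws ArithmeticException on overflow
    // instead of storing a corrupted delta.
    static int checkedDelta(long lastOffset, long firstOffset) {
        return Math.toIntExact(lastOffset - firstOffset);
    }

    public static void main(String[] args) {
        long first = 0L;
        long last = 1L + Integer.MAX_VALUE; // delta of 2_147_483_648 no longer fits in an int
        System.out.println(unsafeDelta(last, first)); // -2147483648: silent wrap-around
    }
}
```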