[GitHub] [kafka] abhijeetk88 opened a new pull request, #14127: KAFKA-15181: Wait for RemoteLogMetadataCahce to initialize after assi…
abhijeetk88 opened a new pull request, #14127: URL: https://github.com/apache/kafka/pull/14127 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #14114: KAFKA-12969: Add broker level config synonyms for topic level tiered storage configs
satishd commented on code in PR #14114: URL: https://github.com/apache/kafka/pull/14114#discussion_r1278245511 ## core/src/test/scala/unit/kafka/log/LogConfigTest.scala: ## @@ -62,13 +63,17 @@ class LogConfigTest { kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2") kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2") kafkaProps.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0") +kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "30") Review Comment: Since these values are of type long, can we use a value greater than Integer.MAX_VALUE? ## core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala: ## @@ -713,6 +714,99 @@ class DynamicBrokerConfigTest { config.updateCurrentConfig(new KafkaConfig(props)) assertFalse(config.nonInternalValues.containsKey(KafkaConfig.MetadataLogSegmentMinBytesProp)) } + + @Test + def testDynamicLogLocalRetentionMsConfig(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) +props.put(KafkaConfig.LogRetentionTimeMillisProp, "1000") Review Comment: Since these values are of type long, can we use a value greater than Integer.MAX_VALUE? The same comment applies to the other retention properties in this class. ## core/src/test/scala/unit/kafka/log/LogConfigTest.scala: ## @@ -62,13 +63,17 @@ class LogConfigTest { kafkaProps.put(KafkaConfig.LogRollTimeJitterHoursProp, "2") kafkaProps.put(KafkaConfig.LogRetentionTimeHoursProp, "2") kafkaProps.put(KafkaConfig.LogMessageFormatVersionProp, "0.11.0") +kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "30") +kafkaProps.put(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1024") Review Comment: Since these values are of type long, can we use a value greater than Integer.MAX_VALUE?
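The reviewer's point can be illustrated in plain Java: a value just above `Integer.MAX_VALUE` parses fine as a `long` but fails (or silently wraps) on any code path that narrows it to `int`, whereas small test values such as "30" or "1024" would never expose that. This is a minimal sketch with a hypothetical key name and helpers; it does not use Kafka's `ConfigDef` parsing.

```java
import java.util.Properties;

class LongConfigValueCheck {
    // One more than the largest int: representable as a long, not as an int.
    static final long BEYOND_INT_MAX = Integer.MAX_VALUE + 1L;

    // Hypothetical key, used only for this illustration.
    static final String KEY = "example.retention.ms";

    static Properties sampleProps() {
        Properties props = new Properties();
        props.setProperty(KEY, Long.toString(BEYOND_INT_MAX));
        return props;
    }

    // Reads the entry as a long, as a long-typed config should be read.
    static long readAsLong(Properties props) {
        return Long.parseLong(props.getProperty(KEY));
    }

    // Models a code path that (wrongly) treats the value as an int.
    // With a small test value this would succeed and hide the bug.
    static boolean parsesAsInt(Properties props) {
        try {
            Integer.parseInt(props.getProperty(KEY));
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}
```

A narrowing cast is even quieter than a failed parse: `(int) 2147483648L` yields `Integer.MIN_VALUE` without any error, which is why a test value above the int range is worth having.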
[GitHub] [kafka] github-actions[bot] commented on pull request #12781: KAFKA-14132; Replace EasyMock with Mockito in KafkaBasedLogTest
github-actions[bot] commented on PR #12781: URL: https://github.com/apache/kafka/pull/12781#issuecomment-1656539350 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Comment Edited] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.
[ https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748778#comment-17748778 ] Chris Egerton edited comment on KAFKA-15252 at 7/29/23 3:12 AM: I think this is caused by KAFKA-15090. [~nikita.krasnov] do you think we can close this as a duplicate, or is it a different issue? was (Author: chrisegerton): I think this is caused by KAFKA-15090. [~nikita.krasnov] do you think we can close this as a duplicate, or is this a different issue? > Task is not stopped until the poll interval passes in case of task restarting. > -- > > Key: KAFKA-15252 > URL: https://issues.apache.org/jira/browse/KAFKA-15252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nikita >Priority: Major > > We are facing a problem when restarting tasks: sometimes it leads to a > resource leak. > We used the JDBC source connector and noticed an increasing count of > open sessions on the Vertica side. The problem applies to all > databases and possibly to all source connectors. > Our case is as follows: > 1) Run the JDBC source connector (io.confluent.connect.jdbc.JdbcSourceConnector) > with poll.interval.ms (8640) > task.shutdown.graceful.timeout.ms (a > Kafka Connect worker property; we set 1) > 2) Send POST /connectors//tasks//restart > ER (expected result): the session count is the same as before the restart > AR (actual result): the session count increases > The main problem is that when the > org.apache.kafka.connect.runtime.Worker#stopAndAwaitTasks(java.util.Collection) > method is called, it doesn't stop the source task itself. > The source task stops only when polling stops on the source task side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15252) Task is not stopped until the poll interval passes in case of task restarting.
[ https://issues.apache.org/jira/browse/KAFKA-15252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748778#comment-17748778 ] Chris Egerton commented on KAFKA-15252: --- I think this is caused by KAFKA-15090. [~nikita.krasnov] do you think we can close this as a duplicate, or is this a different issue?
[GitHub] [kafka] C0urante commented on pull request #14101: Source task stop call was added to force stopping execution.
C0urante commented on PR #14101: URL: https://github.com/apache/kafka/pull/14101#issuecomment-165657 There's also [KAFKA-15090](https://issues.apache.org/jira/browse/KAFKA-15090), which gives some context on why we stopped invoking `SourceTask::stop` on the herder thread (which is the thread that would be invoking that method in this PR). I've been exploring some potential fixes and will publish a PR next week.
[GitHub] [kafka] ahuang98 commented on pull request #14109: [MINOR] Addressing NPE when broker's initialCatchUpFuture fails
ahuang98 commented on PR #14109: URL: https://github.com/apache/kafka/pull/14109#issuecomment-1656471910 https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14109/2/pipeline/12 looks pretty good, but I'll trigger another build.
[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580
junrao commented on code in PR #14111: URL: https://github.com/apache/kafka/pull/14111#discussion_r1278072179 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1488,9 +1500,10 @@ public void run() { } else if (!heartbeat.shouldHeartbeat(now)) { // poll again after waiting for the retry backoff in case the heartbeat failed or the // coordinator disconnected - AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs); + AbstractCoordinator.this.wait(retryBackoff.backoff(attempts++)); } else { heartbeat.sentHeartbeat(now); +attempts = 0L; Review Comment: I think the common case where exponential backoff could help is a heartbeat failure where the coordinator has changed, but it takes some time to discover the new coordinator. In that case, the current code does the following in a loop:
```
sendHeartbeat
get NotCoordinator error
findCoordinator
wait for retryBackoff
```
With the new change, since `attempts` is reset on every heartbeat request, we will run the same loop as above with no exponential backoff in between. ## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ## @@ -203,18 +224,43 @@ public class CommonClientConfigs { * @return The new values which have been set as described in postProcessParsedConfig.
*/ public static Map postProcessReconnectBackoffConfigs(AbstractConfig config, -Map parsedValues) { + Map parsedValues) { HashMap rval = new HashMap<>(); Map originalConfig = config.originals(); if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) && originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) { -log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.", +log.warn("Disabling exponential reconnect backoff because {} is set, but {} is not.", RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG); rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG)); } return rval; } +/** + * Log warning if the exponential backoff is disabled due to initial backoff value is greater than max backoff value. + * + * @param configThe config object. + */ +public static void warnDisablingExponentialBackoff(AbstractConfig config) { +long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG); +long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG); +if (retryBackoffMs > retryBackoffMaxMs) { +log.warn("Configuration '{}' with value '{}' is greater than configuration '{}' with value '{}'. " + +"A static backoff with value '{}' will be applied.", +RETRY_BACKOFF_MS_CONFIG, retryBackoffMs, +RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, retryBackoffMs); Review Comment: Should the last param be `retryBackoffMaxMs`? Ditto for `connectionSetupTimeoutMs` below. 
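The reviewer's question turns on which value is actually in effect when `retry.backoff.ms` exceeds `retry.backoff.max.ms`. Assuming the client caps the initial backoff at the maximum (which is what the question implies), the static backoff applied is `min(retryBackoffMs, retryBackoffMaxMs)`, i.e. the max value, so that is what the warning should report. A minimal sketch with hypothetical class and method names; it only builds the message text and does not use Kafka's logger.

```java
class BackoffConfigWarning {
    // Static backoff applied when the initial value exceeds the max
    // (assumption: the initial backoff is capped at the max).
    static long effectiveStaticBackoff(long retryBackoffMs, long retryBackoffMaxMs) {
        return Math.min(retryBackoffMs, retryBackoffMaxMs);
    }

    // Returns the warning text, or null when exponential backoff stays enabled.
    static String warningFor(long retryBackoffMs, long retryBackoffMaxMs) {
        if (retryBackoffMs <= retryBackoffMaxMs) {
            return null;
        }
        return String.format(
            "Configuration '%s' with value '%d' is greater than configuration '%s' with value '%d'. "
                + "A static backoff with value '%d' will be applied.",
            "retry.backoff.ms", retryBackoffMs,
            "retry.backoff.max.ms", retryBackoffMaxMs,
            effectiveStaticBackoff(retryBackoffMs, retryBackoffMaxMs));
    }
}
```

For example, `warningFor(2000, 500)` ends with "A static backoff with value '500' will be applied." The same reasoning would apply to the connection-setup timeout warning the reviewer mentions.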
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ## @@ -278,11 +291,13 @@ private void validatePositionsAsync(Map partition offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions); future.addListener(new RequestFutureListener() { +private long attempts = 0L; @Override public void onSuccess(OffsetForEpochResult offsetsResult) { List truncations = new ArrayList<>(); if (!offsetsResult.partitionsToRetry().isEmpty()) { - subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs); + subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), Review Comment: Same question as the above. Does this really do exponential backoff since attempts is 0 for every new request? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ## @@ -225,14 +231,21 @@ private void resetPositionsAsync(Map partitionResetTimesta RequestFuture future = sendListOffsetRequest(node, resetTimestamps, false); future.addListener(new RequestFutureListener() { +long attempts = 0L; @Override public void onSuccess(ListOffsetResult result) { -
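Both review questions (the heartbeat loop and the per-request listeners in `OffsetFetcher`) hinge on where the `attempts` counter lives. The sketch below uses a hypothetical backoff formula (initial × 2^attempts, capped, no jitter; not Kafka's `ExponentialBackoff` class) to contrast a counter that starts from zero for every request with one that is shared across requests and reset only on success.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffCounterPlacement {
    static final long INITIAL_MS = 100;
    static final long MAX_MS = 1000;

    // Hypothetical exponential backoff without jitter: 100, 200, 400, 800, 1000, ...
    static long backoff(long attempts) {
        double delay = INITIAL_MS * Math.pow(2, attempts);
        return (long) Math.min(delay, MAX_MS);
    }

    // Counter created fresh for every request (like a field of a new
    // per-request listener): every failure waits the initial backoff.
    static List<Long> resetPerRequest(int consecutiveFailures) {
        List<Long> waits = new ArrayList<>();
        for (int i = 0; i < consecutiveFailures; i++) {
            long attempts = 0; // new request, new counter
            waits.add(backoff(attempts));
        }
        return waits;
    }

    // Counter shared across requests and reset only when a request succeeds:
    // consecutive failures back off exponentially.
    static List<Long> resetOnSuccess(int consecutiveFailures) {
        List<Long> waits = new ArrayList<>();
        long attempts = 0;
        for (int i = 0; i < consecutiveFailures; i++) {
            waits.add(backoff(attempts++));
        }
        // A successful response would set attempts back to 0 here.
        return waits;
    }
}
```

With four consecutive failures, `resetPerRequest` waits 100 ms each time, while `resetOnSuccess(5)` waits 100, 200, 400, 800 and then 1000 ms (capped).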
[jira] [Assigned] (KAFKA-15271) Historicalterator can exposes elements that are too new
[ https://issues.apache.org/jira/browse/KAFKA-15271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-15271: Assignee: Colin McCabe > Historicalterator can exposes elements that are too new > --- > > Key: KAFKA-15271 > URL: https://issues.apache.org/jira/browse/KAFKA-15271 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: Colin McCabe >Priority: Major > > Example: > {code:java}
> @Test
> public void bug() {
>     SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
>
>     // Topic -> Partition -> Offset
>     TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
>         new TimelineHashMap<>(snapshotRegistry, 0);
>
>     snapshotRegistry.getOrCreateSnapshot(0);
>
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(0, 100L);
>
>     snapshotRegistry.getOrCreateSnapshot(1);
>
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(1, 110L);
>
>     snapshotRegistry.getOrCreateSnapshot(2);
>
>     offsets
>         .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
>         .put(1, 111L);
>
>     assertNull(offsets.get("foo", 1).get(1, 1));
>
>     offsets.entrySet(1).forEach(topicEntry -> {
>         System.out.println(topicEntry.getKey());
>         topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
>             System.out.println(partitionEntry.getKey() + " : " + partitionEntry.getValue());
>         });
>     });
>
>     /*
>     The above code prints:
>     foo
>     0 : 100
>     1 : 110
>
>     but should rather print:
>     foo
>     0 : 100
>     */
> }
> {code}
> It yields the expected result when the third put is removed. `get(key, epoch)` is always correct as well. It seems that `entrySet` has an issue.
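The expected output follows from snapshot semantics: a read at epoch N should see only the writes made before snapshot N was created, so at epoch 1 partition 1 does not exist yet. The sketch below is a hypothetical, single-level versioned map written only to illustrate that rule; it is not Kafka's `SnapshotRegistry`/`TimelineHashMap` and does not model nesting or snapshot deletion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class VersionedMapSketch<K, V> {
    // One recorded write: the sequence number at which it happened and its value.
    private static final class Version<T> {
        final long seq;
        final T value;
        Version(long seq, T value) { this.seq = seq; this.value = value; }
    }

    private final Map<K, List<Version<V>>> history = new HashMap<>();
    // Snapshot epoch -> number of writes that had happened when it was taken.
    private final Map<Long, Long> snapshotSeq = new HashMap<>();
    private long nextSeq = 0;

    void createSnapshot(long epoch) {
        snapshotSeq.put(epoch, nextSeq);
    }

    void put(K key, V value) {
        history.computeIfAbsent(key, k -> new ArrayList<>()).add(new Version<>(nextSeq++, value));
    }

    // Entries visible at a snapshot: only writes made before it was taken.
    // Assumes the snapshot exists (otherwise unboxing the bound throws).
    Map<K, V> entriesAt(long epoch) {
        long bound = snapshotSeq.get(epoch);
        Map<K, V> result = new TreeMap<>();
        for (Map.Entry<K, List<Version<V>>> e : history.entrySet()) {
            V visible = null;
            for (Version<V> version : e.getValue()) {
                if (version.seq < bound) {
                    visible = version.value; // versions are appended in seq order
                }
            }
            if (visible != null) {
                result.put(e.getKey(), visible);
            }
        }
        return result;
    }
}
```

Replaying the report's sequence (snapshot 0, put(0, 100), snapshot 1, put(1, 110), snapshot 2, put(1, 111)), `entriesAt(1)` contains only `0=100`, while `{0=100, 1=110}` is the state as of snapshot 2, which is the "too new" view the report describes.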
[GitHub] [kafka] philipnee commented on pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee commented on PR #14123: URL: https://github.com/apache/kafka/pull/14123#issuecomment-1656346526 Umm, build failures for JDK 8, plus some unrelated test failures:
```
Build / JDK 20 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest 1m 31s
Build / JDK 11 and Scala 2.13 / testGracefulClose() – org.apache.kafka.clients.consumer.KafkaConsumerTest 2s
Build / JDK 11 and Scala 2.13 / testCacheEntryExpiry() – kafka.log.remote.RemoteIndexCacheTest <1s
Build / JDK 17 and Scala 2.13 / testGracefulClose() – org.apache.kafka.clients.consumer.KafkaConsumerTest 2s
Build / JDK 17 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest 12s
```
[GitHub] [kafka] philipnee commented on pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee commented on PR #14123: URL: https://github.com/apache/kafka/pull/14123#issuecomment-1656345210 We don't. Do you know when it was introduced? Is it in your in-flight PR?
[GitHub] [kafka] jeffkbkim commented on pull request #14124: Kafka 14509
jeffkbkim commented on PR #14124: URL: https://github.com/apache/kafka/pull/14124#issuecomment-1656328786 Thanks, I will take a look next week.
[GitHub] [kafka] dajac commented on pull request #14124: Kafka 14509
dajac commented on PR #14124: URL: https://github.com/apache/kafka/pull/14124#issuecomment-1656325056 @riedelmax Thanks for the PR. I am on vacation for the next three weeks, so I won’t be able to look at this one. @jeffkbkim Could you help @riedelmax? As a first step, @riedelmax could you please update the title and the description of the PR?
[GitHub] [kafka] mumrah opened a new pull request, #14126: MINOR Fix a Scala 2.12 checkstyle issue
mumrah opened a new pull request, #14126: URL: https://github.com/apache/kafka/pull/14126 This was introduced in #14115. I added this as part of a small tweak after the last Jenkins job. I ran `./gradlew check -x test` before committing, but apparently the issue was only present in Scala 2.12.
[jira] [Updated] (KAFKA-15271) Historicalterator can exposes elements that are too new
[ https://issues.apache.org/jira/browse/KAFKA-15271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15271: - Summary: Historicalterator can exposes elements that are too new (was: TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap)
[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748724#comment-17748724 ] Max Riedel edited comment on KAFKA-14509 at 7/28/23 7:51 PM: - [~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2) https://github.com/apache/kafka/pull/14124 Please pay special attention to ConsumerGroupDescribeResponse.json as this is now different from the definition in KIP-848 was (Author: JIRAUSER300902): [~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2) [https://github.com/apache/kafka/pull/14124 Please pay special attention to ConsumerGroupDescribeResponse.json as this is now different from the definition in KIP-848|https://github.com/apache/kafka/pull/14124] > Add ConsumerGroupDescribe API > - > > Key: KAFKA-14509 > URL: https://issues.apache.org/jira/browse/KAFKA-14509 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Max Riedel >Priority: Major > > The goal of this task is to implement the ConsumerGroupDescribe API as > described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI]; > and to implement the related changes in the admin client as described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups]. 
> On the server side, this mainly requires the following steps: > # The request/response schemas must be defined (see > ListGroupsRequest/Response.json for an example); > # Request/response classes must be defined (see > ListGroupsRequest/Response.java for an example); > # The API must be defined in KafkaApis (see > KafkaApis#handleDescribeGroupsRequest for an example); > # The GroupCoordinator interface (java file) must be extended for the new > operations. > # The new operation must be implemented in GroupCoordinatorService (new > coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in > Scala) should just reject the request. > We could probably do 1) and 2) in one pull request and the remaining ones in > another. > On the admin client side, this mainly requires the following steps: > * Define all the new Java classes as defined in the KIP. > * Add the new API to the KafkaAdminClient class.
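Steps 4 and 5 follow a common pattern: extend the shared interface, implement the new operation in the new coordinator, and have the old coordinator's adapter reject it. A minimal sketch of that pattern; the interface, class and method names here are hypothetical and heavily simplified (Kafka's real `GroupCoordinator` works with request contexts and generated request/response classes), and `UnsupportedOperationException` is only a stand-in for whatever error the broker actually returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for the extended coordinator interface.
interface DescribeCapableCoordinator {
    CompletableFuture<List<String>> consumerGroupDescribe(List<String> groupIds);
}

// Stand-in for the new coordinator: implements the operation.
class NewCoordinatorSketch implements DescribeCapableCoordinator {
    @Override
    public CompletableFuture<List<String>> consumerGroupDescribe(List<String> groupIds) {
        List<String> described = new ArrayList<>();
        for (String groupId : groupIds) {
            described.add("described:" + groupId); // placeholder for real group state
        }
        return CompletableFuture.completedFuture(described);
    }
}

// Stand-in for the old coordinator adapter: rejects the new operation.
class OldCoordinatorAdapterSketch implements DescribeCapableCoordinator {
    @Override
    public CompletableFuture<List<String>> consumerGroupDescribe(List<String> groupIds) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        future.completeExceptionally(
            new UnsupportedOperationException("ConsumerGroupDescribe is not supported by this coordinator"));
        return future;
    }
}
```

Returning a failed future rather than throwing keeps the adapter's contract asynchronous, so callers handle the rejection on the same path as any other failure.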
[jira] [Comment Edited] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748724#comment-17748724 ] Max Riedel edited comment on KAFKA-14509 at 7/28/23 7:50 PM: - [~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2) [https://github.com/apache/kafka/pull/14124 Please pay special attention to ConsumerGroupDescribeResponse.json as this is now different from the definition in KIP-848|https://github.com/apache/kafka/pull/14124] was (Author: JIRAUSER300902): [~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2) https://github.com/apache/kafka/pull/14124
[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748724#comment-17748724 ] Max Riedel commented on KAFKA-14509: [~dajac] Thanks for the helpful comments. Please see my PR for 1) and 2) https://github.com/apache/kafka/pull/14124
[GitHub] [kafka] riedelmax opened a new pull request, #14124: Kafka 14509
riedelmax opened a new pull request, #14124: URL: https://github.com/apache/kafka/pull/14124
[GitHub] [kafka] gharris1727 commented on pull request #14101: Source task stop call was added to force stopping execution.
gharris1727 commented on PR #14101: URL: https://github.com/apache/kafka/pull/14101#issuecomment-1656189960 Hey, thanks @akitoshka for giving this issue some attention. Unfortunately, the fix as-is can't be merged, for the reasons that @yashmayya and @vamossagar12 raised above. While the particular plugin behavior (resource leaks) is out of scope for the framework to fix, I do think it would be appropriate for the framework to indicate to plugins that they have been cancelled, so that they can clean up. There is a relevant ticket here: https://issues.apache.org/jira/browse/KAFKA-14725 which covers some of the suggested fixes, and I'm happy to review a PR for that. Thanks again for your contribution!
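To illustrate the KAFKA-15252 scenario and the kind of cancellation signal suggested here, the sketch below models a task whose `poll()` blocks for the whole poll interval and releases its (simulated) session only once `poll()` returns. Waiting out the graceful-shutdown timeout alone leaves the session open; interrupting the task thread afterwards lets it clean up. Thread interruption is an assumption chosen for illustration, not necessarily what KAFKA-14725 proposes or what Kafka Connect does, and all names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TaskCancellationSketch {
    // Simulated database sessions held by running tasks.
    static final AtomicInteger openSessions = new AtomicInteger();

    // Waits on a latch, treating an interrupt of the caller as "not done".
    private static boolean awaitQuietly(CountDownLatch latch, long timeoutMs) {
        try {
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Starts a task, then stops it; returns true if the task released its session.
    static boolean restart(long pollIntervalMs, long gracefulTimeoutMs, boolean interruptOnTimeout) {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        Thread task = new Thread(() -> {
            openSessions.incrementAndGet(); // the task opens a session
            started.countDown();
            try {
                Thread.sleep(pollIntervalMs); // poll() blocks for the whole interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // cancelled while polling
            } finally {
                openSessions.decrementAndGet(); // cleanup happens only when poll() returns
                finished.countDown();
            }
        });
        task.setDaemon(true); // a never-interrupted task must not keep the JVM alive
        task.start();
        awaitQuietly(started, 5_000);

        // Graceful phase: give the task a chance to finish on its own.
        if (awaitQuietly(finished, gracefulTimeoutMs)) {
            return true;
        }
        if (!interruptOnTimeout) {
            return false; // the task is abandoned while still holding its session
        }
        task.interrupt(); // hypothetical cancellation signal
        return awaitQuietly(finished, 5_000);
    }
}
```

With a 60-second poll interval and a 100 ms graceful timeout, `restart(60_000, 100, false)` returns `false` and leaves one session open, while `restart(60_000, 100, true)` returns `true`.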
[jira] [Resolved] (KAFKA-15263) KRaftMigrationDriver can run the migration twice
[ https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-15263. -- Resolution: Fixed > KRaftMigrationDriver can run the migration twice > > > Key: KAFKA-15263 > URL: https://issues.apache.org/jira/browse/KAFKA-15263 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0, 3.5.1 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > Fix For: 3.6.0, 3.5.2 > > > There is a narrow race condition in KRaftMigrationDriver where a PollEvent > can run that sees the internal state as ZK_MIGRATION and is immediately > followed by another poll event (due to an external call to {{wakeup()}}) > that results in two MigrateMetadataEvents being enqueued. > Since MigrateMetadataEvent lacks a check on the internal state, this causes > the metadata migration to occur twice.
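The race described above can be modelled with a single-threaded event queue. In this hypothetical sketch (the state and event names are simplified stand-ins, not `KRaftMigrationDriver`'s actual code), two poll events both observe `ZK_MIGRATION` before the migration has run, so two migrate events are queued; re-checking the state inside the migrate event turns the second one into a no-op.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class MigrationRaceSketch {
    enum State { ZK_MIGRATION, MIGRATED } // hypothetical, simplified states

    private final Queue<Runnable> events = new ArrayDeque<>();
    private final boolean guardMigrateEvent;
    private State state = State.ZK_MIGRATION;
    int migrationsRun = 0;

    MigrationRaceSketch(boolean guardMigrateEvent) {
        this.guardMigrateEvent = guardMigrateEvent;
    }

    // A poll event enqueues a migrate event whenever it sees ZK_MIGRATION.
    void enqueuePoll() {
        events.add(() -> {
            if (state == State.ZK_MIGRATION) {
                events.add(this::migrate);
            }
        });
    }

    private void migrate() {
        if (guardMigrateEvent && state != State.ZK_MIGRATION) {
            return; // an earlier migrate event already did the work
        }
        migrationsRun++;
        state = State.MIGRATED;
    }

    // Drains the queue on the calling thread, like a single event-handler thread.
    void runAll() {
        Runnable event;
        while ((event = events.poll()) != null) {
            event.run();
        }
    }
}
```

Enqueuing two polls and draining the queue runs the migration twice without the guard and once with it.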
[GitHub] [kafka] gharris1727 merged pull request #14091: MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest
gharris1727 merged PR #14091: URL: https://github.com/apache/kafka/pull/14091
[GitHub] [kafka] gharris1727 commented on pull request #14091: MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest
gharris1727 commented on PR #14091: URL: https://github.com/apache/kafka/pull/14091#issuecomment-1656157411 Flaky test failures appear unrelated, and the JDK8 build succeeded with no failures!
[jira] [Updated] (KAFKA-15263) KRaftMigrationDriver can run the migration twice
[ https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Arthur updated KAFKA-15263:
Affects Version/s: 3.5.1, 3.5.0
> KRaftMigrationDriver can run the migration twice
>
> Key: KAFKA-15263
> URL: https://issues.apache.org/jira/browse/KAFKA-15263
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.5.0, 3.5.1
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
> There is a narrow race condition in KRaftMigrationDriver where a PollEvent that sees the internal state as ZK_MIGRATION can be immediately followed by another poll event (due to an external call to {{wakeup()}}), resulting in two MigrateMetadataEvents being enqueued.
> Since MigrateMetadataEvent lacks a check on the internal state, this causes the metadata migration to occur twice.
[GitHub] [kafka] jolshan merged pull request #14117: MINOR: Code cleanups in group-coordinator module
jolshan merged PR #14117: URL: https://github.com/apache/kafka/pull/14117
[jira] [Updated] (KAFKA-15263) KRaftMigrationDriver can run the migration twice
[ https://issues.apache.org/jira/browse/KAFKA-15263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Arthur updated KAFKA-15263:
Fix Version/s: 3.6.0, 3.5.2
> KRaftMigrationDriver can run the migration twice
>
> Key: KAFKA-15263
> URL: https://issues.apache.org/jira/browse/KAFKA-15263
> Project: Kafka
> Issue Type: Bug
> Reporter: David Arthur
> Assignee: David Arthur
> Priority: Blocker
> Fix For: 3.6.0, 3.5.2
>
> There is a narrow race condition in KRaftMigrationDriver where a PollEvent that sees the internal state as ZK_MIGRATION can be immediately followed by another poll event (due to an external call to {{wakeup()}}), resulting in two MigrateMetadataEvents being enqueued.
> Since MigrateMetadataEvent lacks a check on the internal state, this causes the metadata migration to occur twice.
[jira] [Resolved] (KAFKA-14702) Extend server side assignor to support rack aware replica placement
[ https://issues.apache.org/jira/browse/KAFKA-14702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot resolved KAFKA-14702.
Fix Version/s: 3.6.0
Resolution: Fixed
> Extend server side assignor to support rack aware replica placement
>
> Key: KAFKA-14702
> URL: https://issues.apache.org/jira/browse/KAFKA-14702
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Assignee: Ritika Muduganti
> Priority: Major
> Fix For: 3.6.0
[GitHub] [kafka] dajac commented on pull request #14099: KAFKA-14702: Extend server side assignor to support rack aware replica placement
dajac commented on PR #14099: URL: https://github.com/apache/kafka/pull/14099#issuecomment-1656057225 The build for JDK 17 is stuck... The three others look good. I have seen a few builds get stuck this week in other PRs, so this is clearly not related to the changes made in this one. Moreover, we had successful builds previously and have only addressed nits in the meantime. Therefore, I will merge this PR.
[GitHub] [kafka] philipnee commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee commented on code in PR #14123: URL: https://github.com/apache/kafka/pull/14123#discussion_r1277840446
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumerTest.java: ##
@@ -153,12 +153,14 @@ public void testCommitAsync_UserSuppliedCallback() {
@SuppressWarnings("unchecked")
public void testCommitted() {
Set mockTopicPartitions = mockTopicPartitionOffset().keySet();
-mockConstruction(OffsetFetchApplicationEvent.class, (mock, ctx) -> {
-when(mock.complete(any())).thenReturn(new HashMap<>());
-});
-consumer = newConsumer(time, new StringDeserializer(), new StringDeserializer());
-assertDoesNotThrow(() -> consumer.committed(mockTopicPartitions, Duration.ofMillis(1)));
-verify(eventHandler).add(ArgumentMatchers.isA(OffsetFetchApplicationEvent.class));
+try (MockedConstruction mockConstruction = mockConstruction(OffsetFetchApplicationEvent.class,

Review Comment:
Using try-with-resources ensures that the thread-local mockConstruction is closed, so that other tests function properly.
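Mockito documents that a construction mock created with `mockConstruction` stays active on the current thread until `close()` is called. That is why the test wraps it in try-with-resources. The sketch below reproduces that contract with plain `ThreadLocal` and `AutoCloseable`. It uses a hypothetical `ThreadLocalOverride` class so it runs without Mockito:

```java
// Hypothetical stand-in for a thread-local registration such as Mockito's
// MockedConstruction: while open, it only affects the current thread.
final class ThreadLocalOverride implements AutoCloseable {
    private static final ThreadLocal<String> ACTIVE = new ThreadLocal<>();

    ThreadLocalOverride(String value) {
        ACTIVE.set(value);
    }

    // Returns null when no override is active on this thread.
    static String current() {
        return ACTIVE.get();
    }

    @Override
    public void close() {
        // Without this, the override would leak into later tests run on the same thread.
        ACTIVE.remove();
    }
}
```

Inside `try (ThreadLocalOverride o = new ThreadLocalOverride("mocked")) { ... }` the override is visible. Once the block exits, even through an exception, `current()` is back to `null`.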
[GitHub] [kafka] philipnee commented on a diff in pull request #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee commented on code in PR #14123: URL: https://github.com/apache/kafka/pull/14123#discussion_r1277835494
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/OffsetFetchApplicationEvent.java: ##
@@ -19,25 +19,33 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
public class OffsetFetchApplicationEvent extends ApplicationEvent {
-final public CompletableFuture> future;
-public final Set partitions;
+private final CompletableFuture> future;
+private final Set partitions;
public OffsetFetchApplicationEvent(final Set partitions) {
super(Type.FETCH_COMMITTED_OFFSET);
this.partitions = partitions;
this.future = new CompletableFuture<>();
}
-public Map complete(final Duration duration) throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
Removed because this method was rather pointless.
[GitHub] [kafka] philipnee opened a new pull request, #14123: MINOR: Fix committed API in the PrototypeAsyncConsumer timeout
philipnee opened a new pull request, #14123: URL: https://github.com/apache/kafka/pull/14123 **Summary** I discovered that the committed() API timed out during the integration test. After investigating, I found that this was because the future was not completed in the ApplicationEventProcessor. I also added `toString` methods to the event class for debugging purposes.
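Here is a simplified model of why a future that is never completed surfaces as a timeout. The caller blocks on `CompletableFuture.get(timeout, unit)`. If the event processor skips `complete(...)`, the call can only end in `TimeoutException`. All class names are hypothetical stand-ins rather than the PrototypeAsyncConsumer internals, and partitions are plain strings:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical event: carries the request and a future the caller waits on.
final class FetchCommittedEventSketch {
    final Set<String> partitions;
    final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();

    FetchCommittedEventSketch(Set<String> partitions) {
        this.partitions = partitions;
    }
}

// Hypothetical processor; completesFuture=false models the bug described above.
final class EventProcessorSketch {
    private final boolean completesFuture;

    EventProcessorSketch(boolean completesFuture) {
        this.completesFuture = completesFuture;
    }

    void process(FetchCommittedEventSketch event) {
        Map<String, Long> offsets = new HashMap<>();
        for (String partition : event.partitions) {
            offsets.put(partition, 0L); // placeholder for a real committed-offset lookup
        }
        if (completesFuture) {
            event.future.complete(offsets);
        }
    }
}

final class CommittedCallerSketch {
    // Blocks until the processor completes the future, or fails with TimeoutException.
    static Map<String, Long> committed(EventProcessorSketch processor, Set<String> partitions, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        FetchCommittedEventSketch event = new FetchCommittedEventSketch(partitions);
        processor.process(event); // a background thread would do this in a real consumer
        return event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```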
[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override
fvaleri commented on code in PR #14121: URL: https://github.com/apache/kafka/pull/14121#discussion_r1277811461
## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection partitions) {
public void onPartitionsAssigned(Collection partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
-@Override
-public void onPartitionsLost(Collection partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
Right. Let me do that.
[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override
dajac commented on code in PR #14121: URL: https://github.com/apache/kafka/pull/14121#discussion_r1277803779
## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection partitions) {
public void onPartitionsAssigned(Collection partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
-@Override
-public void onPartitionsLost(Collection partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
In my opinion, if you want to do something like that, it would be better to do it for all callbacks. Adding a comment to just one of them is not an ideal end state.
[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override
fvaleri commented on code in PR #14121: URL: https://github.com/apache/kafka/pull/14121#discussion_r1277800358
## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection partitions) {
public void onPartitionsAssigned(Collection partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
-@Override
-public void onPartitionsLost(Collection partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
Yeah, we refactored these examples a while back, keeping the original logic but improving error handling and output. We also added some comments, which may be useful to people new to Kafka. Adding this comment would be along the same lines.
[GitHub] [kafka] mumrah merged pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event
mumrah merged PR #14115: URL: https://github.com/apache/kafka/pull/14115
[GitHub] [kafka] mumrah commented on pull request #14115: KAFKA-15263 Check KRaftMigrationDriver state in each event
mumrah commented on PR #14115: URL: https://github.com/apache/kafka/pull/14115#issuecomment-1656019490 Test failures look unrelated: https://github.com/apache/kafka/assets/55116/a8e1f6db-57ef-4450-b595-a54c164db524
[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override
dajac commented on code in PR #14121: URL: https://github.com/apache/kafka/pull/14121#discussion_r1277792665
## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection partitions) {
public void onPartitionsAssigned(Collection partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
-@Override
-public void onPartitionsLost(Collection partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
I am sorry, but I don't understand what you are trying to achieve here. Your explanation is correct, and the javadoc of `onPartitionsLost` also explains this. I don't see the need to add an extra comment to just this method in this example. Is your goal to improve the example?
[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Add comment to onPartitionsLost override
fvaleri commented on code in PR #14121: URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103
## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection partitions) {
public void onPartitionsAssigned(Collection partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
-@Override
-public void onPartitionsLost(Collection partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
Hi @dajac, the default implementation of onPartitionsLost calls onPartitionsRevoked so that any cleanup logic can be executed (e.g. committing pending offsets before losing partition ownership). This also means that "revoked" is logged instead of "lost". https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
For this reason, I thought we could simply use the default implementation without overriding it, but now I see that it is too late to save the offsets when onPartitionsLost is called, since these partitions are probably owned by other consumers already. From the javadoc we have:
```java
public void onPartitionsLost(Collection partitions) {
    // do not need to save the offsets since these partitions are probably owned by other consumers already
}
```
The default onPartitionsLost implementation is there to cover the case where partitions are reassigned before we have a chance to revoke them gracefully (e.g. in case of a session timeout). Maybe we can keep the override and add an appropriate comment. Wdyt?
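The delegation described above can be reproduced without kafka-clients. The hypothetical `RebalanceListenerSketch` below mirrors the shape of `ConsumerRebalanceListener` but uses String partition names instead of `TopicPartition`. The default `onPartitionsLost` calls `onPartitionsRevoked`, so a listener that does not override it logs lost partitions as revoked:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified mirror of ConsumerRebalanceListener's default-method structure.
interface RebalanceListenerSketch {
    void onPartitionsRevoked(Collection<String> partitions);

    void onPartitionsAssigned(Collection<String> partitions);

    // The default "lost" handler reuses the "revoked" logic.
    default void onPartitionsLost(Collection<String> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// Hypothetical listener that records messages instead of printing them.
final class LoggingListenerSketch implements RebalanceListenerSketch {
    final List<String> log = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        log.add("Revoked partitions: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<String> partitions) {
        log.add("Assigned partitions: " + partitions);
    }
    // No onPartitionsLost override, so lost partitions end up logged as "Revoked".
}
```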
[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation
fvaleri commented on code in PR #14121: URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103
## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection partitions) {
public void onPartitionsAssigned(Collection partitions) {
Utils.printOut("Assigned partitions: %s", partitions);
}
-
-@Override
-public void onPartitionsLost(Collection partitions) {
-Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
Hi @dajac, the default implementation of onPartitionsLost calls onPartitionsRevoked so that any cleanup logic can be executed (e.g. committing pending offsets before losing partition ownership). This also means that "revoked" is logged instead of "lost". https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199
For this reason, I thought we could simply use the default implementation without overriding it, but now I see that it is too late to save the offsets when onPartitionsLost is called, since these partitions are probably owned by other consumers already. From the javadoc we have:
```java
public void onPartitionsLost(Collection partitions) {
    // do not need to save the offsets since these partitions are probably owned by other consumers already
}
```
The default onPartitionsLost implementation is there to cover the case where partitions are reassigned before we have a chance to revoke them gracefully (e.g. in case of a session timeout). Maybe we can keep the override and add an appropriate comment. Wdyt? Something like:
```java
// this is called when partitions are reassigned before we had a chance to revoke them gracefully
// we can't commit pending offsets because these partitions are probably owned by other consumers already
// nevertheless, we may need to do some other cleanup
```
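Here is one way to act on the suggested comment, sketched with hypothetical names. An in-memory map stands in for real offset commits. `onPartitionsRevoked` commits pending offsets because the consumer still owns the partitions. `onPartitionsLost` only drops local state:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical listener; the maps stand in for consumer-side state and real commits.
final class OffsetTrackingListenerSketch {
    final Map<String, Long> pendingOffsets = new HashMap<>(); // processed but not yet committed
    final Map<String, Long> committed = new HashMap<>();      // stands in for commitSync results

    // Graceful revocation: we still own the partitions, so committing is safe.
    void onPartitionsRevoked(Collection<String> partitions) {
        for (String partition : partitions) {
            Long offset = pendingOffsets.remove(partition);
            if (offset != null) {
                committed.put(partition, offset);
            }
        }
    }

    // Lost partitions are probably owned by another consumer already,
    // so skip the commit and only clean up local state.
    void onPartitionsLost(Collection<String> partitions) {
        for (String partition : partitions) {
            pendingOffsets.remove(partition);
        }
    }
}
```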
[jira] [Commented] (KAFKA-14257) Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
[ https://issues.apache.org/jira/browse/KAFKA-14257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748684#comment-17748684 ]
jianbin.chen commented on KAFKA-14257:
[~cokutan] You'll want to use the same meta.properties, rather than generating one for each broker.
> Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response
>
> Key: KAFKA-14257
> URL: https://issues.apache.org/jira/browse/KAFKA-14257
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.2.3
> Reporter: jianbin.chen
> Priority: Major
>
> Please help me understand why this error message is output indefinitely.
> broker1:
> {code:java}
> process.roles=broker,controller
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=1
> listeners=PLAINTEXT://192.168.6.57:9092,CONTROLLER://192.168.6.57:9093
> inter.broker.listener.name=PLAINTEXT
> advertised.listeners=PLAINTEXT://192.168.6.57:9092
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> broker2
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> node.id=2
> listeners=PLAINTEXT://192.168.6.56:9092,CONTROLLER://192.168.6.56:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> broker3
> {code:java}
> process.roles=broker,controller
> controller.listener.names=CONTROLLER
> num.io.threads=8
> num.network.threads=5
> node.id=3
> listeners=PLAINTEXT://192.168.6.55:9092,CONTROLLER://192.168.6.55:9093
> inter.broker.listener.name=PLAINTEXT
> controller.quorum.voters=1@192.168.6.57:9093,2@192.168.6.56:9093,3@192.168.6.55:9093
> log.dirs=/data01/kafka323-logs
> {code}
> error msg:
> {code:java}
> [2022-09-22 18:44:01,601] ERROR [RaftManager nodeId=2] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=378, data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,625] ERROR [RaftManager nodeId=2] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=380, data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,655] ERROR [RaftManager nodeId=2] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=382, data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,679] ERROR [RaftManager nodeId=2] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=384, data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,706] ERROR [RaftManager nodeId=2] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=386, data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) (org.apache.kafka.raft.KafkaRaftClient)
> [2022-09-22 18:44:01,729] ERROR [RaftManager nodeId=2] Unexpected error INCONSISTENT_CLUSTER_ID in VOTE response: InboundResponse(correlationId=388, data=VoteResponseData(errorCode=104, topics=[]), sourceId=1) (org.apache.kafka.raft.KafkaRaftClient)
> {code}
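Below is a hedged pre-flight check for this situation. The field that must agree across the quorum is `cluster.id` in each node's `meta.properties`. This is usually ensured by running `kafka-storage.sh format` with the same `-t` cluster ID on every node. The hypothetical `ClusterIdCheckSketch` parses the file contents with `java.util.Properties` and reports whether all nodes share one cluster ID:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper; takes the text of each node's meta.properties.
final class ClusterIdCheckSketch {
    static boolean sameClusterId(List<String> metaPropertiesContents) throws IOException {
        Set<String> ids = new HashSet<>();
        for (String contents : metaPropertiesContents) {
            Properties props = new Properties();
            props.load(new StringReader(contents));
            String id = props.getProperty("cluster.id");
            if (id == null) {
                return false; // a node without a cluster.id cannot match the others
            }
            ids.add(id);
        }
        // Exactly one distinct ID means every node agrees; an empty list yields false.
        return ids.size() == 1;
    }
}
```

The `node.id` values are expected to differ per node, so the sketch compares only `cluster.id`.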
[jira] [Updated] (KAFKA-15264) Compared with 1.1.0zk, the peak throughput of 3.5.1kraft is very jitter
[ https://issues.apache.org/jira/browse/KAFKA-15264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
jianbin.chen updated KAFKA-15264:
Description:
I was preparing to upgrade from 1.1.0 to 3.5.1 in KRaft mode (as a new cluster deployment). When I recently compared the two using the following stress-test command, I found an obvious throughput gap:
{code:java}
./kafka-producer-perf-test.sh --topic test321 --num-records 3000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=xxx: acks=1
419813 records sent, 83962.6 records/sec (81.99 MB/sec), 241.1 ms avg latency, 588.0 ms max latency.
555300 records sent, 111015.6 records/sec (108.41 MB/sec), 275.1 ms avg latency, 460.0 ms max latency.
552795 records sent, 110536.9 records/sec (107.95 MB/sec), 265.9 ms avg latency, 1120.0 ms max latency.
552600 records sent, 110520.0 records/sec (107.93 MB/sec), 284.5 ms avg latency, 1097.0 ms max latency.
538500 records sent, 107656.9 records/sec (105.13 MB/sec), 277.5 ms avg latency, 610.0 ms max latency.
511545 records sent, 102309.0 records/sec (99.91 MB/sec), 304.1 ms avg latency, 1892.0 ms max latency.
511890 records sent, 102337.1 records/sec (99.94 MB/sec), 288.4 ms avg latency, 3000.0 ms max latency.
519165 records sent, 103812.2 records/sec (101.38 MB/sec), 262.1 ms avg latency, 1781.0 ms max latency.
513555 records sent, 102669.9 records/sec (100.26 MB/sec), 338.2 ms avg latency, 2590.0 ms max latency.
463329 records sent, 92665.8 records/sec (90.49 MB/sec), 276.8 ms avg latency, 1463.0 ms max latency.
494248 records sent, 98849.6 records/sec (96.53 MB/sec), 327.2 ms avg latency, 2362.0 ms max latency.
506272 records sent, 101254.4 records/sec (98.88 MB/sec), 322.1 ms avg latency, 2986.0 ms max latency.
393758 records sent, 78735.9 records/sec (76.89 MB/sec), 387.0 ms avg latency, 2958.0 ms max latency.
426435 records sent, 85252.9 records/sec (83.25 MB/sec), 363.3 ms avg latency, 1959.0 ms max latency.
412560 records sent, 82298.0 records/sec (80.37 MB/sec), 374.1 ms avg latency, 1995.0 ms max latency.
370137 records sent, 73997.8 records/sec (72.26 MB/sec), 396.8 ms avg latency, 1496.0 ms max latency.
391781 records sent, 78340.5 records/sec (76.50 MB/sec), 410.7 ms avg latency, 2446.0 ms max latency.
355901 records sent, 71166.0 records/sec (69.50 MB/sec), 397.5 ms avg latency, 2715.0 ms max latency.
385410 records sent, 77082.0 records/sec (75.28 MB/sec), 417.5 ms avg latency, 2702.0 ms max latency.
381160 records sent, 76232.0 records/sec (74.45 MB/sec), 407.7 ms avg latency, 1846.0 ms max latency.
67 records sent, 0.1 records/sec (65.10 MB/sec), 456.2 ms avg latency, 1414.0 ms max latency.
376251 records sent, 75175.0 records/sec (73.41 MB/sec), 401.9 ms avg latency, 1897.0 ms max latency.
354434 records sent, 70886.8 records/sec (69.23 MB/sec), 425.8 ms avg latency, 1601.0 ms max latency.
353795 records sent, 70744.9 records/sec (69.09 MB/sec), 411.7 ms avg latency, 1563.0 ms max latency.
321993 records sent, 64360.0 records/sec (62.85 MB/sec), 447.3 ms avg latency, 1975.0 ms max latency.
404075 records sent, 80750.4 records/sec (78.86 MB/sec), 408.4 ms avg latency, 1753.0 ms max latency.
384526 records sent, 76905.2 records/sec (75.10 MB/sec), 406.0 ms avg latency, 1833.0 ms max latency.
387652 records sent, 77483.9 records/sec (75.67 MB/sec), 397.3 ms avg latency, 1927.0 ms max latency.
343286 records sent, 68629.7 records/sec (67.02 MB/sec), 455.6 ms avg latency, 1685.0 ms max latency.
00 records sent, 66646.7 records/sec (65.08 MB/sec), 456.6 ms avg latency, 2146.0 ms max latency.
361191 records sent, 72238.2 records/sec (70.55 MB/sec), 409.4 ms avg latency, 2125.0 ms max latency.
357525 records sent, 71490.7 records/sec (69.82 MB/sec), 436.0 ms avg latency, 1502.0 ms max latency.
340238 records sent, 68047.6 records/sec (66.45 MB/sec), 427.9 ms avg latency, 1932.0 ms max latency.
390016 records sent, 77956.4 records/sec (76.13 MB/sec), 418.5 ms avg latency, 1807.0 ms max latency.
352830 records sent, 70523.7 records/sec (68.87 MB/sec), 439.4 ms avg latency, 1892.0 ms max latency.
354526 records sent, 70905.2 records/sec (69.24 MB/sec), 429.6 ms avg latency, 2128.0 ms max latency.
356670 records sent, 71305.5 records/sec (69.63 MB/sec), 408.9 ms avg latency, 1329.0 ms max latency.
309204 records sent, 60687.7 records/sec (59.27 MB/sec), 438.6 ms avg latency, 2566.0 ms max latency.
366715 records sent, 72316.1 records/sec (70.62 MB/sec), 474.5 ms avg latency, 2169.0 ms max latency.
375174 records sent, 75034.8 records/sec (73.28 MB/sec), 429.9 ms avg latency, 1722.0 ms max latency.
359400 records sent, 70346.4 records/sec (68.70 MB/sec), 432.1 ms avg latency, 1961.0 ms max latency.
312276 records sent, 62430.2 records/sec (60.97 MB/sec), 477.4 ms avg latency, 2006.0 ms max latency.
361875 records sent, 72360.5 records/sec (70.66 MB/sec),
[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation
fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103

## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
Hi @dajac, the default implementation of onPartitionsLost calls onPartitionsRevoked so that any cleanup logic can be executed (e.g. committing pending offsets before losing partition ownership). This also means that "revoked" is logged instead of "lost".
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199

For this reason, I thought we could simply use the default implementation without overriding it, but now I see that it is too late to save the offsets when onPartitionsLost is called, since these partitions are probably owned by other consumers already. From the javadoc we have:

```java
public void onPartitionsLost(Collection<TopicPartition> partitions) {
    // do not need to save the offsets since these partitions are probably owned by other consumers already
}
```

The default onPartitionsLost implementation is there to cover the case where partitions are reassigned before we have a chance to revoke them gracefully (e.g. after a session timeout). Maybe we can keep the override and add an appropriate comment. Wdyt?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
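The default-method behavior discussed in this thread can be sketched with a self-contained stand-in. The interface `RebalanceListener`, the class `LoggingListener`, and the plain `String` partition names are hypothetical simplifications, not the kafka-clients API. The default `onPartitionsLost` delegates to `onPartitionsRevoked`, as the comment above says the real default does, while the override records "lost" and deliberately skips the commit path.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for ConsumerRebalanceListener; partitions are plain strings here.
class RebalanceSketch {

    interface RebalanceListener {
        void onPartitionsRevoked(Collection<String> partitions);

        void onPartitionsAssigned(Collection<String> partitions);

        // Default modelled on the discussion: treat a loss like a revocation.
        default void onPartitionsLost(Collection<String> partitions) {
            onPartitionsRevoked(partitions);
        }
    }

    static class LoggingListener implements RebalanceListener {
        final List<String> events = new ArrayList<>();

        @Override
        public void onPartitionsRevoked(Collection<String> partitions) {
            // A real listener would commit offsets here: this consumer still owns the partitions.
            events.add("commit+revoked " + partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<String> partitions) {
            events.add("assigned " + partitions);
        }

        @Override
        public void onPartitionsLost(Collection<String> partitions) {
            // Too late to commit: the partitions are probably owned by another consumer already.
            events.add("lost " + partitions);
        }
    }

    public static void main(String[] args) {
        LoggingListener listener = new LoggingListener();
        listener.onPartitionsAssigned(List.of("test-0"));
        listener.onPartitionsLost(List.of("test-0"));
        System.out.println(listener.events); // [assigned [test-0], lost [test-0]]
    }
}
```

A listener that does not override `onPartitionsLost` would go through `onPartitionsRevoked` instead, which is exactly why the example logs "revoked" when the override is removed.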
[GitHub] [kafka] fvaleri commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation
fvaleri commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277749103

## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
Hi @dajac, the default implementation of onPartitionsLost calls onPartitionsRevoked so that any cleanup logic can be executed (e.g. committing pending offsets before losing partition ownership). This also means that "revoked" is logged instead of "lost".
https://github.com/fvaleri/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L197-L199

For this reason, I thought we could simply use the default implementation without overriding it, but now I see that it is too late to save the offsets when onPartitionsLost is called, since these partitions are probably owned by other consumers already. From the javadoc we have:

```java
public void onPartitionsLost(Collection<TopicPartition> partitions) {
    // do not need to save the offsets since these partitions are probably owned by other consumers already
}
```

I guess the default onPartitionsLost implementation is there to cover some corner case where onPartitionsRevoked may not be triggered, but we may still need to do some other cleanup. Is that correct? Maybe we can keep the override and add an appropriate comment. Wdyt?
[GitHub] [kafka] jeffkbkim opened a new pull request, #14122: MINOR: GroupMetadataManagerTest.java style fix
jeffkbkim opened a new pull request, #14122:
URL: https://github.com/apache/kafka/pull/14122

This patch makes the styling in GroupMetadataManagerTest.java consistent. It also adds `JoinResult` to simplify handling of JoinGroup API responses in the tests.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] junrao merged pull request #14086: MINOR: Test assign() and assignment() in the integration test
junrao merged PR #14086: URL: https://github.com/apache/kafka/pull/14086
[GitHub] [kafka] jolshan commented on a diff in pull request #14117: MINOR: Code cleanups in group-coordinator module
jolshan commented on code in PR #14117:
URL: https://github.com/apache/kafka/pull/14117#discussion_r1277712647

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ##
@@ -1014,7 +1013,7 @@ public void onHighWatermarkUpdated(
      * @param processor The event processor.
      * @param partitionWriter The partition writer.
      * @param loader The coordinator loader.
-     * @param coordinatorBuilderSupplier The coordinator builder.
+     * @param coordinatorShardBuilderSupplier The coordinator builder.

Review Comment:
nit: this changed the spacing of the params. Do we want to line them up?
[GitHub] [kafka] dajac commented on a diff in pull request #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation
dajac commented on code in PR #14121:
URL: https://github.com/apache/kafka/pull/14121#discussion_r1277683057

## examples/src/main/java/kafka/examples/Consumer.java: ##
@@ -157,9 +157,4 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         Utils.printOut("Assigned partitions: %s", partitions);
     }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions) {
-        Utils.printOut("Lost partitions: %s", partitions);

Review Comment:
I don't really understand the motivation for this change. It seems to me that we would lose the log message if we did so. Could you elaborate on the motivation?
[jira] [Updated] (KAFKA-15271) TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap
[ https://issues.apache.org/jira/browse/KAFKA-15271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-15271:

Description: Example:
{code:java}
@Test
public void bug() {
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
    // Topic -> Partition -> Offset
    TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
        new TimelineHashMap<>(snapshotRegistry, 0);

    snapshotRegistry.getOrCreateSnapshot(0);
    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(0, 100L);

    snapshotRegistry.getOrCreateSnapshot(1);
    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(1, 110L);

    snapshotRegistry.getOrCreateSnapshot(2);
    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(1, 111L);

    assertNull(offsets.get("foo", 1).get(1, 1));

    offsets.entrySet(1).forEach(topicEntry -> {
        System.out.println(topicEntry.getKey());
        topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
            System.out.println(partitionEntry.getKey() + " : " + partitionEntry.getValue());
        });
    });

    /*
    The above code prints:
    foo
    0 : 100
    1 : 110

    but should rather print:
    foo
    0 : 100
    */
}
{code}
It yields the expected result when the third put is removed. `get(key, epoch)` is always correct as well. It seems that `entrySet` has an issue.
> TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap
> -
>
> Key: KAFKA-15271
> URL: https://issues.apache.org/jira/browse/KAFKA-15271
> Project: Kafka
> Issue Type: Bug
> Reporter: David Jacot
> Priority: Major
>
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1655850239

Zoning in on ControllerIntegrationTest#testControllerMoveOnTopicCreation:
```
...
val event = appender.getMessages.find(e => e.getLevel == Level.INFO &&
  e.getThrowableInformation != null &&
  e.getThrowableInformation.getThrowable.getClass.getName.equals(classOf[ControllerMovedException].getName))
assertTrue(event.isDefined) // <- THIS IS THE ASSERTION THAT FAILS
...
```
However, the test logs show:
```
16:03:02.258 [controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=0] Controller moved to another broker when processing TopicChange.
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 1
```
[jira] [Created] (KAFKA-15271) TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap
David Jacot created KAFKA-15271:
---
Summary: TimelineHashMap.entrySet yield unexpected results with nested TimelineHashMap
Key: KAFKA-15271
URL: https://issues.apache.org/jira/browse/KAFKA-15271
Project: Kafka
Issue Type: Bug
Reporter: David Jacot

Example:
{code:java}
@Test
public void bug() {
    SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
    // Topic -> Partition -> Offset
    TimelineHashMap<String, TimelineHashMap<Integer, Long>> offsets =
        new TimelineHashMap<>(snapshotRegistry, 0);

    snapshotRegistry.getOrCreateSnapshot(0);
    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(0, 100L);

    snapshotRegistry.getOrCreateSnapshot(1);
    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(1, 110L);

    snapshotRegistry.getOrCreateSnapshot(2);
    offsets
        .computeIfAbsent("foo", __ -> new TimelineHashMap<>(snapshotRegistry, 0))
        .put(1, 111L);

    assertNull(offsets.get("foo", 1).get(1, 1));

    offsets.entrySet(1).forEach(topicEntry -> {
        System.out.println(topicEntry.getKey());
        topicEntry.getValue().entrySet(1).forEach(partitionEntry -> {
            System.out.println(partitionEntry.getKey() + " : " + partitionEntry.getValue());
        });
    });

    /*
    The above code prints:
    foo
    0 : 100
    1 : 110

    but should rather print:
    foo
    0 : 100
    */
}
{code}
It yields the expected result when the third put is removed.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
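To make the expected output concrete, here is a hypothetical reference model of snapshot reads. It is not Kafka's `TimelineHashMap` or `SnapshotRegistry`; the names `Registry`, `VersionedMap`, and `entriesAt` are invented for illustration. Each write is stamped with a global sequence number, each snapshot records the sequence value at the moment it is created, and a read at snapshot `e` sees only writes made before that snapshot. Under these assumed semantics, iterating the inner map at snapshot 1 yields only `0 : 100`, which matches what the report says `entrySet(1)` should print.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical reference model of snapshot reads; not Kafka's TimelineHashMap/SnapshotRegistry.
class SnapshotModel {

    // A global write clock plus the clock value captured when each snapshot epoch was created.
    static final class Registry {
        long clock = 0;
        final Map<Long, Long> snapshotClock = new HashMap<>();

        void createSnapshot(long epoch) {
            snapshotClock.put(epoch, clock);
        }
    }

    static final class VersionedMap<K extends Comparable<K>, V> {
        // One historical write: its position on the global clock and the value written.
        private static final class Write<T> {
            final long seq;
            final T value;

            Write(long seq, T value) {
                this.seq = seq;
                this.value = value;
            }
        }

        private final Registry registry;
        private final Map<K, List<Write<V>>> history = new HashMap<>();

        VersionedMap(Registry registry) {
            this.registry = registry;
        }

        void put(K key, V value) {
            long seq = ++registry.clock;
            history.computeIfAbsent(key, k -> new ArrayList<>()).add(new Write<>(seq, value));
        }

        // Latest value written before the snapshot was created, or null (assumes the snapshot exists).
        V get(K key, long epoch) {
            long limit = registry.snapshotClock.get(epoch);
            V result = null;
            for (Write<V> write : history.getOrDefault(key, List.of())) {
                if (write.seq <= limit) {
                    result = write.value;
                }
            }
            return result;
        }

        // Sorted view of every entry visible at the given snapshot.
        Map<K, V> entriesAt(long epoch) {
            Map<K, V> view = new TreeMap<>();
            for (K key : history.keySet()) {
                V value = get(key, epoch);
                if (value != null) {
                    view.put(key, value);
                }
            }
            return view;
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        // Topic -> Partition -> Offset, as in the report.
        VersionedMap<String, VersionedMap<Integer, Long>> offsets = new VersionedMap<>(registry);

        registry.createSnapshot(0);
        VersionedMap<Integer, Long> foo = new VersionedMap<>(registry);
        offsets.put("foo", foo);
        foo.put(0, 100L);
        registry.createSnapshot(1);
        foo.put(1, 110L);
        registry.createSnapshot(2);
        foo.put(1, 111L);

        offsets.entriesAt(1).forEach((topic, partitions) -> {
            System.out.println(topic);
            partitions.entriesAt(1).forEach((p, o) -> System.out.println(p + " : " + o));
        });
    }
}
```

Running `main` prints `foo` followed by `0 : 100`. The model keeps the full write history per key for clarity; a real implementation would prune history that no live snapshot can observe.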
[jira] [Assigned] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case
[ https://issues.apache.org/jira/browse/KAFKA-15270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-15270:
--
Assignee: Lianet Magrans

> Integration tests for AsyncConsumer simple consume case
> ---
>
> Key: KAFKA-15270
> URL: https://issues.apache.org/jira/browse/KAFKA-15270
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
>
> This task involves writing integration tests covering the simple consume
> functionality of the AsyncConsumer. This should include validation of the
> assign, fetch and positions logic.
> Committed offset functionality is not covered as part of this task.
> Integration tests should have a similar form to the existing
> PlaintextConsumerTest, but be scoped to the simple consume flow.
>
[jira] [Created] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case
Lianet Magrans created KAFKA-15270:
--
Summary: Integration tests for AsyncConsumer simple consume case
Key: KAFKA-15270
URL: https://issues.apache.org/jira/browse/KAFKA-15270
Project: Kafka
Issue Type: Task
Components: clients, consumer
Reporter: Lianet Magrans

This task involves writing integration tests covering the simple consume functionality of the AsyncConsumer. This should include validation of the assign, fetch and positions logic. Committed offset functionality is not covered as part of this task. Integration tests should have a similar form to the existing PlaintextConsumerTest, but be scoped to the simple consume flow.
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1655821383

At first glance, it appears that something has changed, or is broken, with the loggers:
```
[2023-02-22T17:35:34.655Z] Gradle Test Run :core:unitTest > Gradle Test Executor 136 > ConfigAdminManagerTest > testPreprocessIncrementalWithLoggerChanges() FAILED
[2023-02-22T17:35:34.655Z] org.opentest4j.AssertionFailedError: expected: <{AlterConfigsResource(resourceType=8, resourceName='1', configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', configOperation=0, value='INFO')])=ApiError(error=NONE, message=null)}> but was: <{AlterConfigsResource(resourceType=8, resourceName='1', configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', configOperation=0, value='INFO')])=ApiError(error=INVALID_CONFIG, message=Logger kafka.server.ConfigAdminManagerTest does not exist!)}>
[2023-02-22T17:35:34.655Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
[2023-02-22T17:35:34.655Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
[2023-02-22T17:35:34.655Z] at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
[2023-02-22T17:35:34.655Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
[2023-02-22T17:35:34.655Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
[2023-02-22T17:35:34.655Z] at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
[2023-02-22T17:35:34.656Z] at app//kafka.server.ConfigAdminManagerTest.testPreprocessIncrementalWithLoggerChanges(ConfigAdminManagerTest.scala:352)
...
[2023-02-22T17:35:34.656Z] Gradle Test Run :core:unitTest > Gradle Test Executor 136 > ConfigAdminManagerTest > testValidateLogLevelConfigs() FAILED
[2023-02-22T17:35:34.656Z] org.apache.kafka.common.errors.InvalidConfigurationException: Logger kafka.server.ConfigAdminManagerTest does not exist!
...
[2023-02-22T17:36:10.045Z] Gradle Test Run :core:unitTest > Gradle Test Executor 136 > LoggingTest > testLoggerLevelIsResolved() FAILED
[2023-02-22T17:36:10.045Z] org.opentest4j.AssertionFailedError: expected: but was:
[2023-02-22T17:36:10.045Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
[2023-02-22T17:36:10.045Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
[2023-02-22T17:36:10.045Z] at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
[2023-02-22T17:36:10.045Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
[2023-02-22T17:36:10.045Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
[2023-02-22T17:36:10.045Z] at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
[2023-02-22T17:36:10.045Z] at app//kafka.utils.LoggingTest.testLoggerLevelIsResolved(LoggingTest.scala:81)
...
[2023-02-22T17:37:30.427Z] Gradle Test Run :core:unitTest > Gradle Test Executor 173 > ConfigAdminManagerTest > testPreprocessIncrementalWithLoggerChanges() FAILED
[2023-02-22T17:37:30.427Z] org.opentest4j.AssertionFailedError: expected: <{AlterConfigsResource(resourceType=8, resourceName='1', configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', configOperation=0, value='INFO')])=ApiError(error=NONE, message=null)}> but was: <{AlterConfigsResource(resourceType=8, resourceName='1', configs=[AlterableConfig(name='kafka.server.ConfigAdminManagerTest', configOperation=0, value='INFO')])=ApiError(error=INVALID_CONFIG, message=Logger kafka.server.ConfigAdminManagerTest does not exist!)}>
[2023-02-22T17:37:30.427Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
[2023-02-22T17:37:30.427Z] at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
[2023-02-22T17:37:30.427Z] at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
[2023-02-22T17:37:30.427Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
[2023-02-22T17:37:30.427Z] at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
[2023-02-22T17:37:30.427Z] at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1142)
[2023-02-22T17:37:30.427Z] at app//kafka.server.ConfigAdminManagerTest.testPreprocessIncrementalWithLoggerChanges(ConfigAdminManagerTest.scala:352)
...
[2023-02-22T17:37:30.427Z] Gradle Test Run :core:unitTest > Gradle Test Ex
[jira] [Created] (KAFKA-15269) Clean up the RaftClient interface
José Armando García Sancio created KAFKA-15269:
--
Summary: Clean up the RaftClient interface
Key: KAFKA-15269
URL: https://issues.apache.org/jira/browse/KAFKA-15269
Project: Kafka
Issue Type: Task
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio

Make the following changes to the {{RaftClient}} interface and implementation:
* Remove {{scheduleAtomicAppend}}; the controller doesn't use {{scheduleAppend}}, so we can revert to the original semantics.
* {{logEndOffset}} is misleading when called on the leader, since it doesn't include records that have already been appended to the {{BatchAccumulator}} but have not yet been written to the log. Rename it to {{endOffset}} in the process.
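The difference the ticket describes can be illustrated with a toy leader model. Everything here is hypothetical (`EndOffsetSketch`, `append`, and `flush` are not the RaftClient API), and it assumes the renamed `endOffset` is meant to count records still buffered in the accumulator, which the ticket implies but does not state outright.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical leader model: records are buffered in an accumulator before reaching the log.
class EndOffsetSketch {
    private final List<String> log = new ArrayList<>();
    private final List<String> accumulator = new ArrayList<>();

    // Buffer a record and return the offset it will eventually occupy.
    long append(String record) {
        accumulator.add(record);
        return endOffset() - 1;
    }

    // Move all buffered records into the log.
    void flush() {
        log.addAll(accumulator);
        accumulator.clear();
    }

    // Counts only records already written to the log (the misleading value on a leader).
    long logEndOffset() {
        return log.size();
    }

    // Also counts records still sitting in the accumulator.
    long endOffset() {
        return log.size() + accumulator.size();
    }

    public static void main(String[] args) {
        EndOffsetSketch leader = new EndOffsetSketch();
        leader.append("a");
        leader.flush();
        long offset = leader.append("b");
        System.out.println(offset + " " + leader.logEndOffset() + " " + leader.endOffset()); // 1 1 2
    }
}
```

After the second append, the record has been assigned offset 1, yet `logEndOffset()` still reports 1, so a caller using it as "next offset" on the leader would be off by the number of buffered records.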
[GitHub] [kafka] clolov commented on pull request #13260: KAFKA-14661: Upgrade Zookeeper to 3.8.1
clolov commented on PR #13260:
URL: https://github.com/apache/kafka/pull/13260#issuecomment-1655753598

At least the tests below have started failing consistently. I am still looking into why:
```
Gradle Test Run :core:test > Gradle Test Executor 3 > ControllerIntegrationTest > testControllerMoveOnPreferredReplicaElection() FAILED
org.opentest4j.AssertionFailedError: expected: but was:
at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
at app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnPreferredReplicaElection(ControllerIntegrationTest.scala:1885)

Gradle Test Run :core:test > Gradle Test Executor 3 > ControllerIntegrationTest > testControllerMoveOnTopicCreation() FAILED
org.opentest4j.AssertionFailedError: expected: but was:
at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
at app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnTopicCreation(ControllerIntegrationTest.scala:1885)

Gradle Test Run :core:test > Gradle Test Executor 3 > ControllerIntegrationTest > testControllerMoveOnPartitionReassignment() FAILED
org.opentest4j.AssertionFailedError: expected: but was:
at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
at app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnPartitionReassignment(ControllerIntegrationTest.scala:1885)

Gradle Test Run :core:test > Gradle Test Executor 3 > ControllerIntegrationTest > testControllerMoveOnTopicDeletion() FAILED
org.opentest4j.AssertionFailedError: expected: but was:
at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:180)
at app//kafka.controller.ControllerIntegrationTest.testControllerMoveOnTopicDeletion(ControllerIntegrationTest.scala:1885)
```
[jira] [Assigned] (KAFKA-15089) Consolidate all the group coordinator configs
[ https://issues.apache.org/jira/browse/KAFKA-15089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-15089:
---
Assignee: (was: David Jacot)

> Consolidate all the group coordinator configs
> -
>
> Key: KAFKA-15089
> URL: https://issues.apache.org/jira/browse/KAFKA-15089
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Priority: Major
>
> The group coordinator configurations are currently defined in KafkaConfig.
> As KafkaConfig is defined in the core module, we can't pass it to the new
> Java modules to carry the configurations along.
> One suggestion is to centralize all the configurations of a module in the
> module itself, similar to what we have done for RemoteLogManagerConfig and
> RaftConfig. We also need a mechanism to add all the properties defined in the
> module to KafkaConfig's ConfigDef.
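One way to read the suggestion: the module owns its config names and defaults, exposes a hook the broker calls to register them, and builds its own config object from already-resolved values so it never depends on the core module. The sketch below is hypothetical; `GroupCoordinatorConfigSketch`, `defineInto`, and the `Map`-based registry are stand-ins for `KafkaConfig` and `ConfigDef`, whose real APIs are not used here, and the property names and defaults are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a module that owns its config keys and defaults.
class GroupCoordinatorConfigSketch {
    static final String MIN_SESSION_TIMEOUT_MS = "group.min.session.timeout.ms";
    static final String MAX_SESSION_TIMEOUT_MS = "group.max.session.timeout.ms";

    // Hook the broker-level config could call to pull in this module's definitions.
    // Operator-supplied values win; missing keys fall back to the module's defaults.
    static void defineInto(Map<String, Object> brokerConfig) {
        brokerConfig.putIfAbsent(MIN_SESSION_TIMEOUT_MS, 6000);
        brokerConfig.putIfAbsent(MAX_SESSION_TIMEOUT_MS, 1800000);
    }

    final int minSessionTimeoutMs;
    final int maxSessionTimeoutMs;

    // Built from resolved values only, so this module never references the core module's config class.
    GroupCoordinatorConfigSketch(Map<String, Object> resolved) {
        this.minSessionTimeoutMs = (Integer) resolved.get(MIN_SESSION_TIMEOUT_MS);
        this.maxSessionTimeoutMs = (Integer) resolved.get(MAX_SESSION_TIMEOUT_MS);
    }

    public static void main(String[] args) {
        Map<String, Object> brokerConfig = new LinkedHashMap<>();
        brokerConfig.put(MAX_SESSION_TIMEOUT_MS, 300000); // operator override
        defineInto(brokerConfig);
        GroupCoordinatorConfigSketch config = new GroupCoordinatorConfigSketch(brokerConfig);
        System.out.println(config.minSessionTimeoutMs + " " + config.maxSessionTimeoutMs); // 6000 300000
    }
}
```

In a real broker the registration step would add typed definitions (with validators and documentation) rather than raw defaults, but the ownership boundary is the same.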
[jira] [Assigned] (KAFKA-15225) Define constants for record types
[ https://issues.apache.org/jira/browse/KAFKA-15225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-15225:
---
Assignee: (was: David Jacot)

> Define constants for record types
> -
>
> Key: KAFKA-15225
> URL: https://issues.apache.org/jira/browse/KAFKA-15225
> Project: Kafka
> Issue Type: Sub-task
> Reporter: David Jacot
> Priority: Major
>
> Define constants for all the record types. Ideally, they should be defined in
> the record definitions and the constants should be auto-generated (e.g. like
> ApiKeys).
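A hand-written version of such constants might look like the enum below: each record type carries its numeric id, with a reverse lookup for decoding, loosely in the spirit of `ApiKeys`. The type names and ids are placeholders, not the group coordinator's actual record definitions; the ticket's preferred approach is to generate this from the record definitions instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical record-type constants; names and ids are placeholders.
enum RecordTypeSketch {
    OFFSET_COMMIT((short) 1),
    GROUP_METADATA((short) 2),
    CONSUMER_GROUP_METADATA((short) 3);

    private static final Map<Short, RecordTypeSketch> BY_ID = new HashMap<>();

    static {
        for (RecordTypeSketch type : values()) {
            BY_ID.put(type.id, type);
        }
    }

    final short id;

    RecordTypeSketch(short id) {
        this.id = id;
    }

    // Reverse lookup used when decoding a record key's type field.
    static RecordTypeSketch fromId(short id) {
        RecordTypeSketch type = BY_ID.get(id);
        if (type == null) {
            throw new IllegalArgumentException("Unknown record type " + id);
        }
        return type;
    }

    public static void main(String[] args) {
        System.out.println(fromId((short) 2)); // GROUP_METADATA
    }
}
```

Centralizing the ids this way means a typo in a numeric literal at a call site becomes a compile error or an explicit `IllegalArgumentException` rather than a silently mis-decoded record.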
[jira] [Assigned] (KAFKA-15061) CoordinatorPartitionWriter should reuse buffer
[ https://issues.apache.org/jira/browse/KAFKA-15061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-15061: --- Assignee: (was: David Jacot) > CoordinatorPartitionWriter should reuse buffer > -- > > Key: KAFKA-15061 > URL: https://issues.apache.org/jira/browse/KAFKA-15061 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major
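The JIRA has no description, so the following is only an assumption about what "reuse buffer" could mean: keep one `ByteBuffer` alive across writes and grow it only when a batch does not fit. `ReusableBufferWriter` and its length-prefixed encoding are hypothetical stand-ins for the real writer and record serialization.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical writer that keeps one ByteBuffer across writes instead of
// allocating a fresh buffer per batch; it only grows when a batch does not fit.
final class ReusableBufferWriter {
    private ByteBuffer buffer;
    private int allocations;

    ReusableBufferWriter(int initialCapacity) {
        buffer = ByteBuffer.allocate(initialCapacity);
        allocations = 1;
    }

    // Encodes records as length-prefixed UTF-8 strings. The returned view aliases
    // the shared buffer, so the caller must finish with it before the next write().
    ByteBuffer write(List<String> records) {
        int needed = 0;
        for (String record : records) {
            needed += Integer.BYTES + record.getBytes(StandardCharsets.UTF_8).length;
        }
        if (needed > buffer.capacity()) {
            // Grow geometrically so repeated large batches do not reallocate every time.
            buffer = ByteBuffer.allocate(Math.max(needed, buffer.capacity() * 2));
            allocations++;
        }
        buffer.clear();
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            buffer.putInt(bytes.length);
            buffer.put(bytes);
        }
        buffer.flip();
        return buffer.asReadOnlyBuffer();
    }

    int allocations() {
        return allocations;
    }
}
```

The trade-off is aliasing: every call returns a view over the same backing array, so the previous result must be fully consumed (for example, appended to the log) before the next write.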
[jira] [Updated] (KAFKA-15225) Define constants for record types
[ https://issues.apache.org/jira/browse/KAFKA-15225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-15225: Priority: Minor (was: Major) > Define constants for record types > - > > Key: KAFKA-15225 > URL: https://issues.apache.org/jira/browse/KAFKA-15225 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Minor > > Define constants for all the record types. Ideally, this should be defined in > the record definitions and the constants should be auto-generated (e.g. like > ApiKeys).
[jira] [Created] (KAFKA-15268) Consider replacing Subscription Metadata by a hash
David Jacot created KAFKA-15268: --- Summary: Consider replacing Subscription Metadata by a hash Key: KAFKA-15268 URL: https://issues.apache.org/jira/browse/KAFKA-15268 Project: Kafka Issue Type: Sub-task Reporter: David Jacot With the addition of the racks, the subscription metadata record is getting large, too large in my opinion. We should consider replacing it with a hash. The subscription metadata is mainly used to detect changes in metadata, and a hash would provide similar functionality.
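A minimal sketch of the idea, assuming the metadata of interest is each topic's partition count plus the racks of its replicas. `SubscriptionMetadataHash` is hypothetical; it sorts topics and racks so that equal metadata always yields the same SHA-256 digest regardless of map or set ordering.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical: reduces subscription metadata to a fixed-size digest that is
// only used to detect changes.
final class SubscriptionMetadataHash {
    static final class TopicMetadata {
        final int numPartitions;
        final Set<String> racks;

        TopicMetadata(int numPartitions, Set<String> racks) {
            this.numPartitions = numPartitions;
            this.racks = racks;
        }
    }

    static String hash(Map<String, TopicMetadata> metadata) {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every Java platform", e);
        }
        // Sorted iteration makes the result independent of map and set ordering.
        for (Map.Entry<String, TopicMetadata> entry : new TreeMap<>(metadata).entrySet()) {
            putString(digest, entry.getKey());
            putInt(digest, entry.getValue().numPartitions);
            Set<String> racks = new TreeSet<>(entry.getValue().racks);
            putInt(digest, racks.size());
            for (String rack : racks) {
                putString(digest, rack);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    // Length prefixes make the byte encoding unambiguous before hashing.
    private static void putString(MessageDigest digest, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        putInt(digest, bytes.length);
        digest.update(bytes);
    }

    private static void putInt(MessageDigest digest, int value) {
        digest.update(new byte[] {
            (byte) (value >>> 24), (byte) (value >>> 16), (byte) (value >>> 8), (byte) value
        });
    }
}
```

A digest only answers "did anything change?"; it cannot say what changed, so the coordinator would still recompute the assignment from live metadata whenever the hash differs.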
[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748614#comment-17748614 ] David Jacot commented on KAFKA-14509: - [~riedelmax] I left a few comments in your comment. > Add ConsumerGroupDescribe API > - > > Key: KAFKA-14509 > URL: https://issues.apache.org/jira/browse/KAFKA-14509 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Max Riedel >Priority: Major > > The goal of this task is to implement the ConsumerGroupDescribe API as > described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI]; > and to implement the related changes in the admin client as described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups]. > On the server side, this mainly requires the following steps: > # The request/response schemas must be defined (see > ListGroupsRequest/Response.json for an example); > # Request/response classes must be defined (see > ListGroupsRequest/Response.java for an example); > # The API must be defined in KafkaApis (see > KafkaApis#handleDescribeGroupsRequest for an example); > # The GroupCoordinator interface (java file) must be extended for the new > operations. > # The new operation must be implemented in GroupCoordinatorService (new > coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in > Scala) should just reject the request. > We could probably do 1) and 2) in one pull request and the remaining ones in > another. > On the admin client side, this mainly requires the following steps: > * Define all the new java classes as defined in the KIP. > * Add the new API to KafkaAdminClient class.
[jira] [Commented] (KAFKA-14509) Add ConsumerGroupDescribe API
[ https://issues.apache.org/jira/browse/KAFKA-14509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748613#comment-17748613 ] David Jacot commented on KAFKA-14509: - [~riedelmax] There are no error codes in requests, so I am not sure what you are referring to. Could you elaborate? > Add ConsumerGroupDescribe API > - > > Key: KAFKA-14509 > URL: https://issues.apache.org/jira/browse/KAFKA-14509 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Max Riedel >Priority: Major > > The goal of this task is to implement the ConsumerGroupDescribe API as > described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupDescribeAPI]; > and to implement the related changes in the admin client as described > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Admin#describeConsumerGroups]. > On the server side, this mainly requires the following steps: > # The request/response schemas must be defined (see > ListGroupsRequest/Response.json for an example); > # Request/response classes must be defined (see > ListGroupsRequest/Response.java for an example); > # The API must be defined in KafkaApis (see > KafkaApis#handleDescribeGroupsRequest for an example); > # The GroupCoordinator interface (java file) must be extended for the new > operations. > # The new operation must be implemented in GroupCoordinatorService (new > coordinator in Java) whereas the GroupCoordinatorAdapter (old coordinator in > Scala) should just reject the request. > We could probably do 1) and 2) in one pull request and the remaining ones in > another. > On the admin client side, this mainly requires the following steps: > * Define all the new java classes as defined in the KIP.
> * Add the new API to KafkaAdminClient class.
[jira] [Resolved] (KAFKA-14501) Implement Heartbeat API
[ https://issues.apache.org/jira/browse/KAFKA-14501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-14501. - Fix Version/s: 3.6.0 Reviewer: David Jacot Resolution: Fixed > Implement Heartbeat API > --- > > Key: KAFKA-14501 > URL: https://issues.apache.org/jira/browse/KAFKA-14501 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Jeff Kim >Priority: Major > Fix For: 3.6.0 > > > Implement Heartbeat API in the new Group Coordinator.
[GitHub] [kafka] dajac merged pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
dajac merged PR #14056: URL: https://github.com/apache/kafka/pull/14056
[GitHub] [kafka] fvaleri opened a new pull request, #14121: MINOR: Remove onPartitionsLost overriding in favor of default implementation
fvaleri opened a new pull request, #14121: URL: https://github.com/apache/kafka/pull/14121 (no comment)
[GitHub] [kafka] dajac commented on a diff in pull request #14056: KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator
dajac commented on code in PR #14056: URL: https://github.com/apache/kafka/pull/14056#discussion_r1277524963 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -843,15 +886,19 @@ public JoinGroupResponseData setupGroupWithPendingMember(GenericGroup group) thr // Start the join for the second member CompletableFuture followerJoinFuture = new CompletableFuture<>(); CoordinatorResult result = sendGenericGroupJoin( -joinRequest.setMemberId(UNKNOWN_MEMBER_ID), +joinRequest +.setMemberId(UNKNOWN_MEMBER_ID), followerJoinFuture ); assertTrue(result.records().isEmpty()); assertFalse(followerJoinFuture.isDone()); CompletableFuture leaderJoinFuture = new CompletableFuture<>(); -result = sendGenericGroupJoin(joinRequest.setMemberId(leaderId), leaderJoinFuture); +result = sendGenericGroupJoin( +joinRequest +.setMemberId(leaderJoinResponse.memberId()), Review Comment: nit: When there is only one setter, it can stay on the same line. There are many cases in this file. 
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5837,15 +6007,15 @@ public void testNewMemberTimeoutCompletion() throws Exception { .build(); GenericGroup group = context.createGenericGroup("group-id"); -JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() +CompletableFuture joinFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(new JoinGroupRequestBuilder() Review Comment: nit: It would be better to format it as follows:
```
CoordinatorResult result = context.sendGenericGroupJoin(
    new JoinGroupRequestBuilder()
        .withGroupId("group-id")
        .withMemberId(UNKNOWN_MEMBER_ID)
        .withDefaultProtocolTypeAndProtocols()
        .withSessionTimeoutMs(context.genericGroupNewMemberJoinTimeoutMs + 5000)
        .build(),
    joinFuture
);
```
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -843,15 +886,19 @@ public JoinGroupResponseData setupGroupWithPendingMember(GenericGroup group) thr // Start the join for the second member CompletableFuture followerJoinFuture = new CompletableFuture<>(); CoordinatorResult result = sendGenericGroupJoin( -joinRequest.setMemberId(UNKNOWN_MEMBER_ID), +joinRequest +.setMemberId(UNKNOWN_MEMBER_ID), followerJoinFuture ); assertTrue(result.records().isEmpty()); assertFalse(followerJoinFuture.isDone()); CompletableFuture leaderJoinFuture = new CompletableFuture<>(); -result = sendGenericGroupJoin(joinRequest.setMemberId(leaderId), leaderJoinFuture); +result = sendGenericGroupJoin( +joinRequest +.setMemberId(leaderJoinResponse.memberId()), +leaderJoinFuture); Review Comment: nit: I have noticed that we are not consistent about where we put the closing parenthesis.
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -5837,15 +6007,15 @@ public void testNewMemberTimeoutCompletion() throws Exception { .build(); GenericGroup group = context.createGenericGroup("group-id"); -JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder() +CompletableFuture joinFuture = new CompletableFuture<>(); +CoordinatorResult result = context.sendGenericGroupJoin(new JoinGroupRequestBuilder() Review Comment: This code will go away when you change to using `JoinResult`.
[jira] [Updated] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-15267: -- Labels: tiered-storage (was: ) > Cluster-wide disablement of Tiered Storage > -- > > Key: KAFKA-15267 > URL: https://issues.apache.org/jira/browse/KAFKA-15267 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > Labels: tiered-storage > > h2. Summary > KIP-405 defines the configuration {{remote.log.storage.system.enable}} which > controls whether all resources needed for Tiered Storage to function are > instantiated properly in Kafka. However, the interaction between remote data > and Kafka if that configuration is set to false while there are still topics > with {{remote.storage.enable}} is undefined. {color:#ff8b00}*We would > like to give customers the ability to switch off Tiered Storage on a cluster > level and as such would need to define the behaviour.*{color} > {{remote.log.storage.system.enable}} is a read-only configuration. This means > that it can only be changed by *modifying the server.properties* and > restarting brokers. As such, the {*}validity of values contained in it is > only checked at broker startup{*}. > This JIRA proposes a few behaviours and a recommendation on a way forward. > h2. Option 1: Change nothing > Pros: > * No operation. > Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS. > h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster > level and do not allow it to be disabled > Always instantiate all resources for tiered storage. If no special ones are > selected, use the default ones which come with Kafka. > Pros: > * We solve the problem for moving between versions not allowing TS to be > disabled. > Cons: > * We do not solve the problem of moving back to older (or newer) Kafka > versions not supporting TS.
> * We haven’t quantified how many compute resources (CPU, memory) idle TS > components occupy. > * TS is a feature not required for running Kafka. As such, while it is still > under development, we shouldn’t put it on the critical path of starting a > broker. In this way, a stray memory leak won’t impact anything on the > critical path of a broker. > * We are potentially swapping one problem for another. How does TS behave if > one decides to swap the TS plugin classes when data has already been written? > h2. Option 3: Hide topics with tiering enabled > Customers cannot interact with topics which have tiering enabled. They cannot > create new topics with the same names. Retention (and compaction?) do not > take effect on files already in local storage. > Pros: > * We do not force data deletion. > Cons: > * This will be quite involved - the controller will need to know when a > broker’s server.properties have been altered; the broker will need to not > proceed to delete logs it is not the leader or follower for. > h2. {color:#00875a}Option 4: Do not start the broker if there are topics with > tiering enabled{color} - Recommended > This option has 2 different sub-options. The first one is that TS cannot be > disabled on cluster-level if there are *any* tiering topics - in other words > all tiered topics need to be deleted. The second one is that TS cannot be > disabled on a cluster-level if there are *any* topics with *tiering enabled* > - they can have tiering disabled, but with a retention policy set to delete > or retain (as per > [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]). > A topic can have tiering disabled and remain on the cluster as long as there > is no *remote* data when TS is disabled cluster-wide. > Pros: > * We force the customer to be very explicit in disabling tiering of topics > prior to disabling TS on the whole cluster.
> Cons: > * You have to make certain that all data in remote is deleted (just a > disablement of a tiered topic is not enough). How do you determine whether all > remote data has expired if the policy is retain? If the retain policy in KIP-950 knows > that there is data in remote, then this should also be able to figure it out. > The common denominator is that there needs to be no *remote* data at the > point of disabling TS. As such, the most straightforward option is to refuse > to start brokers if there are topics with the {{remote.storage.enable}} > present. This in essence requires customers to clean any tiered topics before > switching off TS, which is a fair ask. Should we wish to revise this later, it > should be possible. > h2. Option 5: Make Kafka forget about all remote information > Pros: > * Clean cut > Cons: > * Data is lost the moment TS is disabled rega
[jira] [Updated] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
[ https://issues.apache.org/jira/browse/KAFKA-15267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-15267: -- Description: h2. Summary KIP-405 defines the configuration {{remote.log.storage.system.enable}} which controls whether all resources needed for Tiered Storage to function are instantiated properly in Kafka. However, the interaction between remote data and Kafka if that configuration is set to false while there are still topics with {{remote.storage.enable}} is undefined. {color:#ff8b00}*We would like to give customers the ability to switch off Tiered Storage on a cluster level and as such would need to define the behaviour.*{color} {{remote.log.storage.system.enable}} is a read-only configuration. This means that it can only be changed by *modifying the server.properties* and restarting brokers. As such, the {*}validity of values contained in it is only checked at broker startup{*}. This JIRA proposes a few behaviours and a recommendation on a way forward. h2. Option 1: Change nothing Pros: * No operation. Cons: * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS. h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster level and do not allow it to be disabled Always instantiate all resources for tiered storage. If no special ones are selected, use the default ones which come with Kafka. Pros: * We solve the problem for moving between versions not allowing TS to be disabled. Cons: * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS. * We haven’t quantified how many compute resources (CPU, memory) idle TS components occupy. * TS is a feature not required for running Kafka. As such, while it is still under development, we shouldn’t put it on the critical path of starting a broker. In this way, a stray memory leak won’t impact anything on the critical path of a broker.
* We are potentially swapping one problem for another. How does TS behave if one decides to swap the TS plugin classes when data has already been written? h2. Option 3: Hide topics with tiering enabled Customers cannot interact with topics which have tiering enabled. They cannot create new topics with the same names. Retention (and compaction?) do not take effect on files already in local storage. Pros: * We do not force data deletion. Cons: * This will be quite involved - the controller will need to know when a broker’s server.properties have been altered; the broker will need to not proceed to delete logs it is not the leader or follower for. h2. {color:#00875a}Option 4: Do not start the broker if there are topics with tiering enabled{color} - Recommended This option has 2 different sub-options. The first one is that TS cannot be disabled on cluster-level if there are *any* tiering topics - in other words all tiered topics need to be deleted. The second one is that TS cannot be disabled on a cluster-level if there are *any* topics with *tiering enabled* - they can have tiering disabled, but with a retention policy set to delete or retain (as per [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]). A topic can have tiering disabled and remain on the cluster as long as there is no *remote* data when TS is disabled cluster-wide. Pros: * We force the customer to be very explicit in disabling tiering of topics prior to disabling TS on the whole cluster. Cons: * You have to make certain that all data in remote is deleted (just a disablement of a tiered topic is not enough). How do you determine whether all remote data has expired if the policy is retain? If the retain policy in KIP-950 knows that there is data in remote, then this should also be able to figure it out. The common denominator is that there needs to be no *remote* data at the point of disabling TS.
As such, the most straightforward option is to refuse to start brokers if there are topics with the {{remote.storage.enable}} present. This in essence requires customers to clean any tiered topics before switching off TS, which is a fair ask. Should we wish to revise this later, it should be possible. h2. Option 5: Make Kafka forget about all remote information Pros: * Clean cut Cons: * Data is lost the moment TS is disabled regardless of whether it is re-enabled later on, which might not be the behaviour expected by customers. was: h2. Summary KIP-405 defines the configuration {{remote.log.storage.system.enable}} which controls whether all resources needed for Tiered Storage to function are instantiated properly in Kafka. However, the interaction between remote data and Kafka if that configuration is set to false while there are still topics with {{remote.storage.enable}} is undefined. {color:#ff8b00}*We would like to give customers the ability to switch off Tie
[jira] [Created] (KAFKA-15267) Cluster-wide disablement of Tiered Storage
Christo Lolov created KAFKA-15267: - Summary: Cluster-wide disablement of Tiered Storage Key: KAFKA-15267 URL: https://issues.apache.org/jira/browse/KAFKA-15267 Project: Kafka Issue Type: Sub-task Reporter: Christo Lolov Assignee: Christo Lolov h2. Summary KIP-405 defines the configuration {{remote.log.storage.system.enable}} which controls whether all resources needed for Tiered Storage to function are instantiated properly in Kafka. However, the interaction between remote data and Kafka if that configuration is set to false while there are still topics with {{remote.storage.enable}} is undefined. {color:#ff8b00}*We would like to give customers the ability to switch off Tiered Storage on a cluster level and as such would need to define the behaviour.*{color} {{remote.log.storage.system.enable}} is a read-only configuration. This means that it can only be changed by *modifying the server.properties* and restarting brokers. As such, the {*}validity of values contained in it is only checked at broker startup{*}. This JIRA proposes a few behaviours and a recommendation on a way forward. h2. Option 1: Change nothing Pros: * No operation. Cons: * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS. h2. Option 2: Remove the configuration, enable Tiered Storage on a cluster level and do not allow it to be disabled Always instantiate all resources for tiered storage. If no special ones are selected, use the default ones which come with Kafka. Pros: * We solve the problem for moving between versions not allowing TS to be disabled. Cons: * We do not solve the problem of moving back to older (or newer) Kafka versions not supporting TS. * We haven’t quantified how many compute resources (CPU, memory) idle TS components occupy. * TS is a feature not required for running Kafka. As such, while it is still under development, we shouldn’t put it on the critical path of starting a broker.
In this way, a stray memory leak won’t impact anything on the critical path of a broker. * We are potentially swapping one problem for another. How does TS behave if one decides to swap the TS plugin classes when data has already been written? h2. Option 3: Hide topics with tiering enabled Customers cannot interact with topics which have tiering enabled. They cannot create new topics with the same names. Retention (and compaction?) do not take effect on files already in local storage. Pros: * We do not force data deletion. Cons: * This will be quite involved - the controller will need to know when a broker’s server.properties have been altered; the broker will need to not proceed to delete logs it is not the leader or follower for. h2. {color:#e6e6e6}Option 4: Do not start the broker if there are topics with tiering enabled{color} - Recommended This option has 2 different sub-options. The first one is that TS cannot be disabled on cluster-level if there are *any* tiering topics - in other words all tiered topics need to be deleted. The second one is that TS cannot be disabled on a cluster-level if there are *any* topics with *tiering enabled* - they can have tiering disabled, but with a retention policy set to delete or retain (as per [KIP-950|https://cwiki.apache.org/confluence/display/KAFKA/KIP-950%3A++Tiered+Storage+Disablement]). A topic can have tiering disabled and remain on the cluster as long as there is no *remote* data when TS is disabled cluster-wide. Pros: * We force the customer to be very explicit in disabling tiering of topics prior to disabling TS on the whole cluster. Cons: * You have to make certain that all data in remote is deleted (just a disablement of a tiered topic is not enough). How do you determine whether all remote data has expired if the policy is retain? If the retain policy in KIP-950 knows that there is data in remote, then this should also be able to figure it out.
The common denominator is that there needs to be no *remote* data at the point of disabling TS. As such, the most straightforward option is to refuse to start brokers if there are topics with the {{remote.storage.enable}} present. This in essence requires customers to clean any tiered topics before switching off TS, which is a fair ask. Should we wish to revise this later, it should be possible. h2. Option 5: Make Kafka forget about all remote information Pros: * Clean cut Cons: * Data is lost the moment TS is disabled regardless of whether it is re-enabled later on, which might not be the behaviour expected by customers.
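Option 4 amounts to a startup-time validation. A minimal sketch, assuming the broker can see every topic's config overrides when it starts; `TieredStorageStartupCheck` and its input shape are hypothetical. It flags topics whose `remote.storage.enable` is `true`; the stricter variant (refuse if the key is present at all, or while any remote data remains) would need a different predicate and, for remote data, information this check does not have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical broker-startup check modelling "Option 4" from KAFKA-15267.
final class TieredStorageStartupCheck {
    static final String TOPIC_REMOTE_STORAGE_ENABLE = "remote.storage.enable";

    // topicConfigs: topic name -> topic-level config overrides (hypothetical input shape).
    static void validate(boolean remoteLogStorageSystemEnable,
                         Map<String, Map<String, String>> topicConfigs) {
        if (remoteLogStorageSystemEnable) {
            return; // Tiered Storage is enabled cluster-wide: nothing to refuse.
        }
        List<String> tieredTopics = new ArrayList<>();
        // TreeMap gives a deterministic, sorted list in the error message.
        for (Map.Entry<String, Map<String, String>> entry : new TreeMap<>(topicConfigs).entrySet()) {
            // parseBoolean(null) is false, so topics without the override pass.
            if (Boolean.parseBoolean(entry.getValue().get(TOPIC_REMOTE_STORAGE_ENABLE))) {
                tieredTopics.add(entry.getKey());
            }
        }
        if (!tieredTopics.isEmpty()) {
            throw new IllegalStateException("remote.log.storage.system.enable=false, "
                    + "but tiering is still enabled for topics " + tieredTopics);
        }
    }
}
```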
[GitHub] [kafka] dajac commented on a diff in pull request #14117: MINOR: Code cleanups in group-coordinator module
dajac commented on code in PR #14117: URL: https://github.com/apache/kafka/pull/14117#discussion_r1277523528 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java: ## @@ -17,14 +17,14 @@ package org.apache.kafka.coordinator.group.runtime; /** - * Supplies a {@link CoordinatorBuilder} to the {@link CoordinatorRuntime}. + * Supplies a {@link CoordinatorShardBuilder} to the {@link CoordinatorRuntime}. * * @param The type of the coordinator. * @param The record type. */ -public interface CoordinatorBuilderSupplier, U> { +public interface CoordinatorBuilderSupplier, U> { Review Comment: yeah, that makes sense.
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #14117: MINOR: Code cleanups in group-coordinator module
jeffkbkim commented on code in PR #14117: URL: https://github.com/apache/kafka/pull/14117#discussion_r1277519026 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorBuilderSupplier.java: ## @@ -17,14 +17,14 @@ package org.apache.kafka.coordinator.group.runtime; /** - * Supplies a {@link CoordinatorBuilder} to the {@link CoordinatorRuntime}. + * Supplies a {@link CoordinatorShardBuilder} to the {@link CoordinatorRuntime}. * * @param The type of the coordinator. * @param The record type. */ -public interface CoordinatorBuilderSupplier, U> { +public interface CoordinatorBuilderSupplier, U> { Review Comment: should this be CoordinatorShardBuilderSupplier?
[jira] [Resolved] (KAFKA-15246) CoordinatorContext should be protected by a lock
[ https://issues.apache.org/jira/browse/KAFKA-15246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15246. - Fix Version/s: 3.6.0 Reviewer: Justine Olshan Resolution: Fixed > CoordinatorContext should be protected by a lock > > > Key: KAFKA-15246 > URL: https://issues.apache.org/jira/browse/KAFKA-15246 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.6.0
[GitHub] [kafka] dajac opened a new pull request, #14120: KAFKA-14499: [4/N] Implement OffsetFetch API
dajac opened a new pull request, #14120: URL: https://github.com/apache/kafka/pull/14120 This patch implements the OffsetFetch API in the new group coordinator. I found out that implementing the `RequireStable` flag is hard (if not impossible) in the current model. For context, the flag ensures that an OffsetFetch request does not return stale offsets if there are pending offsets to be committed. In the Scala code, we basically check the pending offsets data structure and, if there are any pending offsets, we return the `UNSTABLE_OFFSET_COMMIT` error. This tells the consumer to retry. In our new model, we don't have the pending offsets data structure. Instead, we use a timeline data structure to handle all the pending/uncommitted changes. Because of this, we don't know whether offsets are pending for a particular group. Given this, I propose to no longer return the `UNSTABLE_OFFSET_COMMIT` error. Instead, when `RequireStable` is set, we use a write operation to ensure that we read the latest offsets. If there are uncommitted offsets, the write operation ensures that the response is only returned when they are committed. This gives similar behaviour in the end. Note that this PR does not add the MemberId and MemberEpoch fields to the request. This will be done in the next PR. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
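The proposed `RequireStable` behaviour can be modelled in a few lines. This is a simplified, hypothetical sketch, not the coordinator runtime: it assumes plain reads see only committed state, while a stable read captures the latest (possibly uncommitted) offset and completes only once the high watermark covers everything appended before it, so no `UNSTABLE_OFFSET_COMMIT` error is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical model of one coordinator partition.
final class StableOffsetFetchSketch {
    // An appended OffsetCommit record that is not yet replicated.
    private static final class PendingWrite {
        final long logOffset;
        final String partition;
        final long offset;

        PendingWrite(long logOffset, String partition, long offset) {
            this.logOffset = logOffset;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // A RequireStable fetch, parked until the log is committed up to barrierOffset.
    private static final class DeferredRead {
        final long barrierOffset;
        final Long value;
        final CompletableFuture<Long> future = new CompletableFuture<>();

        DeferredRead(long barrierOffset, Long value) {
            this.barrierOffset = barrierOffset;
            this.value = value;
        }
    }

    private final Map<String, Long> committed = new HashMap<>(); // state at the high watermark
    private final Map<String, Long> latest = new HashMap<>();    // state including uncommitted writes
    private final List<PendingWrite> pending = new ArrayList<>();
    private final List<DeferredRead> deferred = new ArrayList<>();
    private long logEndOffset = 0;
    private long highWatermark = 0;

    void commitOffset(String partition, long offset) {
        pending.add(new PendingWrite(logEndOffset++, partition, offset));
        latest.put(partition, offset);
    }

    CompletableFuture<Long> fetch(String partition, boolean requireStable) {
        if (!requireStable || highWatermark >= logEndOffset) {
            // Plain read (or nothing pending): answer from committed state right away.
            return CompletableFuture.completedFuture(committed.get(partition));
        }
        // Treat the read like a write: read the latest state now, but only
        // respond once everything appended before it has been committed.
        DeferredRead read = new DeferredRead(logEndOffset, latest.get(partition));
        deferred.add(read);
        return read.future;
    }

    void advanceHighWatermark(long newHighWatermark) {
        highWatermark = Math.max(highWatermark, Math.min(newHighWatermark, logEndOffset));
        Iterator<PendingWrite> writes = pending.iterator();
        while (writes.hasNext()) {
            PendingWrite write = writes.next();
            if (write.logOffset >= highWatermark) {
                break;
            }
            committed.put(write.partition, write.offset);
            writes.remove();
        }
        Iterator<DeferredRead> reads = deferred.iterator();
        while (reads.hasNext()) {
            DeferredRead read = reads.next();
            if (read.barrierOffset <= highWatermark) {
                read.future.complete(read.value);
                reads.remove();
            }
        }
    }
}
```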
[GitHub] [kafka] amangandhi94 opened a new pull request, #14119: KAFKA-15266: Static configs set for non primary synonyms are ignored for Log configs
amangandhi94 opened a new pull request, #14119: URL: https://github.com/apache/kafka/pull/14119 This change ensures that values stored in "non-primary" synonyms are respected when altering configs. Currently, if a "non-primary" synonym has a static config set, that value gets deleted from the default log config during reconfiguration. We want to ensure that we retain the correct order of precedence for broker-level synonyms (log.retention.ms > log.retention.minutes > log.retention.hours). I have added unit test cases for this scenario: 1. During setup, we configure log.roll.hours=24. 2. We then run a few alter configs and ensure that LogConfig for segment.ms is still set to 24 hours. 3. We then run alter-config to set log.roll.ms=5 and verify that segment.ms also changes to 5. 4. We then delete the dynamic config for log.roll.ms and verify that we again pick up the statically configured 24-hour value. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
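The test scenario in this PR can be reproduced with a small model of synonym resolution. This is a sketch under a stated assumption (dynamic values are layered over static ones, then the highest-priority synonym that has a value wins); `SynonymResolver` is hypothetical, and Kafka's real logic in `KafkaConfig`/`DynamicBrokerConfig` may differ in edge cases, such as `-1` meaning unlimited retention.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of synonym resolution: synonyms are listed in priority order
// together with the factor that converts their unit to milliseconds.
final class SynonymResolver {
    private static final long MINUTE_MS = 60_000L;
    private static final long HOUR_MS = 60 * MINUTE_MS;

    static final Map<String, Long> LOG_ROLL_SYNONYMS;
    static final Map<String, Long> LOG_RETENTION_SYNONYMS;

    static {
        Map<String, Long> roll = new LinkedHashMap<>();
        roll.put("log.roll.ms", 1L);
        roll.put("log.roll.hours", HOUR_MS);
        LOG_ROLL_SYNONYMS = Collections.unmodifiableMap(roll);

        Map<String, Long> retention = new LinkedHashMap<>();
        retention.put("log.retention.ms", 1L);
        retention.put("log.retention.minutes", MINUTE_MS);
        retention.put("log.retention.hours", HOUR_MS);
        LOG_RETENTION_SYNONYMS = Collections.unmodifiableMap(retention);
    }

    // Layers dynamic values over static ones, then returns the highest-priority
    // synonym that has a value. Removing a dynamic value therefore falls back to
    // whatever is still configured statically. Special values such as -1 are ignored.
    static long resolveMs(Map<String, Long> synonyms, Map<String, String> staticProps,
                          Map<String, String> dynamicProps, long defaultMs) {
        Map<String, String> effective = new HashMap<>(staticProps);
        effective.putAll(dynamicProps);
        for (Map.Entry<String, Long> synonym : synonyms.entrySet()) {
            String value = effective.get(synonym.getKey());
            if (value != null) {
                return Long.parseLong(value.trim()) * synonym.getValue();
            }
        }
        return defaultMs;
    }
}
```

Walking through the PR's steps: with only a static `log.roll.hours=24`, the model yields 86,400,000 ms; a dynamic `log.roll.ms=5` then takes priority; deleting it falls back to the static 24 hours.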
[jira] [Commented] (KAFKA-15259) Kafka Streams does not continue processing due to rollback despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once
[ https://issues.apache.org/jira/browse/KAFKA-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17748555#comment-17748555 ] Tomonari Yamashita commented on KAFKA-15259: Hi [~mjsax], I think the stacktrace can be ignored. The cause of this problem is that sending a message that is too large to "output-topic" raises a RecordTooLargeException in producer.send(), and the transaction is then rolled back/aborted (3) despite ProductionExceptionHandlerResponse.CONTINUE if using execute_once. The stacktrace is just a secondary failure: an error caused by a subsequent commit on a transaction that has already been rolled back/aborted. As far as we have tested, even with the same source code (attached file: Reproducer.java), changing the Kafka Streams version changes whether "continue" works or not (i.e., whether the transaction aborts), as follows: - Kafka 2.6.3 : OK. "continue" worked as I expected. - Kafka 2.8.2 : OK. "continue" worked as I expected. - Kafka 3.0.0 : OK. "continue" worked as I expected. - Kafka 3.1.2 : OK. "continue" worked as I expected. - Kafka 3.2.0 : NG. "continue" didn't work because the transaction was rolled back/aborted. - Kafka 3.2.3 : NG. "continue" didn't work because the transaction was rolled back/aborted. - Kafka 3.5.1 : NG. "continue" didn't work because the transaction was rolled back/aborted. From what I have seen in the debugger, the transaction is probably aborted because of this code (3). Probably due to this change (4)(5) in Kafka 3.2.0, if any individual producer.send() call fails, the transaction is forcibly aborted. Since this change (4)(5) is in the Kafka producer, I realize that it would be difficult to restore the behavior of 3.1.2 and earlier, which allows "continue" to be used.
However, if "continue" is not available, there is a concern that Kafka Streams applications will keep getting stuck, because there is no way to skip records that raise RecordTooLargeException, for example by sending them to a dead letter queue. (3) https://github.com/apache/kafka/blob/3.5.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1081-L1083 {code:java} if (transactionManager != null) { transactionManager.maybeTransitionToErrorState(e); } {code} (4) Silent data loss in Kafka producer https://issues.apache.org/jira/browse/KAFKA-9279 (5) KAFKA-9279: Fail producer transactions for asynchronously-reported, synchronously-encountered ApiExceptions #11508 https://github.com/apache/kafka/pull/11508 > Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once > > > Key: KAFKA-15259 > URL: https://issues.apache.org/jira/browse/KAFKA-15259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.5.1 >Reporter: Tomonari Yamashita >Priority: Major > Attachments: Reproducer.java, app_at_least_once.log, > app_exactly_once.log > > > [Problem] > - Kafka Streams does not continue processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE if using execute_once. > -- "CONTINUE will signal that Streams should ignore the issue and continue > processing"(1), so Kafka Streams should continue processing even if using > execute_once when ProductionExceptionHandlerResponse.CONTINUE is used. > -- However, if using execute_once, Kafka Streams does not continue > processing due to rollback despite > ProductionExceptionHandlerResponse.CONTINUE.
And the client will be shut down > as the default behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) > [Environment] > - Kafka Streams 3.5.1 > [Reproduction procedure] > # Create "input-topic" topic and "output-topic" > # Put several messages on "input-topic" > # Execute a simple Kafka Streams program that transfers too large messages > from "input-topic" to "output-topic" with execute_once and returns > ProductionExceptionHandlerResponse.CONTINUE when an exception occurs in the > producer. Please refer to the reproducer program (attached file: > Reproducer.java). > # ==> However, Kafka Streams does not continue processing due to rollback > despite ProductionExceptionHandlerResponse.CONTINUE. And the stream thread > shuts down as the default > behavior (StreamThreadExceptionResponse.SHUTDOWN_CLIENT) (2). Please refer to > the debug log (attached file: app_exactly_once.log). > ## My expected behavior is that Kafka Streams should continue processing > even if using execute_once when ProductionExceptionHandlerResponse.CONTINUE > is used. > [As far as my investigation] > - FYI, if using at_least_once instead of execute_once
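The difference the reporter observed between at_least_once and exactly-once can be reduced to a toy model. This is a hypothetical, stdlib-only sketch, not the Kafka producer or Streams API: `SketchProducer`, `HandlerResponse` and `RecordTooLarge` are made-up stand-ins. It assumes, as the comment describes for 3.2.0 and later, that a failed send inside a transaction moves it to an error state (loosely mirroring `maybeTransitionToErrorState(e)`), so a handler's CONTINUE only skips the record while the final commit still fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical toy model; none of these types are Kafka classes.
final class EosContinueSketch {
    enum HandlerResponse { CONTINUE, FAIL }

    static final class RecordTooLarge extends RuntimeException {
        RecordTooLarge(String message) { super(message); }
    }

    static final class SketchProducer {
        private final boolean transactional;
        private final int maxRecordSize;
        private final List<String> inFlight = new ArrayList<>();
        private RuntimeException transactionError; // set when a send fails inside a transaction
        final List<String> committed = new ArrayList<>();

        SketchProducer(boolean transactional, int maxRecordSize) {
            this.transactional = transactional;
            this.maxRecordSize = maxRecordSize;
        }

        void send(String value) {
            if (value.length() > maxRecordSize) {
                RecordTooLarge e = new RecordTooLarge("record of size " + value.length());
                // Modeled on the 3.2.0+ behavior: the failure poisons the open transaction.
                if (transactional) {
                    transactionError = e;
                }
                throw e;
            }
            inFlight.add(value);
        }

        void commit() {
            if (transactionError != null) {
                inFlight.clear(); // the whole transaction is aborted
                throw new IllegalStateException("transaction is in an error state", transactionError);
            }
            committed.addAll(inFlight);
            inFlight.clear();
        }
    }

    // Sends a batch, asking the handler what to do with each failed send,
    // and returns true only if the batch was committed.
    static boolean processBatch(SketchProducer producer, List<String> values,
                                Function<RuntimeException, HandlerResponse> handler) {
        for (String value : values) {
            try {
                producer.send(value);
            } catch (RecordTooLarge e) {
                if (handler.apply(e) == HandlerResponse.FAIL) {
                    return false;
                }
                // CONTINUE: skip this record and keep processing.
            }
        }
        try {
            producer.commit();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

With the same always-CONTINUE handler, the non-transactional producer commits the two small records, while the transactional one commits nothing, which matches the "NG" rows for 3.2.0 and later.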
[jira] [Updated] (KAFKA-15266) Log configs ignore static configs set for non primary synonyms
[ https://issues.apache.org/jira/browse/KAFKA-15266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aman Harish Gandhi updated KAFKA-15266: --- Summary: Log configs ignore static configs set for non primary synonyms (was: Log configs ignore static configs set non primary synonyms) > Log configs ignore static configs set for non primary synonyms > -- > > Key: KAFKA-15266 > URL: https://issues.apache.org/jira/browse/KAFKA-15266 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Aman Harish Gandhi >Assignee: Aman Harish Gandhi >Priority: Major > > In our server.properties we had the following config > {code:java} > log.retention.hours=48 > {code} > We noticed that after running alter configs to update a broker-level config (for > a config unrelated to retention), we were only deleting data after 7 days > instead of the configured 2 days. > The alter-config command we ran was similar to this > {code:java} > sh kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config > "log.segment.bytes=50" > {code} > Digging deeper, the issue could be pinpointed to the reconfigure block of > DynamicLogConfig inside DynamicBrokerConfig. Here we only look at the > "primary" KafkaConfig synonym of the LogConfig, and if it is not set, we > remove the value set in the default log config as well. This eventually leads to > retention.ms not being set in the default log config, and that leads to > the default value of 7 days being used. The value set in > "log.retention.hours" is completely ignored in this case.
> Pasting the relevant code block here > {code:java} > newConfig.valuesFromThisConfig.forEach { (k, v) => > if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) { > DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName => > if (v == null) > newBrokerDefaults.remove(configName) > else > newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef]) > } > } > } {code} > In the above block `DynamicLogConfig.ReconfigurableConfigs` contains only > log.retention.ms. It does not contain the other synonyms like > `log.retention.minutes` or `log.retention.hours`. > This issue seems to affect all cases where there is more than one > KafkaConfig synonym for the LogConfig. -- This message was sent by Atlassian Jira (v8.20.10#820010)
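The quoted reconfigure loop can be reproduced in miniature to show why `log.retention.hours=48` is lost. This is a simplified Java sketch, not the `DynamicLogConfig` code and not necessarily the fix adopted in PR #14119: `reconfigureBuggy` mirrors the quoted loop by consulting only the primary synonym `log.retention.ms`, while `reconfigureWithSynonyms` is one hypothetical alternative that falls back to `log.retention.minutes` and `log.retention.hours` before removing the broker default. All class and method names are made up for illustration.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the retention.ms handling; not Kafka code.
final class ReconfigureSketch {
    static final String LOG_CONFIG_NAME = "retention.ms";
    // LogConfig default used when retention.ms is absent from the broker defaults.
    static final long LOG_CONFIG_DEFAULT_MS = TimeUnit.DAYS.toMillis(7);

    // Mirrors the quoted loop: only the primary synonym is consulted, so an
    // unset log.retention.ms removes the broker default entirely.
    static void reconfigureBuggy(Map<String, String> brokerValues,
                                 Map<String, Object> newBrokerDefaults) {
        String v = brokerValues.get("log.retention.ms");
        if (v == null) {
            newBrokerDefaults.remove(LOG_CONFIG_NAME);
        } else {
            newBrokerDefaults.put(LOG_CONFIG_NAME, Long.parseLong(v));
        }
    }

    // One possible alternative: resolve the synonyms in precedence order
    // (ms > minutes > hours) before deciding to remove the default.
    static void reconfigureWithSynonyms(Map<String, String> brokerValues,
                                        Map<String, Object> newBrokerDefaults) {
        Long ms = resolveRetentionMs(brokerValues);
        if (ms == null) {
            newBrokerDefaults.remove(LOG_CONFIG_NAME);
        } else {
            newBrokerDefaults.put(LOG_CONFIG_NAME, ms);
        }
    }

    static Long resolveRetentionMs(Map<String, String> values) {
        if (values.get("log.retention.ms") != null) {
            return Long.parseLong(values.get("log.retention.ms"));
        }
        if (values.get("log.retention.minutes") != null) {
            return TimeUnit.MINUTES.toMillis(Long.parseLong(values.get("log.retention.minutes")));
        }
        if (values.get("log.retention.hours") != null) {
            return TimeUnit.HOURS.toMillis(Long.parseLong(values.get("log.retention.hours")));
        }
        return null;
    }

    // Retention that a topic without an override would actually get.
    static long effectiveRetentionMs(Map<String, Object> brokerDefaults) {
        Object v = brokerDefaults.get(LOG_CONFIG_NAME);
        return v == null ? LOG_CONFIG_DEFAULT_MS : (Long) v;
    }
}
```

Feeding both methods the reporter's situation (static `log.retention.hours=48`, an unrelated `log.segment.bytes` change) leaves 7 days in the first case and 48 hours in the second.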
[jira] [Created] (KAFKA-15266) Log configs ignore static configs set non primary synonyms
Aman Harish Gandhi created KAFKA-15266: -- Summary: Log configs ignore static configs set non primary synonyms Key: KAFKA-15266 URL: https://issues.apache.org/jira/browse/KAFKA-15266 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.6.0 Reporter: Aman Harish Gandhi Assignee: Aman Harish Gandhi
[jira] [Assigned] (KAFKA-14967) MockAdminClient throws NullPointerException in CreateTopicsResult
[ https://issues.apache.org/jira/browse/KAFKA-14967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-14967: -- Fix Version/s: 3.6.0 Assignee: James Shaw Resolution: Fixed > MockAdminClient throws NullPointerException in CreateTopicsResult > - > > Key: KAFKA-14967 > URL: https://issues.apache.org/jira/browse/KAFKA-14967 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.4.0 >Reporter: James Shaw >Assignee: James Shaw >Priority: Major > Fix For: 3.6.0 > > > Calling {{CreateTopicsResult.topicId().get()}} throws > {{NullPointerException}}, while {{KafkaAdminClient}} correctly returns > the topicId. > The NPE appears to be caused by [{{MockAdminClient.createTopics()}} calling > {{future.complete(null)}}|https://github.com/apache/kafka/blame/trunk/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java#L394] > Stacktrace: > {code:java} > java.util.concurrent.ExecutionException: java.lang.NullPointerException > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) > at > MockAdminClientBug.shouldNotThrowNullPointerException(MockAdminClientBug.java:37) >[snip] > Caused by: java.lang.NullPointerException > at > org.apache.kafka.common.internals.KafkaFutureImpl.lambda$thenApply$0(KafkaFutureImpl.java:60) > at > java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) > at > java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) > at > java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168) > at > org.apache.kafka.common.internals.KafkaFutureImpl.thenApply(KafkaFutureImpl.java:58) > at >
org.apache.kafka.clients.admin.CreateTopicsResult.topicId(CreateTopicsResult.java:82) > ... 85 more > {code} > Test case to reproduce: > {code:java} > > import org.apache.kafka.clients.admin.Admin; > import org.apache.kafka.clients.admin.CreateTopicsResult; > import org.apache.kafka.clients.admin.MockAdminClient; > import org.apache.kafka.clients.admin.NewTopic; > import org.apache.kafka.common.Node; > import org.apache.kafka.common.Uuid; > import org.junit.jupiter.api.Test; > import java.util.Optional; > import java.util.concurrent.ExecutionException; > import static java.util.Collections.singletonList; > public class MockAdminClientBug { > @Test > void shouldNotThrowNullPointerException() throws ExecutionException, > InterruptedException { > Node controller = new Node(0, "mock", 0); > try (Admin admin = new MockAdminClient(singletonList(controller), > controller)) { > CreateTopicsResult result = admin.createTopics(singletonList(new > NewTopic("TestTopic", Optional.empty(), Optional.empty()))); > Uuid topicId = result.topicId("TestTopic").get(); > System.out.println(topicId); > } > } > } > {code}
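The NullPointerException can be reproduced with plain `CompletableFuture`, independently of the Kafka client classes. This sketch assumes, consistent with the stack trace, that `topicId(...)` derives its result by calling `thenApply` with a function over the per-topic metadata future; `NullFutureSketch`, `TopicMetadata`, `buggyCreate` and `fixedCreate` are hypothetical stand-ins, not `MockAdminClient` or `CreateTopicsResult` internals. Completing the future with `null` makes the unbound method reference dereference `null`, and `get()` then surfaces the NPE as the cause of an `ExecutionException`.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the admin-client types involved.
final class NullFutureSketch {
    // Plays the role of the per-topic metadata carried by the create-topics future.
    static final class TopicMetadata {
        private final UUID topicId;
        TopicMetadata(UUID topicId) { this.topicId = topicId; }
        UUID topicId() { return topicId; }
    }

    // Like a topicId() accessor: map the metadata future to the id it contains.
    // The unbound method reference throws NPE if the metadata value is null.
    static CompletableFuture<UUID> topicId(CompletableFuture<TopicMetadata> metadata) {
        return metadata.thenApply(TopicMetadata::topicId);
    }

    // Equivalent to what the mock did before the fix: complete with null.
    static CompletableFuture<TopicMetadata> buggyCreate() {
        return CompletableFuture.completedFuture(null);
    }

    // Completing with a real metadata object avoids the NPE.
    static CompletableFuture<TopicMetadata> fixedCreate(UUID id) {
        return CompletableFuture.completedFuture(new TopicMetadata(id));
    }
}
```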
[GitHub] [kafka] mimaison merged pull request #13671: KAFKA-14967: fix NPE in MockAdminClient CreateTopicsResult
mimaison merged PR #13671: URL: https://github.com/apache/kafka/pull/13671
[GitHub] [kafka] vamossagar12 commented on pull request #14101: Source task stop call was added to force stopping execution.
vamossagar12 commented on PR #14101: URL: https://github.com/apache/kafka/pull/14101#issuecomment-1655210284 > This is exactly our case: we have long source polling (in our case it's a JDBC source task); each restart doesn't stop the previous task (we get "Graceful Shutdown failed") but creates a new one => a lot of connections on the DB side. Yeah, that's what I had imagined would be happening in your case. As I said, you will need to play around with the graceful shutdown timeout and probably with the poll interval on the JDBC connector side (that would be out of scope here). IMO it might be hard to factor in the change in this PR, considering that it meddles with the current logic and we would need to make more changes.
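The effect described in the quoted report (a restart gives up waiting on a task whose poll is still blocked) can be sketched with a plain thread. This is a hypothetical model, not the Connect `Worker`: `SlowPollingTask` stands in for a source task whose `poll()` blocks, and `stopGracefully` waits for a bounded time the way a graceful shutdown timeout does (in Connect the worker setting is `task.shutdown.graceful.timeout.ms`). When a single poll takes longer than the timeout, the old task is still running when the wait ends, which is how extra database connections can accumulate.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a source task with a long blocking poll; not Connect code.
final class GracefulShutdownSketch {
    static final class SlowPollingTask implements Runnable {
        private final long pollMs;
        private final CountDownLatch started = new CountDownLatch(1);
        final AtomicBoolean stopRequested = new AtomicBoolean(false);

        SlowPollingTask(long pollMs) { this.pollMs = pollMs; }

        void awaitStarted() throws InterruptedException { started.await(); }

        @Override
        public void run() {
            started.countDown();
            try {
                // Each iteration stands in for one long poll (e.g. a slow query);
                // the stop flag is only checked between polls.
                do {
                    Thread.sleep(pollMs);
                } while (!stopRequested.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Requests a stop and waits up to gracefulTimeoutMs; false means the task
    // was still running when the wait ended ("graceful shutdown failed").
    static boolean stopGracefully(Thread taskThread, SlowPollingTask task,
                                  long gracefulTimeoutMs) throws InterruptedException {
        task.stopRequested.set(true);
        taskThread.join(gracefulTimeoutMs);
        return !taskThread.isAlive();
    }
}
```

In this model, raising the timeout above the worst-case poll duration, or making a single poll return sooner, is what flips `stopGracefully` to `true`, which is the tuning suggested in the comment.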
[GitHub] [kafka] dajac commented on pull request #14117: MINOR: Code cleanups in group-coordinator module
dajac commented on PR #14117: URL: https://github.com/apache/kafka/pull/14117#issuecomment-1655195050 > can you point me to where the log context includes the topic partition info? Sure. It is right [here](https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java#L412).
[GitHub] [kafka] flashmouse commented on pull request #13920: KAFKA-15106 fix AbstractStickyAssignor isBalanced predict
flashmouse commented on PR #13920: URL: https://github.com/apache/kafka/pull/13920#issuecomment-1655170419 thx @ableegoldman, I reduced the number of partitions and increased the number of consumers. I think all tests affected by this PR should pass now.
[GitHub] [kafka] muralibasani commented on pull request #14106: KAFKA-14585: 1st part : Java versions for metadata/broker and updated LogConfig
muralibasani commented on PR #14106: URL: https://github.com/apache/kafka/pull/14106#issuecomment-1655168025 @fvaleri would you like to take a look at this?