Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]
ijuma commented on code in PR #14671: URL: https://github.com/apache/kafka/pull/14671#discussion_r1377053376 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -42,29 +42,27 @@ public class DirectoryId { public static final Uuid MIGRATING = new Uuid(0L, 2L); /** - * The set of reserved UUIDs that will never be returned by the random method. + * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100, + * or if its string representation starts with a dash ("-"). */ -public static final Set<Uuid> RESERVED; - -static { -HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); -// The first 100 UUIDs are reserved for future use. -for (long i = 0L; i < 100L; i++) { -reserved.add(new Uuid(0L, i)); -} -RESERVED = Collections.unmodifiableSet(reserved); +public boolean isReserved(Uuid id) { +return id.toString().startsWith("-") || Review Comment: It's wasteful to generate the string to verify this - isn't there a cheaper way to do it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
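A cheaper check is possible without allocating the string. Assuming Kafka's `Uuid.toString()` Base64-URL-encodes the two longs big-endian (most significant long first), the first output character encodes the top six bits of the most significant long, and '-' sits at index 62 of the URL-safe alphabet. A sketch of that idea, using `java.util.UUID` as a stand-in and a hypothetical helper name (not the code the PR ultimately adopted):

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public final class DashCheck {
    // Assumption: the 16 bytes are serialized most significant long first.
    // The first Base64 character holds the top 6 bits of the first byte,
    // and '-' is index 62 in the URL-safe alphabet.
    static boolean startsWithDash(long msb) {
        return (msb >>> 58) == 62;
    }

    public static void main(String[] args) {
        // Self-check the bit trick against the string form.
        for (int i = 0; i < 100_000; i++) {
            UUID u = UUID.randomUUID();
            ByteBuffer buf = ByteBuffer.allocate(16);
            buf.putLong(u.getMostSignificantBits());
            buf.putLong(u.getLeastSignificantBits());
            String s = Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
            if (startsWithDash(u.getMostSignificantBits()) != s.startsWith("-")) {
                throw new AssertionError("mismatch for " + s);
            }
        }
        System.out.println("cheap dash check agrees with the encoded string");
    }
}
```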
Re: [PR] KAFKA-15584: Leader election with ELR [kafka]
CalvinConfluent commented on PR #14593: URL: https://github.com/apache/kafka/pull/14593#issuecomment-1786438703 The failing tests are unrelated to this change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]
hudeqi commented on code in PR #14667: URL: https://github.com/apache/kafka/pull/14667#discussion_r1377023677 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig, // Third delete the logs and checkpoint. val errorMap = new mutable.HashMap[TopicPartition, Throwable]() +// `tieredEnabledPartitions` already excludes internal topics +val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled())) Review Comment: The calculation of `tieredEnabledPartitions` cannot be placed after the if{} block, because `logManager.asyncDelete` removes the corresponding unifiedLog from `currentLogs` in `logManager`, which would make the calculation of `tieredEnabledPartitions` incorrect. But I have merged the two filter conditions. 1. The calculation of `tieredEnabledPartitions` already excludes internal topics, because https://github.com/apache/kafka/blob/8f8ad6db384ce30e8a5d848d3ed826a3f7a54dfe/core/src/main/scala/kafka/log/UnifiedLog.scala#L1887 2. added. @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15686) Consumer should be able to detect network problem
[ https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781206#comment-17781206 ] Philip Nee commented on KAFKA-15686: [~ihavenoem...@163.com] - What is the use case of knowing which broker is down? If telemetry is set up for your Kafka cluster, you should be able to tell by looking at the broker-side metrics. It would be helpful to articulate a clear use case to help the community understand the rationale behind the ask. Thanks. > Consumer should be able to detect network problem > - > > Key: KAFKA-15686 > URL: https://issues.apache.org/jira/browse/KAFKA-15686 > Project: Kafka > Issue Type: New Feature > Components: consumer >Affects Versions: 3.5.0 >Reporter: Jiahongchao >Priority: Minor > > When we call poll method in consumer, it will return normally even if some > partitions do not have a leader. > What should we do to detect such failures? Currently we have to check log to > find out broker connection problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15686) Consumer should be able to detect network problem
[ https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781206#comment-17781206 ] Philip Nee edited comment on KAFKA-15686 at 10/31/23 4:03 AM: -- [~ihavenoem...@163.com] - What is the use case of knowing which broker is down from the client's perspective? If telemetry is set up for your Kafka cluster, you should be able to tell by looking at the broker-side metrics. It would be helpful to articulate a clear use case to help the community understand the rationale behind the ask. Thanks. was (Author: JIRAUSER283568): [~ihavenoem...@163.com] - What is the use case of knowing which broker is down? If telemetry is set up for your Kafka cluster, you should be able to tell by looking at the broker-side metrics. It would be helpful to articulate a clear use case to help the community understand the rationale behind the ask. Thanks. > Consumer should be able to detect network problem > - > > Key: KAFKA-15686 > URL: https://issues.apache.org/jira/browse/KAFKA-15686 > Project: Kafka > Issue Type: New Feature > Components: consumer >Affects Versions: 3.5.0 >Reporter: Jiahongchao >Priority: Minor > > When we call poll method in consumer, it will return normally even if some > partitions do not have a leader. > What should we do to detect such failures? Currently we have to check log to > find out broker connection problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
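For what it's worth, some connectivity signals are already observable client-side without a new API: the consumer's `metrics()` map exposes connection-related gauges. A rough sketch of polling those as a liveness hint; the metric names are assumed from the client's common network metrics and worth verifying against the client version in use:

{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class ConnectionHealth {
    // Logs connection-related gauges; a falling connection-count after polls
    // is a hint (not proof) that brokers have become unreachable.
    public static void report(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
            MetricName name = e.getKey();
            if ("connection-count".equals(name.name()) || "connection-close-rate".equals(name.name())) {
                System.out.printf("%s = %s%n", name.name(), e.getValue().metricValue());
            }
        }
    }
}
{code}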
Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]
hudeqi commented on code in PR #14667: URL: https://github.com/apache/kafka/pull/14667#discussion_r1377023677 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig, // Third delete the logs and checkpoint. val errorMap = new mutable.HashMap[TopicPartition, Throwable]() +// `tieredEnabledPartitions` already excludes internal topics +val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled())) Review Comment: The calculation of `tieredEnabledPartitions` cannot be placed after the if{} block, because `logManager.asyncDelete` removes the corresponding unifiedLog from `currentLogs` in `logManager`, which would make the calculation of `tieredEnabledPartitions` incorrect. 1. The calculation of `tieredEnabledPartitions` already excludes internal topics, because https://github.com/apache/kafka/blob/8f8ad6db384ce30e8a5d848d3ed826a3f7a54dfe/core/src/main/scala/kafka/log/UnifiedLog.scala#L1887 2. added. @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]
hudeqi commented on code in PR #14667: URL: https://github.com/apache/kafka/pull/14667#discussion_r1377023677 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig, // Third delete the logs and checkpoint. val errorMap = new mutable.HashMap[TopicPartition, Throwable]() +// `tieredEnabledPartitions` already excludes internal topics +val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled())) Review Comment: The calculation of `tieredEnabledPartitions` cannot be placed after the if{} block, because `logManager.asyncDelete` removes the corresponding unifiedLog from `currentLogs` in `logManager`, which would make the calculation of `tieredEnabledPartitions` incorrect. 1. The calculation of `tieredEnabledPartitions` already excludes internal topics, because [here](https://github.com/apache/kafka/blob/8f8ad6db384ce30e8a5d848d3ed826a3f7a54dfe/core/src/main/scala/kafka/log/UnifiedLog.scala#L1887) 2. added. @divijvaidya -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]
hudeqi commented on code in PR #14667: URL: https://github.com/apache/kafka/pull/14667#discussion_r1377020873 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig, // Third delete the logs and checkpoint. val errorMap = new mutable.HashMap[TopicPartition, Throwable]() +// `tieredEnabledPartitions` already excludes internal topics +val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled())) if (partitionsToDelete.nonEmpty) { // Delete the logs and checkpoint. logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e)) } remoteLogManager.foreach { rlm => // exclude the partitions with offline/error state Review Comment: updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue commented on PR #14672: URL: https://github.com/apache/kafka/pull/14672#issuecomment-1786396441 @philipnee Can you add the `ctr` tag, please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Remove ambiguous constructor [kafka]
philipnee commented on PR #14598: URL: https://github.com/apache/kafka/pull/14598#issuecomment-1786392041 Hi @kirktrue @cadonna - Thank you for taking the time to review the PR. I don't have a very strong opinion about the constructor, but I do want the other refactoring changes, so let me know if there are any questions/concerns about those! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1786388893 Hi @dajac - Thank you for the comments. I addressed them in the latest commits. Would you mind reviewing the changes again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
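For context, the configuration this PR introduces is the KIP-848 opt-in. A sketch of what enabling the new protocol might look like, assuming the `group.protocol` name and `consumer` value proposed in the KIP survive review:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class GroupProtocolOptIn {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Assumed KIP-848 config name and value; verify against the merged PR.
        props.put("group.protocol", "consumer");
        return props;
    }
}
```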
Re: [PR] KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode [kafka]
github-actions[bot] commented on PR #13530: URL: https://github.com/apache/kafka/pull/13530#issuecomment-1786384406 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
philipnee commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1377011485 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + +private final String name; +private final Map<String, String> tags; + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the telemetry metric name of the metric (the final name + * under which this metric is emitted). + */ +public MetricKey(String name) { +this(name, null); +} + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ +public MetricKey(String name, Map<String, String> tags) { +this.name = Objects.requireNonNull(name); +this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap(); +} + +public MetricKey(MetricName metricName) { +this(metricName.name(), metricName.tags()); +} + +@Override +public MetricKey key() { +return this; +} + +public String getName() { Review Comment: @mjsax - I mentioned this in our team meeting today. A clear style guide seems to be missing because different committers seem to do things differently. I do want to take a crack at it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]
kirktrue opened a new pull request, #14672: URL: https://github.com/apache/kafka/pull/14672 This implements the `Consumer.groupMetadata()` API by means of an event passed to and fulfilled in the consumer network I/O thread. The application thread will block until this event is processed in the background thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
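The pattern described, where the application thread enqueues an event and blocks until the background thread fulfils it, is essentially a completable-future handoff. A minimal generic sketch of that shape; the type names here are hypothetical, not the actual event classes in the PR:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

final class GroupMetadataEvent {
    final CompletableFuture<String> future = new CompletableFuture<>();
}

public final class BlockingHandoff {
    private static final BlockingQueue<GroupMetadataEvent> QUEUE = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws Exception {
        // Background "network I/O" thread: takes events and completes their futures.
        Thread background = new Thread(() -> {
            try {
                GroupMetadataEvent event = QUEUE.take();
                event.future.complete("group-epoch-42"); // stand-in for real group metadata
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();

        // Application thread: enqueue the event, then block until it is processed.
        GroupMetadataEvent event = new GroupMetadataEvent();
        QUEUE.put(event);
        System.out.println("groupMetadata = " + event.future.get());
        background.join();
    }
}
```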
Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]
jolshan commented on code in PR #14671: URL: https://github.com/apache/kafka/pull/14671#discussion_r1376940646 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -42,29 +42,27 @@ public class DirectoryId { public static final Uuid MIGRATING = new Uuid(0L, 2L); /** - * The set of reserved UUIDs that will never be returned by the random method. + * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100, + * or if its string representation starts with a dash ("-"). */ -public static final Set<Uuid> RESERVED; - -static { -HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); -// The first 100 UUIDs are reserved for future use. -for (long i = 0L; i < 100L; i++) { -reserved.add(new Uuid(0L, i)); -} -RESERVED = Collections.unmodifiableSet(reserved); +public boolean isReserved(Uuid id) { +return id.toString().startsWith("-") || +(uuid.getMostSignificantBits() == 0 && +uuid.getLeastSignificantBits() < 100); } /** * Static factory to generate a directory ID. * - * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-") + * This will not generate a reserved UUID (first 100), or one whose string representation + * starts with a dash ("-") */ public static Uuid random() { -Uuid uuid = Uuid.randomUuid(); -while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) { -uuid = Uuid.randomUuid(); +while (true) { +Uuid uuid = Uuid.randomUuid(); Review Comment: Doesn't the Uuid class also ensure this? ``` public static Uuid randomUuid() { Uuid uuid = unsafeRandomUuid(); while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) { uuid = unsafeRandomUuid(); } return uuid; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]
jolshan commented on code in PR #14671: URL: https://github.com/apache/kafka/pull/14671#discussion_r1376940840 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -42,29 +42,27 @@ public class DirectoryId { public static final Uuid MIGRATING = new Uuid(0L, 2L); /** - * The set of reserved UUIDs that will never be returned by the random method. + * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100, + * or if its string representation starts with a dash ("-"). */ -public static final Set<Uuid> RESERVED; - -static { -HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); -// The first 100 UUIDs are reserved for future use. -for (long i = 0L; i < 100L; i++) { -reserved.add(new Uuid(0L, i)); -} -RESERVED = Collections.unmodifiableSet(reserved); +public boolean isReserved(Uuid id) { +return id.toString().startsWith("-") || +(uuid.getMostSignificantBits() == 0 && +uuid.getLeastSignificantBits() < 100); } /** * Static factory to generate a directory ID. * - * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-") + * This will not generate a reserved UUID (first 100), or one whose string representation + * starts with a dash ("-") */ public static Uuid random() { -Uuid uuid = Uuid.randomUuid(); -while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) { -uuid = Uuid.randomUuid(); +while (true) { +Uuid uuid = Uuid.randomUuid(); +if (isReserved(uuid)) { Review Comment: shouldn't this be not reserved? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
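For reference, the loop shape only terminates correctly if the condition is negated: keep drawing while the candidate is reserved, and return it once it is not. A sketch of the fixed loop, reusing the names from the diff above and assuming `isReserved` is made static (not the final committed code):

```java
public static Uuid random() {
    while (true) {
        Uuid uuid = Uuid.randomUuid();
        if (!isReserved(uuid)) {   // return only once we draw a non-reserved ID
            return uuid;
        }
    }
}
```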
Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]
jolshan commented on code in PR #14671: URL: https://github.com/apache/kafka/pull/14671#discussion_r1376940646 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -42,29 +42,27 @@ public class DirectoryId { public static final Uuid MIGRATING = new Uuid(0L, 2L); /** - * The set of reserved UUIDs that will never be returned by the random method. + * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100, + * or if its string representation starts with a dash ("-"). */ -public static final Set<Uuid> RESERVED; - -static { -HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED); -// The first 100 UUIDs are reserved for future use. -for (long i = 0L; i < 100L; i++) { -reserved.add(new Uuid(0L, i)); -} -RESERVED = Collections.unmodifiableSet(reserved); +public boolean isReserved(Uuid id) { +return id.toString().startsWith("-") || +(uuid.getMostSignificantBits() == 0 && +uuid.getLeastSignificantBits() < 100); } /** * Static factory to generate a directory ID. * - * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-") + * This will not generate a reserved UUID (first 100), or one whose string representation + * starts with a dash ("-") */ public static Uuid random() { -Uuid uuid = Uuid.randomUuid(); -while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) { -uuid = Uuid.randomUuid(); +while (true) { +Uuid uuid = Uuid.randomUuid(); Review Comment: Doesn't the Uuid class also ensure this? ``` public static Uuid randomUuid() { Uuid uuid = unsafeRandomUuid(); while (uuid.equals(METADATA_TOPIC_ID) || uuid.equals(ZERO_UUID) || uuid.toString().startsWith("-")) { uuid = unsafeRandomUuid(); } return uuid; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15281) Implement the groupMetadata Consumer API
[ https://issues.apache.org/jira/browse/KAFKA-15281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15281: - Assignee: Kirk True > Implement the groupMetadata Consumer API > > > Key: KAFKA-15281 > URL: https://issues.apache.org/jira/browse/KAFKA-15281 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-preview > Original Estimate: 504h > Remaining Estimate: 504h > > The threading refactor project needs to implement the {{groupMetadata()}} API > call once support for the KIP-848 protocol is implemented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue commented on code in PR #14670: URL: https://github.com/apache/kafka/pull/14670#discussion_r1376925327 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig { */ static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; -/** - * internal.throw.on.fetch.stable.offset.unsupported - * Whether or not the consumer should throw when the new stable offset feature is supported. - * If set to true then the client shall crash upon hitting it. - * The purpose of this flag is to prevent unexpected broker downgrade which makes - * the offset fetch protection against pending commit invalid. The safest approach - * is to fail fast to avoid introducing correctness issue. - * - * - * Note: this is an internal configuration and could be changed in the future in a backward incompatible way - * - */ -static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported"; - Review Comment: I moved this to `ConsumerUtils` because `LegacyKafkaConsumer` and `AsyncKafkaConsumer` couldn't access it via package-level visibility since they're in the `internals` sub-package. @dajac—will this be considered a breaking change? I assumed not because `THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` isn't public. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time, // no coordinator will be constructed for the default (null) group id if (!groupId.isPresent()) { config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); - //config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); + config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED); Review Comment: Moving the package-level variable and method from `ConsumerConfig` also fixes this issue. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) { } } -// This is here temporarily as we don't have public access to the ConsumerConfig in this module. -public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs, - Deserializer<?> keyDeserializer, - Deserializer<?> valueDeserializer) { -// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value -Map<String, Object> newConfigs = new HashMap<>(configs); -if (keyDeserializer != null) -newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass()); -else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null) -throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null."); -if (valueDeserializer != null) -newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); -else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null) -throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null."); -return newConfigs; -} - Review Comment: The original version of this method as it appeared in `ConsumerConfig` was moved to `ConsumerUtils`, so now it can be used from here too. ##
-public static Map appendDeserializerToConfig(Map configs, - Deserializer keyDeserializer, - Deserializer valueDeserializer) { -// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value -Map newConfigs = new HashMap<>(configs); -if (keyDeserializer != null) -newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass()); -else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null) -throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null."); -if (valueDeserializer != null) -newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass()); -else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null) -throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null."); -return newConfigs; -} - Review Comment: The original version of this method as it appeared in `ConsumerConfig` was moved to `ConsumerUtils` so now it can be used from here too ## tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java: ## @@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws Throwable { () -> log.info("offsetsForTime = {}", offsetsForTime.result)); // Whether or not offsetsForTimes works, beginningOffsets and endOffsets // should work. -consumer.beginningOffsets(timestampsToSearch.keySet()); -consumer.endOffsets(timestampsToSearch.keySet()); +Map beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet()); +Map endingOffsets = consumer.endOffsets(timestampsToSearch.keySet()); +log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets); Review Comment: This is super annoying. I started getting errors from SpotBugs because the offsets methods were called but the return value was being ignored. This is a very brute force way of silencing the checker. I could not find a clean way to ignore the warning. I also don't know why this is suddenly being caught by SpotBugs. From its perspective, nothing has changed in the `KafkaConsumer` API, right? ##
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
mjsax commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1376921687 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Value object that contains the name and tags for a Metric. + */ +public class MetricKey implements MetricKeyable { + +private final String name; +private final Map<String, String> tags; + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the telemetry metric name of the metric (the final name + * under which this metric is emitted). + */ +public MetricKey(String name) { +this(name, null); +} + +/** + * Create a {@code MetricKey} + * + * @param name metric name. This should be the .converted. name of the metric (the final name + * under which this metric is emitted). + * @param tags mapping of tag keys to values. + */ +public MetricKey(String name, Map<String, String> tags) { +this.name = Objects.requireNonNull(name); +this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap(); +} + +public MetricKey(MetricName metricName) { +this(metricName.name(), metricName.tags()); +} + +@Override +public MetricKey key() { +return this; +} + +public String getName() { Review Comment: There is no checkstyle rule that would fail the build -- it's basically up to everybody (especially committers) to keep an eye out for it. The guidelines (https://kafka.apache.org/coding-guide) are unfortunately not super clear about it (too Scala-centric), but I recently double-checked with a few other committers and there was agreement that not using `get` is the rule. > Avoid getters and setters - stick to plain vals or vars instead. If (later on) you require a custom setter (or getter) for a var named myVar then add a shadow var myVar_underlying and override the setter (def myVar_=) and the getter (def myVar = myVar_underlying). Wanna help with a PR to make it more explicit in the guidelines docs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
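Concretely, applied to the `MetricKey` class under review, the convention would look like this (a sketch of the guideline, not the merged code):

```java
// Kafka style: plain accessors named after the field, no get- prefix.
public String name() {          // rather than getName()
    return name;
}

public Map<String, String> tags() {   // rather than getTags()
    return tags;
}
```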
[PR] MINOR: small optimization for DirectoryId.random [kafka]
cmccabe opened a new pull request, #14671: URL: https://github.com/apache/kafka/pull/14671 DirectoryId.random doesn't need to instantiate the first 100 IDs to check if an ID is one of them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
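The idea: membership in "the first 100 IDs" is an arithmetic range check on the two longs, so there is no need to materialize a `HashSet` of 100 `Uuid` objects up front. A standalone sketch of the equivalence, using plain `long` pairs in place of Kafka's `Uuid`:

```java
import java.util.HashSet;
import java.util.Set;

public final class RangeCheckDemo {
    public static void main(String[] args) {
        // Old approach: materialize the 100 reserved (0, i) pairs in a set.
        Set<Long> reservedLsb = new HashSet<>();
        for (long i = 0L; i < 100L; i++) {
            reservedLsb.add(i);
        }

        long msb = 0L, lsb = 42L;
        boolean viaSet = msb == 0L && reservedLsb.contains(lsb);

        // New approach: a plain range check, no allocation.
        // (The lsb >= 0 guard matters because longs can be negative.)
        boolean viaRange = msb == 0L && lsb >= 0L && lsb < 100L;

        System.out.println(viaSet + " == " + viaRange); // true == true
    }
}
```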
Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]
junrao merged PR #14603: URL: https://github.com/apache/kafka/pull/14603 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781155#comment-17781155 ] Colin McCabe commented on KAFKA-15754: -- {quote} I was wondering if there is any good reason for using a Base64 URL encoder and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet not containing the "-". {quote} At one point, I did raise the question of why dash was used to serialize Kafka Uuids. But by the time I did so we were already using it in a few places so the question was not relevant. We're not going to change Uuid serialization now. I think the general rationale was that dash and underscore were friendlier than slash and plus sign. But that's debatable, of course. Slash, at least, is not filesystem-safe. > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
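The alphabet difference is easy to demonstrate with the two JDK encoders (RFC 4648 basic vs. URL-safe); the only divergence is at indexes 62 and 63:

{code:java}
import java.util.Base64;

public final class AlphabetDemo {
    public static void main(String[] args) {
        // 0xFB has 111110 as its top six bits, i.e. index 62: '+' in the
        // basic alphabet, '-' in the URL-safe one.
        byte[] bytes = {(byte) 0xFB, (byte) 0xEF, (byte) 0xBE};
        System.out.println(Base64.getEncoder().withoutPadding().encodeToString(bytes));    // ++++
        System.out.println(Base64.getUrlEncoder().withoutPadding().encodeToString(bytes)); // ----
    }
}
{code}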
[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781152#comment-17781152 ] Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:21 PM: - You can run this code yourself if you are curious. Here it is. You will need bash 4 or better. (my version is {{GNU bash, version 5.2.15(1)-release (aarch64-apple-darwin21.6.0)}}) {code} #!/usr/bin/env bash declare -A IDS_PER_INITIAL_LETTER for ((i = 0; i < 10000 ; i++)); do ./kafka-storage.sh random-uuid > /tmp/out 2> /dev/null FIRST_LETTER=$(head -c 1 /tmp/out) IDS_PER_INITIAL_LETTER[$FIRST_LETTER]=$((IDS_PER_INITIAL_LETTER[$FIRST_LETTER]+1)) done for k in "${!IDS_PER_INITIAL_LETTER[@]}"; do echo "IDs starting with $k : ${IDS_PER_INITIAL_LETTER[$k]}" done {code} was (Author: cmccabe): You can run this code yourself if you are curious. Here it is. You will need bash 4 or better. (my version is `GNU bash, version 5.2.15(1)-release (aarch64-apple-darwin21.6.0)`) {code} #!/usr/bin/env bash declare -A IDS_PER_INITIAL_LETTER for ((i = 0; i < 10000 ; i++)); do ./kafka-storage.sh random-uuid > /tmp/out 2> /dev/null FIRST_LETTER=$(head -c 1 /tmp/out) IDS_PER_INITIAL_LETTER[$FIRST_LETTER]=$((IDS_PER_INITIAL_LETTER[$FIRST_LETTER]+1)) done for k in "${!IDS_PER_INITIAL_LETTER[@]}"; do echo "IDs starting with $k : ${IDS_PER_INITIAL_LETTER[$k]}" done {code} > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-".
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781153#comment-17781153 ] Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:20 PM: - I am closing this JIRA because {{kafka-storage.sh random-uuid}} can not, in fact, generate uuids starting with {{-}}. You can see this via analysis of the code or by just running it as I did was (Author: cmccabe): I am closing this JIRA because {{kafka-storage.sh random-uuid}} can not, in fact, generate uuids starting with '-' > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781153#comment-17781153 ] Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:20 PM: - I am closing this JIRA because {{kafka-storage.sh random-uuid}} can not, in fact, generate uuids starting with '-' was (Author: cmccabe): I am closing this JIRA because `kafka-storage.sh` can not, in fact, generate uuids starting with '-' > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781151#comment-17781151 ] Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:20 PM: - I ran {{kafka-storage.sh random-uuid}} 10,000 times and got the following distribution of first characters: {code} IDs starting with 0 : 166 IDs starting with 1 : 174 IDs starting with 2 : 135 IDs starting with 3 : 172 IDs starting with 4 : 155 IDs starting with 5 : 154 IDs starting with 6 : 152 IDs starting with 7 : 172 IDs starting with 8 : 170 IDs starting with 9 : 166 IDs starting with A : 147 IDs starting with B : 161 IDs starting with C : 172 IDs starting with D : 158 IDs starting with E : 164 IDs starting with F : 164 IDs starting with G : 146 IDs starting with H : 156 IDs starting with I : 166 IDs starting with J : 172 IDs starting with K : 177 IDs starting with L : 143 IDs starting with M : 171 IDs starting with N : 144 IDs starting with O : 157 IDs starting with P : 162 IDs starting with Q : 144 IDs starting with R : 157 IDs starting with S : 161 IDs starting with T : 158 IDs starting with U : 174 IDs starting with V : 166 IDs starting with W : 166 IDs starting with X : 159 IDs starting with Y : 165 IDs starting with Z : 161 IDs starting with _ : 159 IDs starting with a : 145 IDs starting with b : 169 IDs starting with c : 166 IDs starting with d : 171 IDs starting with e : 162 IDs starting with f : 154 IDs starting with g : 132 IDs starting with h : 152 IDs starting with i : 136 IDs starting with j : 166 IDs starting with k : 159 IDs starting with l : 156 IDs starting with m : 154 IDs starting with n : 155 IDs starting with o : 154 IDs starting with p : 158 IDs starting with q : 141 IDs starting with r : 165 IDs starting with s : 154 IDs starting with t : 162 IDs starting with u : 146 IDs starting with v : 161 IDs starting with w : 164 IDs starting with x : 154 IDs starting with y : 164 IDs starting with z : 154 {code} No IDs were generated with a first character of {{-}}, as expected.
was (Author: cmccabe): I ran {kafka-storage.sh random-uuid} 10,000 times and got the following distribution of first characters: {code} IDs starting with 0 : 166 IDs starting with 1 : 174 IDs starting with 2 : 135 IDs starting with 3 : 172 IDs starting with 4 : 155 IDs starting with 5 : 154 IDs starting with 6 : 152 IDs starting with 7 : 172 IDs starting with 8 : 170 IDs starting with 9 : 166 IDs starting with A : 147 IDs starting with B : 161 IDs starting with C : 172 IDs starting with D : 158 IDs starting with E : 164 IDs starting with F : 164 IDs starting with G : 146 IDs starting with H : 156 IDs starting with I : 166 IDs starting with J : 172 IDs starting with K : 177 IDs starting with L : 143 IDs starting with M : 171 IDs starting with N : 144 IDs starting with O : 157 IDs starting with P : 162 IDs starting with Q : 144 IDs starting with R : 157 IDs starting with S : 161 IDs starting with T : 158 IDs starting with U : 174 IDs starting with V : 166 IDs starting with W : 166 IDs starting with X : 159 IDs starting with Y : 165 IDs starting with Z : 161 IDs starting with _ : 159 IDs starting with a : 145 IDs starting with b : 169 IDs starting with c : 166 IDs starting with d : 171 IDs starting with e : 162 IDs starting with f : 154 IDs starting with g : 132 IDs starting with h : 152 IDs starting with i : 136 IDs starting with j : 166 IDs starting with k : 159 IDs starting with l : 156 IDs starting with m : 154 IDs starting with n : 155 IDs starting with o : 154 IDs starting with p : 158 IDs starting with q : 141 IDs starting with r : 165 IDs starting with s : 154 IDs starting with t : 162 IDs starting with u : 146 IDs starting with v : 161 IDs starting with w : 164 IDs starting with x : 154 IDs starting with y : 164 IDs starting with z : 154 {code} No IDs were generated with a first character of {-}, as expected. > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it >
[jira] [Resolved] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-15754. -- Resolution: Invalid kafka-storage tool can not, in fact, generate uuids starting with '-' > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781153#comment-17781153 ] Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:19 PM: - I am closing this JIRA because `kafka-storage.sh` can not, in fact, generate uuids starting with '-' was (Author: cmccabe): kafka-storage tool can not, in fact, generate uuids starting with '-' > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781151#comment-17781151 ] Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:19 PM: - I ran {kafka-storage.sh random-uuid} 10,000 times and got the following distribution of first characters: {code} IDs starting with 0 : 166 IDs starting with 1 : 174 IDs starting with 2 : 135 IDs starting with 3 : 172 IDs starting with 4 : 155 IDs starting with 5 : 154 IDs starting with 6 : 152 IDs starting with 7 : 172 IDs starting with 8 : 170 IDs starting with 9 : 166 IDs starting with A : 147 IDs starting with B : 161 IDs starting with C : 172 IDs starting with D : 158 IDs starting with E : 164 IDs starting with F : 164 IDs starting with G : 146 IDs starting with H : 156 IDs starting with I : 166 IDs starting with J : 172 IDs starting with K : 177 IDs starting with L : 143 IDs starting with M : 171 IDs starting with N : 144 IDs starting with O : 157 IDs starting with P : 162 IDs starting with Q : 144 IDs starting with R : 157 IDs starting with S : 161 IDs starting with T : 158 IDs starting with U : 174 IDs starting with V : 166 IDs starting with W : 166 IDs starting with X : 159 IDs starting with Y : 165 IDs starting with Z : 161 IDs starting with _ : 159 IDs starting with a : 145 IDs starting with b : 169 IDs starting with c : 166 IDs starting with d : 171 IDs starting with e : 162 IDs starting with f : 154 IDs starting with g : 132 IDs starting with h : 152 IDs starting with i : 136 IDs starting with j : 166 IDs starting with k : 159 IDs starting with l : 156 IDs starting with m : 154 IDs starting with n : 155 IDs starting with o : 154 IDs starting with p : 158 IDs starting with q : 141 IDs starting with r : 165 IDs starting with s : 154 IDs starting with t : 162 IDs starting with u : 146 IDs starting with v : 161 IDs starting with w : 164 IDs starting with x : 154 IDs starting with y : 164 IDs starting with z : 154 {code} No IDs were generated with a first character of {-}, as expected.
was (Author: cmccabe): I ran `kafka-storage.sh random-uuid` 10,000 times and got the following distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}
No IDs were generated with a first character of `-`, as expected. > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it >
[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781152#comment-17781152 ] Colin McCabe commented on KAFKA-15754: -- You can run this code yourself if you are curious. Here it is. You will need bash 4 or better. (my version is `GNU bash, version 5.2.15(1)-release (aarch64-apple-darwin21.6.0)`)
{code}
#!/usr/bin/env bash

declare -A IDS_PER_INITIAL_LETTER

for ((i = 0; i < 10000; i++)); do
    ./kafka-storage.sh random-uuid > /tmp/out 2> /dev/null
    FIRST_LETTER=$(head -c 1 /tmp/out)
    IDS_PER_INITIAL_LETTER[$FIRST_LETTER]=$((IDS_PER_INITIAL_LETTER[$FIRST_LETTER]+1))
done

for k in "${!IDS_PER_INITIAL_LETTER[@]}"; do
    echo "IDs starting with $k : ${IDS_PER_INITIAL_LETTER[$k]}"
done
{code}
> The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
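The same distribution can also be collected in-process, without forking 10,000 JVMs, by calling the client library's Uuid directly. A rough sketch follows; it assumes kafka-clients is on the classpath and is not part of any test suite:

{code:java}
import java.util.Map;
import java.util.TreeMap;

import org.apache.kafka.common.Uuid;

public class FirstCharTally {
    public static void main(String[] args) {
        // Tally the first character of 10,000 randomly generated IDs.
        Map<Character, Integer> counts = new TreeMap<>();
        for (int i = 0; i < 10_000; i++) {
            counts.merge(Uuid.randomUuid().toString().charAt(0), 1, Integer::sum);
        }
        counts.forEach((c, n) ->
            System.out.println("IDs starting with " + c + " : " + n));
    }
}
{code}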
Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]
CalvinConfluent commented on PR #14603: URL: https://github.com/apache/kafka/pull/14603#issuecomment-1786194305
https://issues.apache.org/jira/browse/KAFKA-15759 DescribeClusterRequestTest
https://issues.apache.org/jira/browse/KAFKA-15760 CoordinatorTest
https://issues.apache.org/jira/browse/KAFKA-15761 ConnectorRestartApiIntegrationTest
https://issues.apache.org/jira/browse/KAFKA-15762 ClusterConnectionStatesTest
Added a few more common failures. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15762) ClusterConnectionStatesTest.testSingleIP is flaky
Calvin Liu created KAFKA-15762: -- Summary: ClusterConnectionStatesTest.testSingleIP is flaky Key: KAFKA-15762 URL: https://issues.apache.org/jira/browse/KAFKA-15762 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu Build / JDK 11 and Scala 2.13 / testSingleIP() – org.apache.kafka.clients.ClusterConnectionStatesTest
{code:java}
org.opentest4j.AssertionFailedError: expected: <1> but was: <2>
    at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
    at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
    at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:527)
    at app//org.apache.kafka.clients.ClusterConnectionStatesTest.testSingleIP(ClusterConnectionStatesTest.java:267)
    at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781151#comment-17781151 ] Colin McCabe commented on KAFKA-15754: -- I ran `kafka-storage.sh random-uuid` 10,000 times and got the following distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}
No IDs were generated with a first character of `-`, as expected. > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > Said that, it seems that this problem was already addressed in the > Uuid.randomUuid method which keeps generating a new UUID until it doesn't > start with "-".
This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when the toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using an URL (safe) encoder which, taking a > look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which as you can see includes the "-" character. > So despite the current Uuid.randomUuid is avoiding the generation of a UUID > containing a dash, the Base64 encoding operation can return a final UUID > starting with the dash instead. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet > not containing the "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15761) ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky
Calvin Liu created KAFKA-15761: -- Summary: ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky Key: KAFKA-15761 URL: https://issues.apache.org/jira/browse/KAFKA-15761 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu Build / JDK 21 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
{code:java}
java.lang.AssertionError: Failed to stop connector and tasks within 120000ms
    at org.junit.Assert.fail(Assert.java:89)
    at org.junit.Assert.assertTrue(Assert.java:42)
    at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.runningConnectorAndTasksRestart(ConnectorRestartApiIntegrationTest.java:273)
    at org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector(ConnectorRestartApiIntegrationTest.java:231)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15760) org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky
Calvin Liu created KAFKA-15760: -- Summary: org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky Key: KAFKA-15760 URL: https://issues.apache.org/jira/browse/KAFKA-15760 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest
{code:java}
java.util.concurrent.TimeoutException: testTaskRequestWithOldStartMsGetsUpdated() timed out after 120000 milliseconds
    at org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
    at org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
    at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15759) DescribeClusterRequestTest is flaky
Calvin Liu created KAFKA-15759: -- Summary: DescribeClusterRequestTest is flaky Key: KAFKA-15759 URL: https://issues.apache.org/jira/browse/KAFKA-15759 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Calvin Liu testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft – kafka.server.DescribeClusterRequestTest
{code:java}
org.opentest4j.AssertionFailedError: expected: but was:
    at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
    at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
    at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
    at kafka.server.DescribeClusterRequestTest.$anonfun$testDescribeClusterRequest$4(DescribeClusterRequestTest.scala:99)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
    at kafka.server.DescribeClusterRequestTest.testDescribeClusterRequest(DescribeClusterRequestTest.scala:86)
    at kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(DescribeClusterRequestTest.scala:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on PR #14621: URL: https://github.com/apache/kafka/pull/14621#issuecomment-1786178001 > > @apoorvmittal10 : Yes, we are gradually moving the codebase to java. Ideally, all new classes should be written in java. > > Thanks @junrao, I ll update the PR by tomorrow with new classes in Java. @junrao @hachikuji I have moved the new classes to Java. Can I please get a review on the PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1376884298 ## core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala: ## @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics + +import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._ +import kafka.network.RequestChannel +import org.apache.kafka.common.errors.InvalidConfigurationException + +import java.util.regex.{Pattern, PatternSyntaxException} +import scala.collection.mutable + +/** + * Information from the client's metadata is gathered from the client's request. + */ +object ClientMetricsMetadata { + def apply(request: RequestChannel.Request, clientInstanceId: String): ClientMetricsMetadata = { +val instance = new ClientMetricsMetadata +val ctx = request.context +val clientSoftwareName = if (ctx.clientInformation != null) ctx.clientInformation.softwareName() else "" +val clientSoftwareVersion = if (ctx.clientInformation != null) ctx.clientInformation.softwareVersion() else "" +instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, clientSoftwareVersion, + ctx.clientAddress.getHostAddress, ctx.clientAddress.getHostAddress) // TODO: Fix Port +instance + } + + def apply(clientInstanceId: String, clientId: String, clientSoftwareName: String, +clientSoftwareVersion: String, clientSourceAddress: String, clientSourcePort: String): ClientMetricsMetadata = { +val instance = new ClientMetricsMetadata +instance.init(clientInstanceId, clientId, clientSoftwareName, clientSoftwareVersion, clientSourceAddress, clientSourcePort) +instance + } + + /** + * Parses the client matching patterns and builds a map with entries that has + * (PatternName, PatternValue) as the entries. + * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) + * + * NOTES: + * 1. Client match pattern splits the input into two parts separated by first + * occurrence of the character '=' + * 2. 
'*' is considered as invalid client match pattern + * @param patterns List of client matching pattern strings + * @return map of client matching pattern entries + */ + def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = { +val patternsMap = mutable.Map[String, String]() +if (patterns != null) { + patterns.foreach(x => { +val nameValuePair = x.split("=", 2).map(x => x.trim) +if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && validRegExPattern(nameValuePair(1))) { + patternsMap += (nameValuePair(0) -> nameValuePair(1)) +} else { + throw new InvalidConfigurationException("Illegal client matching pattern: " + x) +} + }) +} +patternsMap.toMap + } + + private def validRegExPattern(inputPattern :String): Boolean = { +try { + Pattern.compile(inputPattern) + true +} catch { + case _: PatternSyntaxException => +false +} + } + +} + +class ClientMetricsMetadata { + var attributesMap: mutable.Map[String, String] = scala.collection.mutable.Map[String, String]() Review Comment: Moved to java implementation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
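Since the thread above notes that this class is being moved to Java, the following is a hedged sketch of what a Java translation of the parseMatchingPatterns logic could look like. The class name, exception type, and the exact set of valid parameter names are illustrative assumptions, not the actual KIP-714 implementation:

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public final class ClientMatchPatterns {
    // Illustrative subset of matching parameter names; the authoritative
    // list lives in the KIP-714 configuration code.
    private static final Set<String> VALID_PARAMS = Set.of(
        "client_instance_id", "client_id", "client_software_name",
        "client_software_version", "client_source_address", "client_source_port");

    // Parse entries like "client_software_version=11.1.*" into a name -> regex map.
    // Each entry is split on the first '=' only, mirroring the Scala code above.
    public static Map<String, String> parse(List<String> patterns) {
        Map<String, String> map = new HashMap<>();
        if (patterns == null) {
            return map;
        }
        for (String p : patterns) {
            String[] nv = p.split("=", 2);
            String name = nv[0].trim();
            String value = nv.length == 2 ? nv[1].trim() : null;
            if (value != null && VALID_PARAMS.contains(name) && isValidRegex(value)) {
                map.put(name, value);
            } else {
                throw new IllegalArgumentException("Illegal client matching pattern: " + p);
            }
        }
        return map;
    }

    private static boolean isValidRegex(String s) {
        try {
            Pattern.compile(s);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }
}
{code}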
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1376884030 ## core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics + +import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM +import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST} +import org.apache.kafka.common.errors.InvalidRequestException + +import java.util +import java.util.Properties + +/** + * Client metric configuration related parameters and the supporting methods like validation and update methods + * are defined in this class. + * + * SubscriptionInfo: Contains the client metric subscription information. Supported operations from the CLI are + * add/delete/update operations. Every subscription object contains the following parameters that are populated + * during the creation of the subscription. + * + * { + * + * subscriptionId: Name/ID supplied by CLI during the creation of the client metric subscription. + * subscribedMetrics: List of metric prefixes + * pushIntervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + * matchingPatternsList: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * + * } + * + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * + * "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc.
+ * + * + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +object ClientMetricsConfig { + + class SubscriptionInfo(subscriptionId: String, Review Comment: Moved to java implementation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
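To make the parameters described in that javadoc concrete, a hypothetical subscription expressed as plain properties might look like the sketch below. The values are examples only, and the property keys come straight from the description above; this is not the CLI's actual wire format:

{code:java}
import java.util.Properties;

public class SubscriptionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Prefix-matched metric names; a single empty string would mean "all metrics".
        props.put("metrics",
            "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency");
        // Push interval in milliseconds; must be between 100 and 3600000 (1 hour).
        props.put("interval.ms", "300000");
        // Client match patterns: all Java clients on any 11.1.x version.
        props.put("match", "client_software_name=Java,client_software_version=11.1.*");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
{code}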
[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense
[ https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781148#comment-17781148 ] Justine Olshan commented on KAFKA-14402: Sorry the wording is unclear. I will fix this. For older clients, the server will verify the partitions are in a given transaction. Older clients require no changes. If we changed them, they would be considered new (version) clients. The idea of KIP-890 part 1 would be that there were no client changes required. The flow is the same from the client side. Just on the server side we verify the transaction is ongoing. Hopefully this makes sense. > Transactions Server Side Defense > > > Key: KAFKA-14402 > URL: https://issues.apache.org/jira/browse/KAFKA-14402 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > We have seen hanging transactions in Kafka where the last stable offset (LSO) > does not update, we can’t clean the log (if the topic is compacted), and > read_committed consumers get stuck. > This can happen when a message gets stuck or delayed due to networking issues > or a network partition, the transaction aborts, and then the delayed message > finally comes in. The delayed message case can also violate EOS if the > delayed message comes in after the next addPartitionsToTxn request comes in. > Effectively we may see a message from a previous (aborted) transaction become > part of the next transaction. > Another way hanging transactions can occur is that a client is buggy and may > somehow try to write to a partition before it adds the partition to the > transaction. In both of these cases, we want the server to have some control > to prevent these incorrect records from being written and either causing > hanging transactions or violating Exactly once semantics (EOS) by including > records in the wrong transaction. > The best way to avoid this issue is to: > # *Uniquely identify transactions by bumping the producer epoch after every > commit/abort marker. That way, each transaction can be identified by > (producer id, epoch).* > # {*}Remove the addPartitionsToTxn call and implicitly just add partitions > to the transaction on the first produce request during a transaction{*}. > We avoid the late arrival case because the transaction is uniquely identified > and fenced AND we avoid the buggy client case because we remove the need for > the client to explicitly add partitions to begin the transaction. > Of course, 1 and 2 require client-side changes, so for older clients, those > approaches won’t apply. > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > > See KIP-890 for more information: ** > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1376883781 ## core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala: ## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics Review Comment: Moved to java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1376883099 ## core/src/main/scala/kafka/server/ClientMetricsManager.scala: ## @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.metrics.clientmetrics.ClientMetricsConfig + +import java.util.Properties + +object ClientMetricsManager { + private val Instance = new ClientMetricsManager + + def getInstance: ClientMetricsManager = Instance Review Comment: Moved to java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1376882728 ## core/src/main/scala/kafka/server/ClientMetricsManager.scala: ## @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.metrics.clientmetrics.ClientMetricsConfig + +import java.util.Properties + +object ClientMetricsManager { Review Comment: Moved to java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense
[ https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781144#comment-17781144 ] Travis Bischel commented on KAFKA-14402: Sorry, I may be a bit confused then: when it says "for old clients to verify the partitions are in a given transaction" -- this can only happen in v4. How can a client / an admin verify that a partition is a part of the transaction if v4 is meant to be broker<=>broker only? > Transactions Server Side Defense > > > Key: KAFKA-14402 > URL: https://issues.apache.org/jira/browse/KAFKA-14402 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > We have seen hanging transactions in Kafka where the last stable offset (LSO) > does not update, we can’t clean the log (if the topic is compacted), and > read_committed consumers get stuck. > This can happen when a message gets stuck or delayed due to networking issues > or a network partition, the transaction aborts, and then the delayed message > finally comes in. The delayed message case can also violate EOS if the > delayed message comes in after the next addPartitionsToTxn request comes in. > Effectively we may see a message from a previous (aborted) transaction become > part of the next transaction. > Another way hanging transactions can occur is that a client is buggy and may > somehow try to write to a partition before it adds the partition to the > transaction. In both of these cases, we want the server to have some control > to prevent these incorrect records from being written and either causing > hanging transactions or violating Exactly once semantics (EOS) by including > records in the wrong transaction. > The best way to avoid this issue is to: > # *Uniquely identify transactions by bumping the producer epoch after every > commit/abort marker. That way, each transaction can be identified by > (producer id, epoch).* > # {*}Remove the addPartitionsToTxn call and implicitly just add partitions > to the transaction on the first produce request during a transaction{*}. > We avoid the late arrival case because the transaction is uniquely identified > and fenced AND we avoid the buggy client case because we remove the need for > the client to explicitly add partitions to begin the transaction. > Of course, 1 and 2 require client-side changes, so for older clients, those > approaches won’t apply. > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > > See KIP-890 for more information: ** > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15758) Always schedule wrapped callbacks
[ https://issues.apache.org/jira/browse/KAFKA-15758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-15758: --- Summary: Always schedule wrapped callbacks (was: Always schedule wrapped callbacks) > Always schedule wrapped callbacks > -- > > Key: KAFKA-15758 > URL: https://issues.apache.org/jira/browse/KAFKA-15758 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > As part of > [https://github.com/apache/kafka/commit/08aa33127a4254497456aa7a0c1646c7c38adf81] > the finding of the coordinator was moved to the AddPartitionsToTxnManager. > In the case of an error, we return the error on the wrapped callback. > This seemed to cause issues in the tests and we realized that executing the > callback directly and not rescheduling it on the request channel seemed to > resolve some issues. > One theory was that scheduling the callback before the request returned > caused issues. > Ideally we wouldn't have this special handling. This ticket is to remove it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866891 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -55,23 +55,23 @@ object KafkaRequestHandler { * @param fun Callback function to execute * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { + // If the callback is actually executed on the same request thread, we can directly execute Review Comment: https://issues.apache.org/jira/browse/KAFKA-15758 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866596 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: I understand the issue. I have a commit below that clears the request local. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
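As background for the pattern in the diff above, here is a generic, simplified Java sketch of wrapping a callback so it runs inline when invoked on the originating handler thread and is re-queued otherwise. This is not Kafka's KafkaRequestHandler: the names are invented, and the real code keys on the current request (and its RequestLocal), not merely the thread:

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public final class CallbackScheduler {
    // Marks threads that run the handler loop below.
    private static final ThreadLocal<Boolean> ON_HANDLER_THREAD =
        ThreadLocal.withInitial(() -> false);

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Wrap a callback: execute directly on a handler thread, otherwise
    // hand it back to the handler thread's queue.
    public Runnable wrap(Runnable callback) {
        return () -> {
            if (ON_HANDLER_THREAD.get()) {
                callback.run();      // same thread: run inline
            } else {
                queue.add(callback); // different thread: reschedule
            }
        };
    }

    // Handler loop: marks the thread and drains the queue.
    public void runLoop() throws InterruptedException {
        ON_HANDLER_THREAD.set(true);
        while (!Thread.currentThread().isInterrupted()) {
            queue.take().run();
        }
    }
}
{code}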
[jira] [Commented] (KAFKA-4852) ByteBufferSerializer not compatible with offsets
[ https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781139#comment-17781139 ] Philip Nee commented on KAFKA-4852: --- cc [~luke.kirby] > ByteBufferSerializer not compatible with offsets > > > Key: KAFKA-4852 > URL: https://issues.apache.org/jira/browse/KAFKA-4852 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 > Environment: all >Reporter: Werner Daehn >Priority: Minor > Labels: needs-kip > > Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the > ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The > ByteBufferSerializer will send from pos=0 and not from pos=3 onwards. > Solution: No rewind() but flip() for reading a ByteBuffer. That's what the > flip is meant for. > Story: > Imagine the incoming data comes from a byte[], e.g. a network stream > containing topicname, partition, key, value, ... and you want to create a new > ProducerRecord for that. As the constructor of ProducerRecord requires > (topic, partition, key, value) you have to copy from above byte[] the key and > value. That means there is a memcopy taking place. Since the payload can be > potentially large, that introduces a lot of overhead. Twice the memory. > A nice solution to this problem is to simply wrap the network byte[] into new > ByteBuffers: > ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength); > ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength); > and then use the ByteBufferSerializer instead of the ByteArraySerializer. > But that does not work as the ByteBufferSerializer does a rewind(), hence > both, key and value, will start at position=0 of the data[]. > public class ByteBufferSerializer implements Serializer { > public byte[] serialize(String topic, ByteBuffer data) { > data.rewind(); -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets
[ https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-4852: -- Labels: needs-kip serializers (was: needs-kip) > ByteBufferSerializer not compatible with offsets > > > Key: KAFKA-4852 > URL: https://issues.apache.org/jira/browse/KAFKA-4852 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 > Environment: all >Reporter: Werner Daehn >Priority: Minor > Labels: needs-kip, serializers > > Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the > ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The > ByteBufferSerializer will send from pos=0 and not from pos=3 onwards. > Solution: No rewind() but flip() for reading a ByteBuffer. That's what the > flip is meant for. > Story: > Imagine the incoming data comes from a byte[], e.g. a network stream > containing topicname, partition, key, value, ... and you want to create a new > ProducerRecord for that. As the constructor of ProducerRecord requires > (topic, partition, key, value) you have to copy from above byte[] the key and > value. That means there is a memcopy taking place. Since the payload can be > potentially large, that introduces a lot of overhead. Twice the memory. > A nice solution to this problem is to simply wrap the network byte[] into new > ByteBuffers: > ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength); > ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength); > and then use the ByteBufferSerializer instead of the ByteArraySerializer. > But that does not work as the ByteBufferSerializer does a rewind(), hence > both, key and value, will start at position=0 of the data[]. > public class ByteBufferSerializer implements Serializer { > public byte[] serialize(String topic, ByteBuffer data) { > data.rewind(); -- This message was sent by Atlassian Jira (v8.20.10#820010)
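The behavior quoted in that report is easy to reproduce in isolation. The following is a minimal sketch (plain Java, not the serializer code itself) showing that rewind() on a wrapped sub-range moves the position back to index 0 of the backing array, so the offset is lost, while reading from the current position preserves the intended window:

{code:java}
import java.nio.ByteBuffer;

public class RewindOffsetDemo {
    public static void main(String[] args) {
        byte[] frame = "topicKEYvalue".getBytes();

        // Wrap just the key: position=5, limit=8, backed by the whole array.
        ByteBuffer key = ByteBuffer.wrap(frame, 5, 3);

        // Reading from the current position yields the intended bytes.
        byte[] ok = new byte[key.remaining()];
        key.duplicate().get(ok);
        System.out.println(new String(ok)); // prints "KEY"

        // rewind() resets position to 0 of the backing array, so a
        // serializer that rewinds first emits extra leading bytes.
        key.rewind();
        byte[] broken = new byte[key.remaining()];
        key.get(broken);
        System.out.println(new String(broken)); // prints "topicKEY"
    }
}
{code}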
[jira] [Created] (KAFKA-15758) Always schedule wrapped callbacks
Justine Olshan created KAFKA-15758: -- Summary: Always schedule wrapped callbacks Key: KAFKA-15758 URL: https://issues.apache.org/jira/browse/KAFKA-15758 Project: Kafka Issue Type: Sub-task Reporter: Justine Olshan As part of [https://github.com/apache/kafka/commit/08aa33127a4254497456aa7a0c1646c7c38adf81] the finding of the coordinator was moved to the AddPartitionsToTxnManager. In the case of an error, we return the error on the wrapped callback. This seemed to cause issues in the tests and we realized that executing the callback directly and not rescheduling it on the request channel seemed to resolve some issues. One theory was that scheduling the callback before the request returned caused issues. Ideally we wouldn't have this special handling. This ticket is to remove it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15757) Do not advertise v4 AddPartitionsToTxn to clients
Justine Olshan created KAFKA-15757: -- Summary: Do not advertise v4 AddPartitionsToTxn to clients Key: KAFKA-15757 URL: https://issues.apache.org/jira/browse/KAFKA-15757 Project: Kafka Issue Type: Sub-task Reporter: Justine Olshan v4+ is intended to be a broker side API. Thus, we should not return it as a valid version to clients. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense
[ https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781132#comment-17781132 ] Justine Olshan commented on KAFKA-14402: Hi [~twmb] The KIP currently states: > We will bump the versioning for AddPartitionsToTxn to add support for > "verifyOnly" mode used for old clients to verify the partitions are in a > given transaction. We will also add support to batch multiple waiting > requests by transactional ID. This newer version of the request will require > CLUSTER authorization so clients will not be able to use this version. I'm not sure I understand your statement. Admins are not sending this request. The broker is sending the request. The goal is to remove the API from the client-side altogether. > Transactions Server Side Defense > > > Key: KAFKA-14402 > URL: https://issues.apache.org/jira/browse/KAFKA-14402 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > We have seen hanging transactions in Kafka where the last stable offset (LSO) > does not update, we can’t clean the log (if the topic is compacted), and > read_committed consumers get stuck. > This can happen when a message gets stuck or delayed due to networking issues > or a network partition, the transaction aborts, and then the delayed message > finally comes in. The delayed message case can also violate EOS if the > delayed message comes in after the next addPartitionsToTxn request comes in. > Effectively we may see a message from a previous (aborted) transaction become > part of the next transaction. > Another way hanging transactions can occur is that a client is buggy and may > somehow try to write to a partition before it adds the partition to the > transaction. In both of these cases, we want the server to have some control > to prevent these incorrect records from being written and either causing > hanging transactions or violating Exactly once semantics (EOS) by including > records in the wrong transaction. > The best way to avoid this issue is to: > # *Uniquely identify transactions by bumping the producer epoch after every > commit/abort marker. That way, each transaction can be identified by > (producer id, epoch).* > # {*}Remove the addPartitionsToTxn call and implicitly just add partitions > to the transaction on the first produce request during a transaction{*}. > We avoid the late arrival case because the transaction is uniquely identified > and fenced AND we avoid the buggy client case because we remove the need for > the client to explicitly add partitions to begin the transaction. > Of course, 1 and 2 require client-side changes, so for older clients, those > approaches won’t apply. > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > > See KIP-890 for more information: ** > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15756) Migrate existing integration tests to run old protocol in new coordinator
Dongnuo Lyu created KAFKA-15756: --- Summary: Migrate existing integration tests to run old protocol in new coordinator Key: KAFKA-15756 URL: https://issues.apache.org/jira/browse/KAFKA-15756 Project: Kafka Issue Type: Sub-task Reporter: Dongnuo Lyu -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15756) Migrate existing integration tests to run old protocol in new coordinator
[ https://issues.apache.org/jira/browse/KAFKA-15756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongnuo Lyu reassigned KAFKA-15756: --- Assignee: Dongnuo Lyu > Migrate existing integration tests to run old protocol in new coordinator > - > > Key: KAFKA-15756 > URL: https://issues.apache.org/jira/browse/KAFKA-15756 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376821250 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: There is a verification state for the doCommitTxnOffset path, which is mostly the same path (except for using storeOffsets instead of storeGroup). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376817191 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: It seems that storing the group coordinator state is not transactional, so the verification code path shouldn't be used. But I agree with the question about the locking model in the group coordinator and whether it'd work correctly with a more asynchronous approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15355: Message schema changes [kafka]
cmccabe commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376808429 ## metadata/src/main/resources/common/metadata/PartitionRecord.json: ## @@ -47,6 +47,8 @@ "about": "The eligible leader replicas of this partition." }, { "name": "LastKnownELR", "type": "[]int32", "default": "null", "entityType": "brokerId", "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", "tag": 2, - "about": "The last known eligible leader replicas of this partition." } + "about": "The last known eligible leader replicas of this partition." }, +{ "name": "Directories", "type": "[]uuid", "versions": "2+", + "about": "The log directory hosting each replica, sorted in the same exact order as the Replicas field."} Review Comment: I would prefer to make it mandatory to make it clear that it's always present in version 2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15355: Message schema changes [kafka]
cmccabe commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376807529 ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -16,55 +16,166 @@ */ package org.apache.kafka.common; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -public class DirectoryId { +public class DirectoryId extends Uuid { Review Comment: I don't see why we should extend Uuid rather than just having a utility class that handles Uuids in the way that we want. Inheritance is almost always a mistake (unless it's of an interface). This seems to add a lot of to/from conversion boilerplate compared with just using Uuid directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
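For illustration, the utility-class shape suggested in that review might look roughly like the sketch below. This is an assumption-laden sketch over the existing Uuid type, not the actual patch; the class name is invented, and the reserved-ID check shown is one possible bit-level formulation rather than the PR's code:

{code:java}
import org.apache.kafka.common.Uuid;

// Sketch: static helpers over plain Uuid instead of a DirectoryId subclass.
public final class DirectoryIdUtil {
    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
    public static final Uuid LOST = new Uuid(0L, 1L);
    public static final Uuid MIGRATING = new Uuid(0L, 2L);

    private DirectoryIdUtil() {
        // No instances: callers pass Uuid values directly, avoiding
        // to/from conversions between Uuid and a subclass.
    }

    public static boolean isReserved(Uuid id) {
        // The first 100 UUIDs are reserved. An ID whose Base64 string form
        // starts with '-' has 0b111110 (62, the URL-safe alphabet index of
        // '-') in its top six bits, so the dash case can be detected without
        // materializing a string.
        return (id.getMostSignificantBits() >>> 58) == 62
            || (id.getMostSignificantBits() == 0 && id.getLeastSignificantBits() < 100);
    }
}
{code}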
Re: [PR] KAFKA-15355: Message schema changes [kafka]
cmccabe commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376805476 ## clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java: ## @@ -19,7 +19,7 @@ /** * An exception that may indicate the client's metadata is out of date */ -public abstract class InvalidMetadataException extends RetriableException { +public class InvalidMetadataException extends RetriableException { Review Comment: Hmm. It seems odd to make this non-abstract. Clearly the intention was to have a more specific class describe why the metadata was invalid. Why would we not follow this pattern here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
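[Editor's sketch] A hedged sketch of the pattern the comment describes: leave the base class abstract and introduce a concrete subclass that names the specific reason. The subclass name here is invented for illustration and is not a real Kafka class:

```java
package org.apache.kafka.common.errors;

// Sketch: rather than making InvalidMetadataException concrete, add a subclass
// that says *why* the metadata is invalid (hypothetical class name).
public class MismatchedReplicaDirectoriesException extends InvalidMetadataException {

    public MismatchedReplicaDirectoriesException(String message) {
        super(message);
    }
}
```

Throw sites then name the precise failure, while callers can still catch the broader `InvalidMetadataException` (or `RetriableException`) as before.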
[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense
[ https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781122#comment-17781122 ] Travis Bischel commented on KAFKA-14402: Another addendum, after your update: v4 of the AddPartitionsToTxn API requires CLUSTER_ACTION to semi-enforce broker-to-broker requests. Can the KIP be updated to document this? Alternatively, can requiring CLUSTER_ACTION be dropped? The KIP itself indicates that admins will send AddPartitionsToTxn to see whether a partition is in the transaction, and admins do not need CLUSTER_ACTION. Likewise, as a client library author: if I send the request with one transaction (i.e. use v4 the same way I use v3), there's no reason I should be limited to v3. > Transactions Server Side Defense > > > Key: KAFKA-14402 > URL: https://issues.apache.org/jira/browse/KAFKA-14402 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.5.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > We have seen hanging transactions in Kafka where the last stable offset (LSO) > does not update, we can’t clean the log (if the topic is compacted), and > read_committed consumers get stuck. > This can happen when a message gets stuck or delayed due to networking issues > or a network partition, the transaction aborts, and then the delayed message > finally comes in. The delayed message case can also violate EOS if the > delayed message comes in after the next addPartitionsToTxn request comes in. > Effectively we may see a message from a previous (aborted) transaction become > part of the next transaction. > Another way hanging transactions can occur is that a client is buggy and may > somehow try to write to a partition before it adds the partition to the > transaction. In both of these cases, we want the server to have some control > to prevent these incorrect records from being written and either causing > hanging transactions or violating Exactly once semantics (EOS) by including > records in the wrong transaction. > The best way to avoid this issue is to: > # *Uniquely identify transactions by bumping the producer epoch after every > commit/abort marker. That way, each transaction can be identified by > (producer id, epoch).* > # {*}Remove the addPartitionsToTxn call and implicitly just add partitions > to the transaction on the first produce request during a transaction{*}. > We avoid the late arrival case because the transaction is uniquely identified > and fenced AND we avoid the buggy client case because we remove the need for > the client to explicitly add partitions to begin the transaction. > Of course, 1 and 2 require client-side changes, so for older clients, those > approaches won’t apply. > 3. *To cover older clients, we will ensure a transaction is ongoing before we > write to a transaction. We can do this by querying the transaction > coordinator and caching the result.* > > See KIP-890 for more information: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15355: Message schema changes [kafka]
soarez commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376787884 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -310,11 +337,22 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, short versio setLeaderRecoveryState(leaderRecoveryState.value()). setLeaderEpoch(leaderEpoch). setPartitionEpoch(partitionEpoch); -if (version > 0) { +if (options.metadataVersion().isElrSupported()) { record.setEligibleLeaderReplicas(Replicas.toList(elr)). setLastKnownELR(Replicas.toList(lastKnownElr)); } -return new ApiMessageAndVersion(record, version); +if (options.metadataVersion().isDirectoryAssignmentSupported()) { +record.setDirectories(DirectoryId.toList(directories)); +} else { +for (int i = 0; i < directories.length; i++) { Review Comment: You're right. I've updated the constructor so that `directories` can never be null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15355: Message schema changes [kafka]
soarez commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376787569 ## metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java: ## @@ -49,12 +51,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.when; @Timeout(value = 40) public class PartitionChangeBuilderTest { private static Stream partitionChangeRecordVersions() { -return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> Arguments.of((short) version)); +return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1) +.filter(v -> v != PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION) // TODO test latest record version in KAFKA-15514 Review Comment: Good catch, I forgot this. Removed and fixed failing tests. ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -122,8 +132,13 @@ public PartitionRegistration build() { throw new IllegalStateException("You must set last known elr."); } +if (directories == null) { +directories = DirectoryId.unassignedArray(replicas.length); +} Review Comment: Good point. I've moved this to the underlying constructor to deal with both cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
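[Editor's sketch] A minimal sketch of the constructor-level defaulting described in the replies above — normalize `directories` in one place so that neither the builder path nor direct callers can ever observe a null or mis-sized array. `unassignedArray` is the helper referenced in the diff (assumed here to return an array assignable to `Uuid[]`); the surrounding class is heavily abbreviated:

```java
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;

// Sketch only: the real PartitionRegistration constructor takes many more fields.
class PartitionRegistrationSketch {
    final int[] replicas;
    final Uuid[] directories;

    PartitionRegistrationSketch(int[] replicas, Uuid[] directories) {
        this.replicas = replicas;
        if (directories == null) {
            // Default here so both the builder and direct callers get a non-null array.
            this.directories = DirectoryId.unassignedArray(replicas.length);
        } else if (directories.length != replicas.length) {
            throw new IllegalArgumentException("directories must have the same length as replicas");
        } else {
            this.directories = directories;
        }
    }
}
```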
Re: [PR] KAFKA-15355: Message schema changes [kafka]
soarez commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1376787303 ## clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java: ## @@ -19,7 +19,7 @@ /** * An exception that may indicate the client's metadata is out of date */ -public abstract class InvalidMetadataException extends RetriableException { +public class InvalidMetadataException extends RetriableException { Review Comment: I had made this change to deal with a partition record with different lengths for `replicas` and `directories`. I've now realized that check hadn't been pushed. If we need to keep this abstract for some reason we can create a new Exception type instead. ## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ## @@ -16,55 +16,166 @@ */ package org.apache.kafka.common; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; -public class DirectoryId { +public class DirectoryId extends Uuid { /** - * A UUID that is used to identify new or unknown dir assignments. + * A DirectoryId that is used to identify new or unknown dir assignments. */ -public static final Uuid UNASSIGNED = new Uuid(0L, 0L); +public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L); /** - * A UUID that is used to represent unspecified offline dirs. + * A DirectoryId that is used to represent unspecified offline dirs. */ -public static final Uuid LOST = new Uuid(0L, 1L); +public static final DirectoryId LOST = new DirectoryId(0L, 1L); /** - * A UUID that is used to represent and unspecified log directory, + * A DirectoryId that is used to represent and unspecified log directory, * that is expected to have been previously selected to host an * associated replica. This contrasts with {@code UNASSIGNED_DIR}, * which is associated with (typically new) replicas that may not * yet have been placed in any log directory. */ -public static final Uuid MIGRATING = new Uuid(0L, 2L); +public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L); /** * The set of reserved UUIDs that will never be returned by the random method. */ -public static final Set RESERVED; +public static final Set RESERVED; static { -HashSet reserved = new HashSet<>(Uuid.RESERVED); -// The first 100 UUIDs are reserved for future use. +HashSet reserved = new HashSet<>(); +// The first 100 DirectoryIds are reserved for future use. for (long i = 0L; i < 100L; i++) { -reserved.add(new Uuid(0L, i)); +reserved.add(new DirectoryId(0L, i)); } RESERVED = Collections.unmodifiableSet(reserved); } +/** + * Constructs a Directory ID from the underlying 128 bits, + * exactly as a {@link Uuid} is constructed. + */ +private DirectoryId(long mostSigBits, long leastSigBits) { +super(mostSigBits, leastSigBits); +} + +/** + * Creates a DirectoryId based on a base64 string encoding used in the toString() method. + */ +public static DirectoryId fromString(String str) { +return DirectoryId.fromUuid(Uuid.fromString(str)); +} + +/** + * Creates a DirectoryId based on a {@link Uuid}. + */ +public static DirectoryId fromUuid(Uuid uuid) { +return new DirectoryId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); +} Review Comment: I've added a test to verify the reserved values, using MSB/LSB as somewhat less indirect test. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
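[Editor's sketch] A sketch of the kind of MSB/LSB-based test mentioned in the last reply — asserting the reserved constants directly on their bits rather than via string round-trips. The expected values follow the diff above; this is not the literal test added in the PR:

```java
import org.apache.kafka.common.DirectoryId;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class DirectoryIdReservedTest {

    @Test
    void reservedIdsUseTheDocumentedBits() {
        // Per the diff: UNASSIGNED = (0, 0), LOST = (0, 1), MIGRATING = (0, 2).
        assertEquals(0L, DirectoryId.UNASSIGNED.getMostSignificantBits());
        assertEquals(0L, DirectoryId.UNASSIGNED.getLeastSignificantBits());
        assertEquals(1L, DirectoryId.LOST.getLeastSignificantBits());
        assertEquals(2L, DirectoryId.MIGRATING.getLeastSignificantBits());

        // The first 100 (0, i) values are reserved.
        assertEquals(100, DirectoryId.RESERVED.size());
        assertTrue(DirectoryId.RESERVED.contains(DirectoryId.MIGRATING));
    }
}
```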
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1376784755 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java: ## @@ -102,4 +140,230 @@ static class RawAndDeserializedValue { this.value = value; } } + +@SuppressWarnings("unchecked") +@Override +public QueryResult query(final Query query, +final PositionBound positionBound, +final QueryConfig config) { + +final long start = time.nanoseconds(); +final QueryResult result; + +final StoreQueryUtils.QueryHandler handler = queryHandlers.get(query.getClass()); +if (handler == null) { +result = wrapped().query(query, positionBound, config); +if (config.isCollectExecutionInfo()) { +result.addExecutionInfo( +"Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns"); +} +} else { +result = (QueryResult) handler.apply( +query, +positionBound, +config, +this +); +if (config.isCollectExecutionInfo()) { +result.addExecutionInfo( +"Handled in " + getClass() + " with serdes " ++ serdes + " in " + (time.nanoseconds() - start) + "ns"); +} +} +return result; +} + + + +@SuppressWarnings("unchecked") +protected QueryResult runTimestampKeyQuery(final Query query, + final PositionBound positionBound, + final QueryConfig config) { +final QueryResult result; +final TimestampedKeyQuery typedKeyQuery = (TimestampedKeyQuery) query; +final KeyQuery rawKeyQuery = +KeyQuery.withKey(keyBytes(typedKeyQuery.getKey())); +final QueryResult rawResult = +wrapped().query(rawKeyQuery, positionBound, config); +if (rawResult.isSuccess()) { +final Function> deserializer = getDeserializeValue(serdes, wrapped()); +final ValueAndTimestamp value = deserializer.apply(rawResult.getResult()); +final QueryResult> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value); +result = (QueryResult) typedQueryResult; +} else { +// the generic type doesn't matter, since failed queries have no result set. +result = (QueryResult) rawResult; +} +return result; +} + +@SuppressWarnings("unchecked") +protected QueryResult runTimestampRangeQuery(final Query query, +final PositionBound positionBound, +final QueryConfig config) { + +final QueryResult result; +final TimestampedRangeQuery typedQuery = (TimestampedRangeQuery) query; + +final RangeQuery rawRangeQuery; +if (typedQuery.getLowerBound().isPresent() && typedQuery.getUpperBound().isPresent()) { +rawRangeQuery = RangeQuery.withRange( +keyBytes(typedQuery.getLowerBound().get()), +keyBytes(typedQuery.getUpperBound().get()) +); +} else if (typedQuery.getLowerBound().isPresent()) { +rawRangeQuery = RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get())); +} else if (typedQuery.getUpperBound().isPresent()) { +rawRangeQuery = RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get())); +} else { +rawRangeQuery = RangeQuery.withNoBounds(); +} +final QueryResult> rawResult = +wrapped().query(rawRangeQuery, positionBound, config); +if (rawResult.isSuccess()) { +final KeyValueIterator iterator = rawResult.getResult(); +final KeyValueIterator resultIterator = new MeteredKeyValueTimestampedIterator( +iterator, +getSensor, +getDeserializeValue(serdes, wrapped()), +false +); +final QueryResult> typedQueryResult = + InternalQueryResultUtil.copyAndSubstituteDeserializedResult( +rawResult, +resultIterator +); +result = (QueryResult) typedQueryResult; +} else { +// the generic type doesn't matter, since failed queries have no result set. 
+result = (QueryResult) rawResult; +} +return result; +} + +
[jira] [Resolved] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer
[ https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-15602. - Fix Version/s: 3.4.2 3.5.2 3.7.0 3.6.1 Assignee: Matthias J. Sax Resolution: Fixed As discussed, reverted this in all applicable branches. > Breaking change in 3.4.0 ByteBufferSerializer > - > > Key: KAFKA-15602 > URL: https://issues.apache.org/jira/browse/KAFKA-15602 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1 >Reporter: Luke Kirby >Assignee: Matthias J. Sax >Priority: Critical > Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1 > > > [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have > solved the situation described by KAFKA-4852, namely, to have > ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 > offsets (or, put another way, to honor the buffer's position() as the start > point to consume bytes from). Unfortunately, it failed to actually do this, > and instead changed the expectations for how an input ByteBuffer's limit and > position should be set before being provided to send() on a producer > configured with ByteBufferSerializer. Code that worked with pre-3.4.0 > releases now produces 0-length messages instead of the intended messages, > effectively introducing a breaking change for existing users of the > serializer in the wild. > Here are a few different inputs and serialized outputs under pre-3.4.0 and > 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain wrappers of byte arrays continue to work under both versions > due to the special case in the serializer for them. I suspect that this is > the dominant use-case, which is why this has apparently gone un-reported to > this point. The wrapped-with-offset case fails under both versions for different > reasons (the expected value would be "est"). As demonstrated here, you can > ensure that a manually assembled ByteBuffer will work under both versions by > ensuring that your buffers have position == limit == message-length > (and an actual desired start position of 0). Clearly, though, behavior has > changed dramatically for the second and third case there, with the 3.3.2 > behavior, in my experience, aligning better with naive expectations. > [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java], > the serializer would just rewind() the buffer and respect the limit as the > indicator as to how much data was in the buffer. So, essentially, the > prevailing contract was that the data from position 0 (always!) up to the > limit on the buffer would be serialized; so it was really just the limit that > was honored.
So if, per the original issue, you have a byte[] array wrapped > with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() > with position = 3 indicating the desired start point to read from, but > effectively ignored by the serializer due to the rewind(). > So while the serializer didn't work when presenting a ByteBuffer view onto a > sub-view of a backing array, it did however follow expected behavior when > employing standard patterns to populate ByteBuffers backed by > larger-than-necessary arrays and using limit() to identify the end of actual > data, consistent with conventional usage of flip() to switch from writing to > a buffer to setting it up to be read from (e.g., to be passed into a > producer.send() call). E.g., > {code:java} > ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH); > ... // some sequence of > bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH > ... > bb.flip(); /* logically, this says "I am done writing, let's set this up for > reading"; pragmatically, it sets the limit to the current position so that > whoever reads the buffer knows when to stop reading, and sets the position to > zero so it knows where to start reading from */ > producer.send(bb); {code} > Technically, you wouldn't even need to use flip() there, since position
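[Editor's sketch] A small, self-contained Java demonstration of the position/limit mechanics the description relies on; the printed values follow directly from the standard ByteBuffer contract:

{code:java}
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

public class FlipVsRewind {
    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.put("test".getBytes(UTF_8));           // position = 4, limit = 8

        ByteBuffer flipped = bb.duplicate();
        flipped.flip();                           // limit = 4, position = 0
        System.out.println(flipped.remaining());  // 4 -> exactly the written bytes

        ByteBuffer rewound = bb.duplicate();
        rewound.rewind();                         // position = 0, limit stays 8
        System.out.println(rewound.remaining());  // 8 -> includes 4 trailing zeros

        // Wrapping a sub-range: position = 1, limit = 4 ...
        ByteBuffer offset = ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3);
        offset.rewind();                          // ... but rewind() discards the offset
        System.out.println(offset.remaining());   // 4, not the intended 3 ("est")
    }
}
{code}

This is exactly why the old rewind()-based serializer honored limit but ignored a wrap offset, while a flip()-based caller convention keys everything off position.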
Re: [PR] KAFKA-15277: ConsumerDelegate support [kafka]
kirktrue closed pull request #14664: KAFKA-15277: ConsumerDelegate support URL: https://github.com/apache/kafka/pull/14664 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-4852) ByteBufferSerializer not compatible with offsets
[ https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-4852: Assignee: (was: LinShunkang) > ByteBufferSerializer not compatible with offsets > > > Key: KAFKA-4852 > URL: https://issues.apache.org/jira/browse/KAFKA-4852 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 > Environment: all >Reporter: Werner Daehn >Priority: Minor > Fix For: 3.4.0 > > > Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the > ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The > ByteBufferSerializer will send from pos=0 and not from pos=3 onwards. > Solution: No rewind() but flip() for reading a ByteBuffer. That's what the > flip is meant for. > Story: > Imagine the incoming data comes from a byte[], e.g. a network stream > containing topicname, partition, key, value, ... and you want to create a new > ProducerRecord for that. As the constructor of ProducerRecord requires > (topic, partition, key, value) you have to copy from above byte[] the key and > value. That means there is a memcopy taking place. Since the payload can be > potentially large, that introduces a lot of overhead. Twice the memory. > A nice solution to this problem is to simply wrap the network byte[] into new > ByteBuffers: > ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength); > ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength); > and then use the ByteBufferSerializer instead of the ByteArraySerializer. > But that does not work as the ByteBufferSerializer does a rewind(), hence > both, key and value, will start at position=0 of the data[]. > public class ByteBufferSerializer implements Serializer { > public byte[] serialize(String topic, ByteBuffer data) { > data.rewind(); -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets
[ https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4852: --- Labels: needs-kip (was: ) > ByteBufferSerializer not compatible with offsets > > > Key: KAFKA-4852 > URL: https://issues.apache.org/jira/browse/KAFKA-4852 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 > Environment: all >Reporter: Werner Daehn >Priority: Minor > Labels: needs-kip > > Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the > ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The > ByteBufferSerializer will send from pos=0 and not from pos=3 onwards. > Solution: No rewind() but flip() for reading a ByteBuffer. That's what the > flip is meant for. > Story: > Imagine the incoming data comes from a byte[], e.g. a network stream > containing topicname, partition, key, value, ... and you want to create a new > ProducerRecord for that. As the constructor of ProducerRecord requires > (topic, partition, key, value) you have to copy from above byte[] the key and > value. That means there is a memcopy taking place. Since the payload can be > potentially large, that introduces a lot of overhead. Twice the memory. > A nice solution to this problem is to simply wrap the network byte[] into new > ByteBuffers: > ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength); > ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength); > and then use the ByteBufferSerializer instead of the ByteArraySerializer. > But that does not work as the ByteBufferSerializer does a rewind(), hence > both, key and value, will start at position=0 of the data[]. > public class ByteBufferSerializer implements Serializer { > public byte[] serialize(String topic, ByteBuffer data) { > data.rewind(); -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15277: ConsumerDelegate support [kafka]
kirktrue commented on PR #14664: URL: https://github.com/apache/kafka/pull/14664#issuecomment-1785968980 This was a simple experiment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]
kirktrue opened a new pull request, #14670: URL: https://github.com/apache/kafka/pull/14670 The consumer refactoring project introduced another `Consumer` implementation, creating two different, coexisting implementations of the `Consumer` interface: * `KafkaConsumer` (AKA "existing", "legacy" consumer) * `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer) The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level `KafkaConsumer` but then delegate to another implementation under the covers. There will be two delegates at first: * `LegacyKafkaConsumer` * `AsyncKafkaConsumer` `LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That implementation handles the existing group protocol. `AsyncKafkaConsumer` is renamed from `PrototypeAsyncConsumer` and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the `internals` sub-package to discourage their use. This task is part of the work to implement support for the new KIP-848 consumer group protocol. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
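[Editor's sketch] A condensed Java sketch of the delegation shape the PR description lays out. The method set on `ConsumerDelegate` is abbreviated and illustrative — the real internal interface covers the full `Consumer` surface:

```java
import java.util.Collection;

// Internal interface both implementations satisfy (methods abbreviated for the sketch).
interface ConsumerDelegate<K, V> {
    void subscribe(Collection<String> topics);
    void close();
}

class LegacyKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
    public void subscribe(Collection<String> topics) { /* existing group protocol */ }
    public void close() { }
}

class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
    public void subscribe(Collection<String> topics) { /* KIP-848 group protocol */ }
    public void close() { }
}

// The public KafkaConsumer keeps its API but forwards every call to a delegate.
class KafkaConsumerSketch<K, V> {
    private final ConsumerDelegate<K, V> delegate;

    KafkaConsumerSketch(boolean useNewProtocol) {
        this.delegate = useNewProtocol ? new AsyncKafkaConsumer<>() : new LegacyKafkaConsumer<>();
    }

    public void subscribe(Collection<String> topics) { delegate.subscribe(topics); }
    public void close() { delegate.close(); }
}
```

The selection boolean stands in for whatever protocol-resolution mechanism ends up choosing the delegate at construction time.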
[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets
[ https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4852: --- Fix Version/s: (was: 3.4.0) > ByteBufferSerializer not compatible with offsets > > > Key: KAFKA-4852 > URL: https://issues.apache.org/jira/browse/KAFKA-4852 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 > Environment: all >Reporter: Werner Daehn >Priority: Minor > > Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the > ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The > ByteBufferSerializer will send from pos=0 and not from pos=3 onwards. > Solution: No rewind() but flip() for reading a ByteBuffer. That's what the > flip is meant for. > Story: > Imagine the incoming data comes from a byte[], e.g. a network stream > containing topicname, partition, key, value, ... and you want to create a new > ProducerRecord for that. As the constructor of ProducerRecord requires > (topic, partition, key, value) you have to copy from above byte[] the key and > value. That means there is a memcopy taking place. Since the payload can be > potentially large, that introduces a lot of overhead. Twice the memory. > A nice solution to this problem is to simply wrap the network byte[] into new > ByteBuffers: > ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength); > ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength); > and then use the ByteBufferSerializer instead of the ByteArraySerializer. > But that does not work as the ByteBufferSerializer does a rewind(), hence > both, key and value, will start at position=0 of the data[]. > public class ByteBufferSerializer implements Serializer { > public byte[] serialize(String topic, ByteBuffer data) { > data.rewind(); -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15277) Design & implement support for internal Consumer delegates
[ https://issues.apache.org/jira/browse/KAFKA-15277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15277: -- Description: The consumer refactoring project introduced another {{Consumer}} implementation, creating two different, coexisting implementations of the {{Consumer}} interface: * {{KafkaConsumer}} (AKA "existing", "legacy" consumer) * {{PrototypeAsyncConsumer}} (AKA "new", "refactored" consumer) The goal of this task is to refactor the code via the delegation pattern so that we can keep a top-level {{KafkaConsumer}} but then delegate to another implementation under the covers. There will be two delegates at first: * {{LegacyKafkaConsumer}} * {{AsyncKafkaConsumer}} {{LegacyKafkaConsumer}} is essentially a renamed {{{}KafkaConsumer{}}}. That implementation handles the existing group protocol. {{AsyncKafkaConsumer}} is renamed from {{PrototypeAsyncConsumer}} and will implement the new consumer group protocol from KIP-848. Both of those implementations will live in the "internals" sub-package to discourage their use. This task is part of the work to implement support for the new KIP-848 consumer group protocol. was: As mentioned above, there are presently two different, coexisting implementations of the {{Consumer}} interface: {{KafkaConsumer}} ("old") and {{PrototypeAsyncConsumer}} ("new"). Eventually, these will be reorganized using the delegation pattern. The top-level {{KafkaConsumer}} that implements the old protocol will be renamed as {{LegacyKafkaConsumerDelegate}} and {{PrototypeAsyncConsumer}} will be renamed as {{{}AsyncKafkaConsume{}}}{{{}rDelegate{}}}. It is assumed that neither {{{}AsyncKafkaConsume{}}}{{{}rDelegate{}}} nor {{{}LegacyKafkaConsume{}}}{{{}rDelegate{}}} will be top-level implementations of {{{}Consumer{}}}, but will likely implement an internal interface that is better suited to the needs of the top-level {{{}KafkaConsumer{}}}. Provide the Java client support for the consumer delegates, including: * Create {{ConsumerDelegate}} interface * Clone {{{}KafkaConsumer{}}}, rename as {{LegacyKafkaConsumerDelegate}} and refactor to implement {{ConsumerDelegate}} * Rename {{PrototypeAsyncConsumer}} to {{AsyncKafkaConsumerDelegate}} and refactor to implement the {{ConsumerDelegate}} interface * Refactor the (original) {{KafkaConsumer}} to remove the core implementation, instead delegating to the {{{}ConsumerDelegate{}}}, which will be hard-coded to use {{LegacyKafkaConsumerDelegate}} * Once available (in KAFKA-15284), use the {{ConsumerGroupProtocolVersionResolver}} to determine which delegate to use This task is part of the work to implement support for the new KIP-848 consumer group protocol. > Design & implement support for internal Consumer delegates > -- > > Key: KAFKA-15277 > URL: https://issues.apache.org/jira/browse/KAFKA-15277 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848, kip-848-e2e, > kip-848-preview > > The consumer refactoring project introduced another {{Consumer}} > implementation, creating two different, coexisting implementations of the > {{Consumer}} interface: > * {{KafkaConsumer}} (AKA "existing", "legacy" consumer) > * {{PrototypeAsyncConsumer}} (AKA "new", "refactored" consumer) > The goal of this task is to refactor the code via the delegation pattern so > that we can keep a top-level {{KafkaConsumer}} but then delegate to another > implementation under the covers. 
There will be two delegates at first: > * {{LegacyKafkaConsumer}} > * {{AsyncKafkaConsumer}} > {{LegacyKafkaConsumer}} is essentially a renamed {{{}KafkaConsumer{}}}. That > implementation handles the existing group protocol. {{AsyncKafkaConsumer}} is > renamed from {{PrototypeAsyncConsumer}} and will implement the new consumer > group protocol from KIP-848. Both of those implementations will live in the > "internals" sub-package to discourage their use. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15602: revert KAFKA-4852 [kafka]
mjsax merged PR #14617: URL: https://github.com/apache/kafka/pull/14617 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15643) Improve unloading logging
[ https://issues.apache.org/jira/browse/KAFKA-15643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ritika Muduganti resolved KAFKA-15643. -- Reviewer: David Jacot Resolution: Fixed > Improve unloading logging > - > > Key: KAFKA-15643 > URL: https://issues.apache.org/jira/browse/KAFKA-15643 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Ritika Muduganti >Priority: Major > > When a new leader is elected for a __consumer_offset partition, the followers > are notified to unload the state. However, only the former leader is aware of > it. The remaining follower prints out the following error: > ERROR [GroupCoordinator id=1] Execution of > UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not > the correct coordinator.. > (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime) > The error is actually correct, but we should improve the logging to not print > anything in the remaining-follower case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15284: - Assignee: Kirk True > Implement ConsumerGroupProtocolVersionResolver to determine consumer group > protocol > --- > > Key: KAFKA-15284 > URL: https://issues.apache.org/jira/browse/KAFKA-15284 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > > At client initialization, we need to determine which of the > {{ConsumerDelegate}} implementations to use: > # {{LegacyKafkaConsumerDelegate}} > # {{AsyncKafkaConsumerDelegate}} > There are conditions defined by KIP-848 that determine client eligibility to > use the new protocol. This will be modeled by the—deep > breath—{{{}ConsumerGroupProtocolVersionResolver{}}}. > Known tasks: > * Determine at what point in the {{Consumer}} initialization the network > communication should happen > * Determine what RPCs to invoke in order to determine eligibility (API > versions, IBP version, etc.) > * Implement the network client lifecycle (startup, communication, shutdown, > etc.) > * Determine the fallback path in case the client is not eligible to use the > protocol -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15277) Design & implement support for internal Consumer delegates
[ https://issues.apache.org/jira/browse/KAFKA-15277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15277: - Assignee: Kirk True (was: Philip Nee) > Design & implement support for internal Consumer delegates > -- > > Key: KAFKA-15277 > URL: https://issues.apache.org/jira/browse/KAFKA-15277 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, kip-848, kip-848-e2e, > kip-848-preview > > As mentioned above, there are presently two different, coexisting > implementations of the {{Consumer}} interface: {{KafkaConsumer}} ("old") and > {{PrototypeAsyncConsumer}} ("new"). Eventually, these will be reorganized > using the delegation pattern. The top-level {{KafkaConsumer}} that implements > the old protocol will be renamed as {{LegacyKafkaConsumerDelegate}} and > {{PrototypeAsyncConsumer}} will be renamed as > {{{}AsyncKafkaConsume{}}}{{{}rDelegate{}}}. It is assumed that neither > {{{}AsyncKafkaConsume{}}}{{{}rDelegate{}}} nor > {{{}LegacyKafkaConsume{}}}{{{}rDelegate{}}} will be top-level implementations > of {{{}Consumer{}}}, but will likely implement an internal interface that is > better suited to the needs of the top-level {{{}KafkaConsumer{}}}. > Provide the Java client support for the consumer delegates, including: > * Create {{ConsumerDelegate}} interface > * Clone {{{}KafkaConsumer{}}}, rename as {{LegacyKafkaConsumerDelegate}} and > refactor to implement {{ConsumerDelegate}} > * Rename {{PrototypeAsyncConsumer}} to {{AsyncKafkaConsumerDelegate}} and > refactor to implement the {{ConsumerDelegate}} interface > * Refactor the (original) {{KafkaConsumer}} to remove the core > implementation, instead delegating to the {{{}ConsumerDelegate{}}}, which > will be hard-coded to use {{LegacyKafkaConsumerDelegate}} > * Once available (in KAFKA-15284), use the > {{ConsumerGroupProtocolVersionResolver}} to determine which delegate to use > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1376703936 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT); +public static final String GROUP_PROTOCOL_DOC = "The group protocol consumer should use. We currently " + Review Comment: nit: Let's remove the double spaces after the dots. ## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() { final ConsumerConfig consumerConfig = new ConsumerConfig(configs); assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } + +@Test +public void testDefaultConsumerGroupConfig() { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("generic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(null, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@Test +public void testValidConsumerGroupConfig() { +String remoteAssignorName = "org.apache.kafka.clients.group.someAssignor"; +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); +configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, "org.apache.kafka.clients.group.someAssignor"); Review Comment: nit: I suppose that we could reuse `remoteAssignorName` here? 
## clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java: ## @@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() { final ConsumerConfig consumerConfig = new ConsumerConfig(configs); assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); } + +@Test +public void testDefaultConsumerGroupConfig() { +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("generic", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(null, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} + +@Test +public void testValidConsumerGroupConfig() { +String remoteAssignorName = "org.apache.kafka.clients.group.someAssignor"; +final Map configs = new HashMap<>(); +configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); +configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); +configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"); +configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, "org.apache.kafka.clients.group.someAssignor"); +final ConsumerConfig consumerConfig = new ConsumerConfig(configs); +assertEquals("consumer", consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)); +assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG)); +} Review Comment: Should we add a test to ensure that `GROUP_PROTOCOL_CONFIG` can only accept `consumer` and `generic`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
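[Editor's sketch] A sketch of the test the last comment asks for, assuming `group.protocol` is declared with a `ValidString` validator so that any value other than the accepted ones fails construction with a `ConfigException`:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertThrows;

class GroupProtocolValidationTest {

    @Test
    void invalidGroupProtocolIsRejected() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Anything other than "consumer" or "generic" should fail validation.
        configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "invalid-protocol");

        assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
    }
}
```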
[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781095#comment-17781095 ] ASF GitHub Bot commented on KAFKA-15653: jolshan merged PR #564: URL: https://github.com/apache/kafka-site/pull/564 > NPE in ChunkedByteStream > > > Key: KAFKA-15653 > URL: https://issues.apache.org/jira/browse/KAFKA-15653 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 3.6.0 > Environment: Docker container on a Linux laptop, using the latest > release. >Reporter: Travis Bischel >Assignee: Justine Olshan >Priority: Major > Attachments: repro.sh > > > When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR > from producing. The broker logs for the failing request: > > {noformat} > [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing > append operation on partition > 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 > (kafka.server.ReplicaManager) > java.lang.NullPointerException > at > org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89) > at > org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105) > at > org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273) > at > org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277) > at > org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358) > at > org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:805) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198) > at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874) > at > kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874) > at > kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac commented on code in PR #14589: URL: https://github.com/apache/kafka/pull/14589#discussion_r1376679603 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1500,6 +1502,8 @@ class KafkaApis(val requestChannel: RequestChannel, new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId(offsetFetchRequest.groupId) .setErrorCode(Errors.forException(exception).code) + } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) { Review Comment: ditto. ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -1461,6 +1461,8 @@ class KafkaApis(val requestChannel: RequestChannel, new OffsetFetchResponseData.OffsetFetchResponseGroup() .setGroupId(offsetFetchRequest.groupId) .setErrorCode(Errors.forException(exception).code) + } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) { Review Comment: nit: We could remove the `()` after `code`. ## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -4701,6 +4751,89 @@ class KafkaApisTest { assertEquals(expectedOffsetFetchResponse, response.data) } + @Test + def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = { +def makeRequest(version: Short): RequestChannel.Request = { + val groups = Map( +"group-1" -> List( + new TopicPartition("foo", 0), + new TopicPartition("bar", 0) +).asJava, +"group-2" -> List( + new TopicPartition("foo", 0), + new TopicPartition("bar", 0) +).asJava + ).asJava + buildRequest(new OffsetFetchRequest.Builder(groups, false, false).build(version)) +} + +val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion) + +val authorizer: Authorizer = mock(classOf[Authorizer]) + +val acls = Map( + "group-1" -> AuthorizationResult.ALLOWED, + "group-2" -> AuthorizationResult.ALLOWED, + "foo" -> AuthorizationResult.DENIED, + "bar" -> AuthorizationResult.ALLOWED +) + +when(authorizer.authorize( + any[RequestContext], + any[util.List[Action]] +)).thenAnswer { invocation => + val actions = invocation.getArgument(1, classOf[util.List[Action]]) + actions.asScala.map { action => +acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED) + }.asJava +} + +// group-1 and group-2 are allowed and bar is allowed. +val group1Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() +when(groupCoordinator.fetchOffsets( + requestChannelRequest.context, + new OffsetFetchRequestData.OffsetFetchRequestGroup() +.setGroupId("group-1") +.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava)).asJava), + false +)).thenReturn(group1Future) + +val group2Future = new CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]() +when(groupCoordinator.fetchOffsets( + requestChannelRequest.context, + new OffsetFetchRequestData.OffsetFetchRequestGroup() +.setGroupId("group-2") +.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics() + .setName("bar") + .setPartitionIndexes(List[Integer](0).asJava)).asJava), + false +)).thenReturn(group1Future) + +createKafkaApis(authorizer = Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching) + +// group-2 mocks using the new group coordinator. +// When the coordinator is not active, a response with error code is returned. Review Comment: Should we add a note about `foo` here? The whole point of this test is to ensure that the failed topics are not present in the response when there is a top level error. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376679750 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: I am saying that, before, the append was under the lock. With the verification change, now it is not. I think we are saying the same thing here. We need to figure out whether the append not being under the lock is safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376678541 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: If we reduce then I assume the thread just stops? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
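[Editor's sketch] To show the control flow in isolation, a Java sketch of the pattern in the Scala diff above (illustrative names; the real implementation also carries a `RequestLocal` through): run the callback inline only if it fires while the same request is still current on this thread, otherwise put it back on the request queue:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

class CallbackScheduler {
    // Which request this thread is currently handling (null off request threads).
    private final ThreadLocal<Object> threadCurrentRequest = new ThreadLocal<>();
    private final Queue<Runnable> requestQueue = new ConcurrentLinkedQueue<>();

    // Must be called on a request thread; remembers the request it wraps for.
    <T> Consumer<T> wrap(Consumer<T> fun) {
        final Object currentRequest = threadCurrentRequest.get();
        return t -> {
            if (threadCurrentRequest.get() == currentRequest) {
                // Same request, same thread: safe to execute directly, because the
                // thread-local resources captured for this request are still valid.
                fun.accept(t);
            } else {
                // Fired later or from another thread: reschedule on the request queue.
                requestQueue.add(() -> fun.accept(t));
            }
        };
    }
}
```

The `== currentRequest` comparison (rather than just a null check) is precisely the change in the diff: it stops a callback wrapped for one request from running inline while the thread is already busy with a different request.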
Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]
junrao closed pull request #14603: KAFKA-15582: Move the clean shutdown file to the storage package URL: https://github.com/apache/kafka/pull/14603 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]
junrao commented on PR #14603: URL: https://github.com/apache/kafka/pull/14603#issuecomment-1785851656 @CalvinConfluent : -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15628) Refactor ConsumerRebalanceListener invocation for reuse
[ https://issues.apache.org/jira/browse/KAFKA-15628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-15628. - Fix Version/s: 3.7.0 Resolution: Fixed > Refactor ConsumerRebalanceListener invocation for reuse > --- > > Key: KAFKA-15628 > URL: https://issues.apache.org/jira/browse/KAFKA-15628 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > Pull out the code related to invoking {{ConsumerRebalanceListener}} methods > into its own class so that it can be reused by the KIP-848 implementation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]
dajac merged PR #14638: URL: https://github.com/apache/kafka/pull/14638 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376674826 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > It would just be the append itself that is not under the lock. Hmm, is that true? We have the following code. ``` GroupCoordinator.doSyncGroup group.inLock { ... groupManager.storeGroup ... } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376671529 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: Not sure how strongly we need the `GroupMetadata` lock while appending to the log. But I think the intention is to make sure that all writes to the log for a group are serialized. Updating the in-memory state typically happens in the callback when the HWM advances and is also protected by the `GroupMetadata` lock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376670972 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: It seems that we do have the lock for the in-memory state. It would just be the append itself that is not under the lock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376662251 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > I am wondering about the locking model. We have the following path that's called under the GroupMetadata lock. GroupCoordinator.doSyncGroup (hold GroupMetadata lock) => groupManager.storeGroup => appendForGroup => replicaManager.appendRecords => replicaManager.appendEntries. However, if we register the callback, replicaManager.appendEntries will be called without holding the GroupMetadata lock. Is that safe? Hmm. Do we need the lock to write to the log, or just to update the in-memory state? I can take another look at this path. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376661650 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: True in the common case. I was wondering what happens if we dynamically reduce the number of request handler threads. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376659689 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Hmm. We assign the request channel when the thread is created. I assume that request channel will remain with the thread during its lifetime. Is that incorrect? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376657322 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Thanks, Justine and Artem. Yes, passing around the context has its own issues. The main thing with thread locals is to make sure that we don't introduce any GC issues. Currently, it seems that we never remove `threadRequestChannel`. Since we allow dynamically changing the number of request handler threads, it's probably better to remove `threadRequestChannel` when the thread completes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
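[Editor's note] The cleanup junrao suggests is typically a try/finally around the handler thread's run loop. A minimal, hypothetical sketch follows; the names stand in for `threadRequestChannel` and do not reflect the real KafkaRequestHandler API:
```java
public final class HandlerThreadLocals {
    // Illustrative stand-in for threadRequestChannel in KafkaRequestHandler.
    private static final ThreadLocal<Object> threadRequestChannel = new ThreadLocal<>();

    public static void runHandler(Object requestChannel, Runnable handlerLoop) {
        threadRequestChannel.set(requestChannel);
        try {
            handlerLoop.run();
        } finally {
            // Clear the slot when the handler thread exits, so threads removed by a
            // dynamic resize do not keep a stale reference to the request channel.
            threadRequestChannel.remove();
        }
    }
}
```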
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
divijvaidya commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376654240 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: > actionable practical recommendation I can't think of an idea other than passing a deep copy of the arguments. I have also considered suggesting wrapping in Collections.unmodifiable or Collections.synchronizedCollection, but none of these ideas is good enough. We need to re-think the callback invocation pattern in Kafka and see if we can avoid this pattern of passing references to in-memory state across threads. But I am happy to keep it out of this PR and discuss that in the scope of the JIRA that Ismael created. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
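[Editor's note] A shallow version of the "copy before handing off" idea above might look like the following sketch. It is hypothetical and mirrors no actual Kafka code; a true deep copy would also have to duplicate the values:
```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class CallbackArguments {
    /**
     * Snapshot a live map before handing it to a callback that may run on
     * another thread. Note this copy is shallow: keys and values are still
     * shared, so it only guards against concurrent structural modification.
     */
    public static <K, V> Map<K, V> snapshot(Map<K, V> live) {
        return Collections.unmodifiableMap(new HashMap<>(live));
    }
}
```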
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1376651533 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig, } } + /* + * Note: This method can be used as a callback in a different request thread. Ensure that correct RequestLocal + * is passed when executing this method. Accessing non-thread-safe data structures should be avoided if possible. + */ + private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords], +internalTopicsAllowed: Boolean, +origin: AppendOrigin, +requiredAcks: Short, +verificationGuards: Map[TopicPartition, VerificationGuard], Review Comment: 1. On the server side, some of the callbacks (Purgatory, the one introduced in KAFKA-14561, etc.) are called on a different thread than the caller. I am not sure if it's possible/desirable to change all of those callbacks to be run on the same thread. Given that model, I agree that the wide exposure of ThreadLocal seems potentially dangerous. I pinged https://github.com/apache/kafka/pull/9220 to see if we could consider an alternative approach. It might be easier to fix the `ThreadLocal` thing first before fixing this issue. 2. In general, callbacks on different threads are tricky to get right. For example, we spent a lot of time fixing the deadlock issues related to Purgatory (https://issues.apache.org/jira/browse/KAFKA-8334). For this newly introduced callback, I am wondering about the locking model. We have the following path that's called under the GroupMetadata lock. `GroupCoordinator.doSyncGroup (hold GroupMetadata lock) => groupManager.storeGroup => appendForGroup => replicaManager.appendRecords => replicaManager.appendEntries`. However, if we register the callback, `replicaManager.appendEntries` will be called without holding the `GroupMetadata` lock. Is that safe? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
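[Editor's note] To make the locking concern concrete: in the synchronous path the append runs inside the locked region, while a registered callback runs later on a request thread without the lock. The sketch below is illustrative Java only; the real path is Scala and uses `GroupMetadata.inLock`, not a bare `ReentrantLock`:
```java
import java.util.concurrent.Executor;
import java.util.concurrent.locks.ReentrantLock;

public class LockEscapeSketch {
    private final ReentrantLock groupLock = new ReentrantLock(); // stands in for the GroupMetadata lock

    // Synchronous shape: the append itself runs inside the locked region.
    void syncGroupSynchronous(Runnable appendRecords) {
        groupLock.lock();
        try {
            appendRecords.run();
        } finally {
            groupLock.unlock();
        }
    }

    // Callback shape: only the registration happens under the lock; the
    // append executes later on a request thread without holding it.
    void syncGroupWithCallback(Executor requestThreads, Runnable appendRecords) {
        groupLock.lock();
        try {
            requestThreads.execute(appendRecords);
        } finally {
            groupLock.unlock();
        }
    }
}
```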
[jira] [Commented] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
[ https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781075#comment-17781075 ] Philip Nee commented on KAFKA-15631: Hi [~lianetm], thanks for filing this issue. I believe this scenario is already covered by the current tests, i.e. "testNetworkTimeout" and "testHeartbeatOnStartup" already test the in-flight heartbeat request. I'm closing this issue for now. If this is indeed an issue, we can reopen it. > Do not send new heartbeat request while another one in-flight > - > > Key: KAFKA-15631 > URL: https://issues.apache.org/jira/browse/KAFKA-15631 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Client consumer should not send a new heartbeat request while there is a > previous in-flight. If a HB is in-flight, we should wait for a response or > timeout before sending a next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
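[Editor's note] For reference, the invariant described in the ticket amounts to a small in-flight guard. The sketch below is hypothetical and does not reflect the actual consumer internals:
```java
import java.util.concurrent.atomic.AtomicBoolean;

public class HeartbeatInFlightGuard {
    private final AtomicBoolean inFlight = new AtomicBoolean(false);

    /** Returns true if no heartbeat is outstanding, claiming the in-flight slot. */
    public boolean tryAcquire() {
        return inFlight.compareAndSet(false, true);
    }

    /** Called on response or timeout so that the next heartbeat may be sent. */
    public void release() {
        inFlight.set(false);
    }
}
```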
Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]
rreddy-22 commented on PR #14582: URL: https://github.com/apache/kafka/pull/14582#issuecomment-1785758202 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5915/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
[ https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15631. Resolution: Not A Problem > Do not send new heartbeat request while another one in-flight > - > > Key: KAFKA-15631 > URL: https://issues.apache.org/jira/browse/KAFKA-15631 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Philip Nee >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > Client consumer should not send a new heartbeat request while there is a > previous in-flight. If a HB is in-flight, we should wait for a response or > timeout before sending a next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]
rreddy-22 commented on code in PR #14657: URL: https://github.com/apache/kafka/pull/14657#discussion_r1376601434 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1359,7 +1359,8 @@ public void scheduleUnloadOperation( log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch); Review Comment: What should we rephrase it as? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"
[ https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781069#comment-17781069 ] Ismael Juma commented on KAFKA-15754: - [~jolshan] I think this issue can happen if some code does _not_ use `Uuid.toString()` and instead uses Java's `UUID.toString()` somehow. > The kafka-storage tool can generate UUID starting with "-" > -- > > Key: KAFKA-15754 > URL: https://issues.apache.org/jira/browse/KAFKA-15754 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0 >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Major > > Using the kafka-storage.sh tool, it seems that it can still generate a UUID > starting with a dash "-", which then breaks how the argparse4j library works. > With such a UUID (i.e. -rmdB0m4T4--Y4thlNXk4Q in my case) the tool exits with > the following error: > kafka-storage: error: argument --cluster-id/-t: expected one argument > That said, it seems that this problem was already addressed in the > Uuid.randomUuid method, which keeps generating a new UUID until it doesn't > start with "-". This is the commit addressing it > [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce] > The problem is that when toString is called on the Uuid instance, it's > going to do a Base64 encoding on the generated UUID this way: > {code:java} > Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); > {code} > Not sure why, but the code is using a URL (safe) encoder which, taking a > look at the Base64 class in Java, is a RFC4648_URLSAFE encoder with > the following alphabet: > > {code:java} > private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', > 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', > 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', > 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', > 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code} > which, as you can see, includes the "-" character. > So although the current Uuid.randomUuid avoids generating a UUID > containing a dash, the Base64 encoding operation can still return a final UUID > starting with a dash. > > I was wondering if there is any good reason for using a Base64 URL encoder > and not just the RFC4648 one (not URL-safe), which uses the common Base64 alphabet > that does not contain "-". -- This message was sent by Atlassian Jira (v8.20.10#820010)
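[Editor's note] The mechanics are easy to demonstrate with the JDK encoder alone: in the URL-safe alphabet, the value 62 maps to '-', so any 16-byte ID whose first six bits are 111110 encodes to a string that starts with a dash. A self-contained sketch (illustrative only, not Kafka code; the class name is made up):
```java
import java.util.Base64;

public class DashPrefixDemo {
    public static void main(String[] args) {
        byte[] id = new byte[16];
        // First six bits = 0b111110 = 62, which the URL-safe alphabet maps to '-'.
        id[0] = (byte) 0b11111000;
        String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(id);
        System.out.println(encoded); // a 22-character string beginning with '-'
    }
}
```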