Re: [PR] MINOR: fix typo and comment [kafka]
dengziming merged PR #14650: URL: https://github.com/apache/kafka/pull/14650 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rename log dir UUIDs [kafka]
dengziming commented on code in PR #14517:
URL: https://github.com/apache/kafka/pull/14517#discussion_r1375160304

## server-common/src/main/java/org/apache/kafka/common/DirectoryId.java: ##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class DirectoryId {
+
+    /**
+     * A UUID that is used to identify new or unknown dir assignments.
+     */
+    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+
+    /**
+     * A UUID that is used to represent unspecified offline dirs.
+     */
+    public static final Uuid LOST = new Uuid(0L, 1L);
+
+    /**
+     * A UUID that is used to represent and unspecified log directory,
+     * that is expected to have been previously selected to host an
+     * associated replica. This contrasts with {@code UNASSIGNED_DIR},
+     * which is associated with (typically new) replicas that may not
+     * yet have been placed in any log directory.
+     */
+    public static final Uuid MIGRATING = new Uuid(0L, 2L);
+
+    /**
+     * The set of reserved UUIDs that will never be returned by the random method.
+     */
+    public static final Set<Uuid> RESERVED;
+
+    static {
+        HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
+        // The first 100 UUIDs are reserved for future use.
+        for (long i = 0L; i < 100L; i++) {
+            reserved.add(new Uuid(0L, i));
+        }
+        RESERVED = Collections.unmodifiableSet(reserved);
+    }
+
+    /**
+     * Static factory to generate a directory ID.
+     *
+     * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
+     */
+    public static Uuid random() {
+        Uuid uuid = Uuid.randomUuid();
+        while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {

Review Comment:
We don't need `uuid.toString().startsWith("-")` here, right?
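For readers following the review question above, here is a minimal, self-contained sketch of the rejection-sampling pattern in the diff. It is not Kafka code: `DirectoryIdSketch`, `encode` and `isAcceptable` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the string form is assumed to be unpadded URL-safe Base64 of the 16 raw bytes. Under that assumption the first character can be `-` whenever the top six bits of the ID are `111110`, which is the case the second half of the loop condition guards against.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the DirectoryId class in the diff. It uses
// java.util.UUID so that it compiles without any Kafka dependency.
final class DirectoryIdSketch {

    /** The first 100 IDs (most significant bits zero, low bits 0..99) are reserved. */
    static final Set<UUID> RESERVED;

    static {
        Set<UUID> reserved = new HashSet<>();
        for (long i = 0L; i < 100L; i++) {
            reserved.add(new UUID(0L, i));
        }
        RESERVED = Collections.unmodifiableSet(reserved);
    }

    /** Assumed string form: URL-safe Base64 of the 16 raw bytes, without padding. */
    static String encode(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    /** An ID is acceptable if it is not reserved and its string form has no leading dash. */
    static boolean isAcceptable(UUID id) {
        return !RESERVED.contains(id) && !encode(id).startsWith("-");
    }

    /** Rejection sampling: draw again until the candidate passes both checks. */
    static UUID random() {
        UUID id = UUID.randomUUID();
        while (!isAcceptable(id)) {
            id = UUID.randomUUID();
        }
        return id;
    }

    public static void main(String[] args) {
        System.out.println(encode(random()));
    }
}
```

Whether the dash check is redundant in the real class depends on what `Uuid.randomUuid()` already guarantees, which is exactly what the reviewer is asking; the sketch only shows what the condition does.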
Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]
CalvinConfluent commented on PR #14603:
URL: https://github.com/apache/kafka/pull/14603#issuecomment-1783656823

Found a weird UT failure; rebased onto trunk.
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375136169

## build.gradle: ##
@@ -1361,6 +1375,21 @@ project(':clients') {
     generator project(':generator')
   }
+  shadowJar {
+    archiveClassifier = null

Review Comment:
Done.
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375136109

## build.gradle: ##
@@ -1342,6 +1348,14 @@ project(':clients') {
     implementation libs.lz4
     implementation libs.snappy
     implementation libs.slf4jApi
+    implementation libs.opentelemetryProto
+
+    // declare runtime libraries

Review Comment:
Done.

## build.gradle: ##
@@ -1361,6 +1375,21 @@ project(':clients') {
     generator project(':generator')
   }
+  shadowJar {
+    archiveClassifier = null
+    // KIP-714: move shaded dependencies to a shaded location
+    relocate('io.opentelemetry', 'org.apache.kafka.shaded.io.opentelemetry')

Review Comment:
Done.
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
apoorvmittal10 commented on PR #14618:
URL: https://github.com/apache/kafka/pull/14618#issuecomment-1783642480

> @apoorvmittal10 could we show a diff before/after this change of:
>
> * the pom
> * the content of the jar – excluding `org/apache/kafka/shaded/com/google/protobuf/` and `org/apache/kafka/shaded/io/opentelemetry/proto/`
>
> to make sure the only difference in the resulting artifact are those shaded classes?

@xvrl Please find the details below: AK `trunk` branch build vs. the `changes` in this PR. Do you think we should relocate the `*.proto` files too? I didn't do that, as I felt moving the classes should be sufficient.

```
➜ 3.7.0-SNAPSHOT-trunk> jar -tf kafka-clients-3.7.0-SNAPSHOT.jar > jar_tf_output
➜ 3.7.0-SNAPSHOT-changes> jar -tf kafka-clients-3.7.0-SNAPSHOT.jar | grep -v org/apache/kafka/shaded/ > jar_tf_output
```

```
git diff 3.7.0-SNAPSHOT-trunk/jar_tf_output 3.7.0-SNAPSHOT-changes/jar_tf_output
```

![Screenshot 2023-10-28 at 1 40 09 AM](https://github.com/apache/kafka/assets/2861565/b0638949-fae8-4568-a901-d736767e46f3)

```
git diff 3.7.0-SNAPSHOT-trunk/pom.xml 3.7.0-SNAPSHOT-changes/pom.xml
```

![Screenshot 2023-10-28 at 1 42 06 AM](https://github.com/apache/kafka/assets/2861565/a981319b-fccc-4f5e-9305-92e6b4908446)
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375136205

## build.gradle: ##
@@ -1380,7 +1409,9 @@ project(':clients') {
   }

   jar {
+    enabled false
     dependsOn createVersionFile
+    dependsOn 'shadowJar'
     from("$buildDir") {
       include "kafka/$buildVersionFileName"
     }

Review Comment:
Thanks for pointing that out; added the other files too.
[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780569#comment-17780569 ]

ASF GitHub Bot commented on KAFKA-15653:

ijuma commented on code in PR #564:
URL: https://github.com/apache/kafka-site/pull/564#discussion_r1375131837

## 36/upgrade.html: ##
@@ -116,6 +116,12 @@ Notable changes in 3
 For more information about the early access tiered storage feature, please check <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage">KIP-405</a> and <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes">Tiered Storage Early Access Release Note</a>.
+Transaction partition verification (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense">KIP-890</a>)
+has been added to data partitions to prevent hanging transactions. Workloads with compression can experience InvalidRecordExceptions and UnknownServerExceptions.
+For workloads with compression, transaction.partition.verification.enable should be set to false. Note that the default for 3.6 is true.

Review Comment:
I'd just say "This feature can be disabled by setting..." (instead of "For workloads with compression...").

## 36/upgrade.html: ##
@@ -116,6 +116,12 @@ Notable changes in 3
 For more information about the early access tiered storage feature, please check <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage">KIP-405</a> and <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes">Tiered Storage Early Access Release Note</a>.
+Transaction partition verification (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense">KIP-890</a>)
+has been added to data partitions to prevent hanging transactions. Workloads with compression can experience InvalidRecordExceptions and UnknownServerExceptions.
+For workloads with compression, transaction.partition.verification.enable should be set to false. Note that the default for 3.6 is true.
+The configuration can also be updated dynamically and is applied to the broker.
+This will be fixed in a future release. See <a href="https://issues.apache.org/jira/browse/KAFKA-15653">KAFKA-15653</a> for more details.

Review Comment:
Can we say it will be fixed in 3.6.1?

> NPE in ChunkedByteStream
>
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest release.
> Reporter: Travis Bischel
> Assignee: Justine Olshan
> Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR
> from producing. The broker logs for the failing request:
>
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing append operation on partition 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>     at org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>     at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>     at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>     at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>     at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>     at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>     at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>     at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>     at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>     at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>     at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>     at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>     at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>     at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>     at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>     at
[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780567#comment-17780567 ]

ASF GitHub Bot commented on KAFKA-15653:

jolshan opened a new pull request, #564:
URL: https://github.com/apache/kafka-site/pull/564

KAFKA-15653 can be painful for folks with compression. Adding a note about the issue and how to mitigate it.

> NPE in ChunkedByteStream
>
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest release.
> Reporter: Travis Bischel
> Assignee: Justine Olshan
> Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR
> from producing. The broker logs for the failing request:
>
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing append operation on partition 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>     at org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>     at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>     at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>     at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>     at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>     at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>     at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>     at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>     at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>     at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>     at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>     at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>     at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>     at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>     at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>     at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>     at kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>     at kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>     at kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>     at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

--
This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]
ijuma commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375128035

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java: ##
@@ -55,20 +63,111 @@ class TxnPartitionEntry {
         .thenComparingInt(ProducerBatch::producerEpoch)
         .thenComparingInt(ProducerBatch::baseSequence);
-    TxnPartitionEntry() {
+    TxnPartitionEntry(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
         this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
         this.nextSequence = 0;
-        this.lastAckedSequence = TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+        this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
         this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
         this.inflightBatchesBySequence = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
     }
-    void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+    ProducerIdAndEpoch producerIdAndEpoch() {
+        return producerIdAndEpoch;
+    }
+
+    int nextSequence() {
+        return nextSequence;
+    }
+
+    OptionalLong lastAckedOffset() {
+        if (lastAckedOffset != ProduceResponse.INVALID_OFFSET)
+            return OptionalLong.of(lastAckedOffset);
+        return OptionalLong.empty();
+    }
+
+    OptionalInt lastAckedSequence() {
+        if (lastAckedSequence != TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
+            return OptionalInt.of(lastAckedSequence);
+        return OptionalInt.empty();
+    }
+
+    boolean hasInflightBatches() {
+        return !inflightBatchesBySequence.isEmpty();
+    }
+
+    ProducerBatch nextBatchBySequence() {
+        return inflightBatchesBySequence.isEmpty() ? null : inflightBatchesBySequence.first();
+    }
+
+    void incrementSequence(int increment) {
+        this.nextSequence = DefaultRecordBatch.incrementSequence(this.nextSequence, increment);
+    }
+
+    void addInflightBatch(ProducerBatch batch) {
+        inflightBatchesBySequence.add(batch);
+    }
+
+    void setLastAckedOffset(long lastAckedOffset) {
+        this.lastAckedOffset = lastAckedOffset;
+    }
+
+    void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) {
+        final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+        resetSequenceNumbers(inFlightBatch -> {
+            inFlightBatch.resetProducerState(newProducerIdAndEpoch, sequence.value);
+            sequence.value += inFlightBatch.recordCount;
+        });
+        producerIdAndEpoch = newProducerIdAndEpoch;
+        nextSequence = sequence.value;
+        lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
+    }
+
+    void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) {
+        decrementSequence(recordCount);
+        resetSequenceNumbers(inFlightBatch -> {
+            if (inFlightBatch.baseSequence() < baseSequence)
+                return;
+
+            int newSequence = inFlightBatch.baseSequence() - recordCount;
+            if (newSequence < 0)
+                throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+                        + " for partition " + topicPartition + " is going to become negative: " + newSequence);
+
+            inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence);
+        });
+    }
+
+    int maybeUpdateLastAckedSequence(int sequence) {

Review Comment:
I tried to have non-private methods first. No particular reason for these two; I moved them above `adjustSequencesDueToFailedBatch`.
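The bookkeeping in the `adjustSequencesDueToFailedBatch` hunk above can be illustrated with a small standalone sketch. This is a simplified model rather than the producer's implementation: `SequenceAdjustmentSketch`, `Batch` and `SequenceTracker` are hypothetical names, batches are kept in a plain list, and sequence wrap-around is ignored. The idea it shows is that when one in-flight batch fails for good, the next sequence number is pulled back by that batch's record count and every later in-flight batch is shifted down by the same amount.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the sequence bookkeeping in the diff.
// None of these types are Kafka classes.
final class SequenceAdjustmentSketch {

    static final class Batch {
        int baseSequence;
        final int recordCount;

        Batch(int baseSequence, int recordCount) {
            this.baseSequence = baseSequence;
            this.recordCount = recordCount;
        }
    }

    static final class SequenceTracker {
        private final List<Batch> inflight = new ArrayList<>();
        private int nextSequence = 0;

        /** Assigns the next free sequence range to a new in-flight batch. */
        Batch send(int recordCount) {
            Batch batch = new Batch(nextSequence, recordCount);
            inflight.add(batch);
            nextSequence += recordCount; // wrap-around is deliberately ignored here
            return batch;
        }

        /** Drops a failed batch and closes the gap it leaves in the sequence space. */
        void fail(Batch failed) {
            if (!inflight.remove(failed)) {
                throw new IllegalArgumentException("batch is not in flight");
            }
            nextSequence -= failed.recordCount;
            for (Batch batch : inflight) {
                if (batch.baseSequence < failed.baseSequence) {
                    continue; // batches sent before the failed one keep their sequence
                }
                int newSequence = batch.baseSequence - failed.recordCount;
                if (newSequence < 0) {
                    throw new IllegalStateException("sequence would become negative: " + newSequence);
                }
                batch.baseSequence = newSequence;
            }
        }

        int nextSequence() {
            return nextSequence;
        }
    }

    public static void main(String[] args) {
        SequenceTracker tracker = new SequenceTracker();
        Batch a = tracker.send(3); // sequences 0..2
        Batch b = tracker.send(2); // sequences 3..4
        Batch c = tracker.send(4); // sequences 5..8
        tracker.fail(b);
        // prints "0 3 7"
        System.out.println(a.baseSequence + " " + c.baseSequence + " " + tracker.nextSequence());
    }
}
```

In the example, batch `c` moves from base sequence 5 to 3 once the two-record batch `b` is dropped, and the next free sequence moves from 9 to 7.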
Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]
ijuma commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375127610

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -3449,7 +3449,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
         assertFalse(transactionManager.hasInflightBatches(tp1));
-        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
+        assertEquals(1, transactionManager.sequenceNumber(tp1));

Review Comment:
Good catch!
Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]
ijuma commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375127506

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##
@@ -778,22 +778,22 @@ public void testIdempotenceWithMultipleInflights() throws Exception {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));

Review Comment:
I can't think of a good reason.
Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]
jolshan commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375122091

## clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java: ##
@@ -778,22 +778,22 @@ public void testIdempotenceWithMultipleInflights() throws Exception {
         prepareAndReceiveInitProducerId(producerId, Errors.NONE);
         assertTrue(transactionManager.hasProducerId());
-        assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+        assertEquals(0, transactionManager.sequenceNumber(tp0));

Review Comment:
why were these longs in the first place?
Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]
jolshan commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375121838

## clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java: ##
@@ -3449,7 +3449,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new SenderMetricsRegistry(new Metrics(t
         transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
         assertFalse(transactionManager.hasInflightBatches(tp1));
-        assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+        assertEquals(1, transactionManager.sequenceNumber(tp1));
+        assertEquals(1, transactionManager.sequenceNumber(tp1));

Review Comment:
nit: duplicated line?
[PR] refactor: introduce internal StoreFactory [kafka]
agavra opened a new pull request, #14659:
URL: https://github.com/apache/kafka/pull/14659

### Overview

This PR sets up the necessary prerequisites to respect configurations such as `dsl.default.store.type` and `dsl.store.suppliers.class` (introduced in #14648) without requiring them to be passed in to `StreamsBuilder#new(TopologyConfig)` (passing them only into `new KafkaStreams(...)`). It essentially makes `StoreBuilder` an external-only API; internally, the code uses the `StoreFactory` equivalent.

In a future PR, we will replace `KeyValueStoreMaterializer` with an implementation of `StoreFactory` that creates the store builder only after `configure()` is called.

### Testing

There is no change in functionality for this PR.

### Review Guide

1. Start by looking at `StoreFactory` and reading the JavaDocs. This is an interface representing what used to be `InternalTopologyBuilder.StateStoreFactory`.
2. Look at how `StoreBuilderWrapper` is what used to be the implementation of `InternalTopologyBuilder.StateStoreFactory`.
3. Note that `InternalTopologyBuilder#addStateStore` now takes in a `StoreFactory` instead of a `StoreBuilder` from everywhere that uses the DSL.
4. The rest is piping that change around and wrapping `StoreBuilder` with `StoreBuilderWrapper`. In a future PR, all `StoreBuilderWrappers` in the DSL will be replaced with a new one that respects the configurations passed in to `new KafkaStreams`.

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
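The shape described in the review guide above could look roughly like the sketch below. Everything in it is hypothetical and heavily simplified: the interfaces only mimic the roles the PR text gives to `StoreBuilder`, `StoreFactory` and `StoreBuilderWrapper`, stores are represented as plain strings, and `DeferredStoreFactory` and the fallback value `"rocksDB"` are placeholders for the future implementation the description mentions. The point is only the ordering: a wrapper can build immediately, while a config-aware factory refuses to build until `configure()` has supplied the store type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical types that only mimic the roles named in the PR description.
// They are not the Kafka Streams interfaces.
final class StoreFactorySketch {

    /** Stand-in for the external, user-facing builder API. */
    interface StoreBuilder {
        String name();
        String build();
    }

    /** Stand-in for the internal abstraction the topology builder would hold. */
    interface StoreFactory {
        void configure(Map<String, String> config);
        String name();
        String build();
    }

    /** Adapts an already-complete builder; configuration has no effect on it. */
    static final class StoreBuilderWrapper implements StoreFactory {
        private final StoreBuilder builder;

        StoreBuilderWrapper(StoreBuilder builder) {
            this.builder = builder;
        }

        @Override public void configure(Map<String, String> config) { /* nothing to configure */ }
        @Override public String name() { return builder.name(); }
        @Override public String build() { return builder.build(); }
    }

    /** Picks the store type only once configure() has been called. */
    static final class DeferredStoreFactory implements StoreFactory {
        private final String name;
        private String storeType; // stays null until configure() runs

        DeferredStoreFactory(String name) {
            this.name = name;
        }

        @Override public void configure(Map<String, String> config) {
            // "rocksDB" is a placeholder fallback chosen for this sketch.
            storeType = config.getOrDefault("dsl.default.store.type", "rocksDB");
        }

        @Override public String name() { return name; }

        @Override public String build() {
            if (storeType == null) {
                throw new IllegalStateException("configure() must be called before build()");
            }
            return storeType + ":" + name;
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("dsl.default.store.type", "in_memory");

        StoreFactory factory = new DeferredStoreFactory("counts");
        factory.configure(config);
        System.out.println(factory.build()); // prints "in_memory:counts"
    }
}
```

Holding a `StoreFactory` rather than a finished builder is what would let the configuration passed to `new KafkaStreams(...)` influence the store type, since nothing is built before `configure()` runs.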
Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]
jolshan commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375118370

## clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java: ##
@@ -55,20 +63,111 @@ class TxnPartitionEntry {
         .thenComparingInt(ProducerBatch::producerEpoch)
         .thenComparingInt(ProducerBatch::baseSequence);
-    TxnPartitionEntry() {
+    TxnPartitionEntry(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
         this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
         this.nextSequence = 0;
-        this.lastAckedSequence = TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+        this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
         this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
         this.inflightBatchesBySequence = new TreeSet<>(PRODUCER_BATCH_COMPARATOR);
     }
-    void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+    ProducerIdAndEpoch producerIdAndEpoch() {
+        return producerIdAndEpoch;
+    }
+
+    int nextSequence() {
+        return nextSequence;
+    }
+
+    OptionalLong lastAckedOffset() {
+        if (lastAckedOffset != ProduceResponse.INVALID_OFFSET)
+            return OptionalLong.of(lastAckedOffset);
+        return OptionalLong.empty();
+    }
+
+    OptionalInt lastAckedSequence() {
+        if (lastAckedSequence != TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
+            return OptionalInt.of(lastAckedSequence);
+        return OptionalInt.empty();
+    }
+
+    boolean hasInflightBatches() {
+        return !inflightBatchesBySequence.isEmpty();
+    }
+
+    ProducerBatch nextBatchBySequence() {
+        return inflightBatchesBySequence.isEmpty() ? null : inflightBatchesBySequence.first();
+    }
+
+    void incrementSequence(int increment) {
+        this.nextSequence = DefaultRecordBatch.incrementSequence(this.nextSequence, increment);
+    }
+
+    void addInflightBatch(ProducerBatch batch) {
+        inflightBatchesBySequence.add(batch);
+    }
+
+    void setLastAckedOffset(long lastAckedOffset) {
+        this.lastAckedOffset = lastAckedOffset;
+    }
+
+    void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) {
+        final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+        resetSequenceNumbers(inFlightBatch -> {
+            inFlightBatch.resetProducerState(newProducerIdAndEpoch, sequence.value);
+            sequence.value += inFlightBatch.recordCount;
+        });
+        producerIdAndEpoch = newProducerIdAndEpoch;
+        nextSequence = sequence.value;
+        lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
+    }
+
+    void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) {
+        decrementSequence(recordCount);
+        resetSequenceNumbers(inFlightBatch -> {
+            if (inFlightBatch.baseSequence() < baseSequence)
+                return;
+
+            int newSequence = inFlightBatch.baseSequence() - recordCount;
+            if (newSequence < 0)
+                throw new IllegalStateException("Sequence number for batch with sequence " + inFlightBatch.baseSequence()
+                        + " for partition " + topicPartition + " is going to become negative: " + newSequence);
+
+            inFlightBatch.resetProducerState(new ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), newSequence);
+        });
+    }
+
+    int maybeUpdateLastAckedSequence(int sequence) {

Review Comment:
nit: for these two methods: in TransactionManager, they were listed before adjustSequencesDueToFailedBatch. Not a big deal, but was curious if we wanted the ordering to be consistent. (Looks like we are pretty consistent with the TxnPartitionMap ordering)
Re: [PR] KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]
mumrah commented on code in PR #14654: URL: https://github.com/apache/kafka/pull/14654#discussion_r1375115830 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -66,7 +66,12 @@ object ZkMigrationIntegrationTest { } def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = { -Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, MetadataVersion.IBP_3_6_IV2).foreach { mv => +Seq( + MetadataVersion.IBP_3_4_IV0, + MetadataVersion.IBP_3_5_IV2, + MetadataVersion.IBP_3_6_IV2, + MetadataVersion.latest() Review Comment: No, but we should add it so it doesn't get skipped over in the future -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100926 ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/MetricNamingConventionTest.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class MetricNamingConventionTest { Review Comment: Added, thanks for the suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100801 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a> + */ +public class MetricNamingConvention { Review Comment: I have changed the name to `TelemetryMetricNamingConvention`, since that is how we generally refer to it in the KIP. Let me know if that works, or if we still feel the name should be more specific, as you suggested. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375099944 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat;>Metrics naming and format + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy() { +@Override +public MetricKey metricKey(MetricName metricName) { +Objects.requireNonNull(metricName, "metric name cannot be null"); +String group = metricName.group() == null ? "" : metricName.group(); +String rawName = metricName.name() == null ? "" : metricName.name(); + +return new MetricKey(fullMetricName(domain, group, rawName), +Collections.unmodifiableMap(cleanTags(metricName.tags(; +} + +@Override +public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) { +Objects.requireNonNull(derivedComponent, "derived component cannot be null"); +return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags()); +} +}; +} + +/** + * Creates a metric name given the domain, group, and name. The new String follows the following + * conventions and rules: + * + * + * domain is expected to be a host-name like value, e.g. 
{@code org.apache.kafka} + * group is cleaned of redundant words: "-metrics" + * the group and metric name is dot separated + * The name is created by joining the three components, e.g.: + * {@code org.apache.kafka.producer.connection.creation.rate} + * + */ +private static String fullMetricName(String domain, String group, String name) { +return domain ++ NAME_JOINER ++ cleanGroup(group) ++ NAME_JOINER ++ cleanMetric(name); +} + +/** + * This method maps a raw name to follow conventions and cleans up the result to be more legible: + * + * converts names to lower hyphen case conventions Review Comment: Corrected the comment, my bad. ## clients/src/test/java/org/apache/kafka/common/telemetry/internals/MetricNamingConventionTest.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.telemetry.internals; + +import
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100330 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat;>Metrics naming and format + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { Review Comment: Renamed `domain` -> `prefix`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100148 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat;>Metrics naming and format + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy() { Review Comment: Done, added test case as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100061 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat;>Metrics naming and format + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy() { +@Override +public MetricKey metricKey(MetricName metricName) { +Objects.requireNonNull(metricName, "metric name cannot be null"); +String group = metricName.group() == null ? "" : metricName.group(); Review Comment: Removed, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375099869 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a> + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy() { +@Override +public MetricKey metricKey(MetricName metricName) { +Objects.requireNonNull(metricName, "metric name cannot be null"); +String group = metricName.group() == null ? "" : metricName.group(); +String rawName = metricName.name() == null ? "" : metricName.name(); + +return new MetricKey(fullMetricName(domain, group, rawName), +Collections.unmodifiableMap(cleanTags(metricName.tags()))); +} + +@Override +public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) { +Objects.requireNonNull(derivedComponent, "derived component cannot be null"); +return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags()); +} +}; +} + +/** + * Creates a metric name given the domain, group, and name. The new String follows the following + * conventions and rules: + * + * + * domain is expected to be a host-name like value, e.g.
{@code org.apache.kafka} + * group is cleaned of redundant words: "-metrics" + * the group and metric name is dot separated + * The name is created by joining the three components, e.g.: + * {@code org.apache.kafka.producer.connection.creation.rate} + * + */ +private static String fullMetricName(String domain, String group, String name) { +return domain + + NAME_JOINER + + cleanGroup(group) + + NAME_JOINER + + cleanMetric(name); +} + +/** + * This method maps a raw name to follow conventions and cleans up the result to be more legible: + * + * converts names to lower hyphen case conventions + * strips redundant parts of the metric name, such as -metrics + * normalizes artifacts of hyphen case to dot separated conversion + * + */ +private static String cleanGroup(String group) { +group = clean(group, NAME_JOINER); +return GROUP_PATTERN.matcher(group).replaceAll(""); +} + +private static String cleanMetric(String metric) { +return clean(metric, NAME_JOINER); +} + +/** + * Converts a tag name to match the telemetry naming conventions by converting snake_case. Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
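As an illustration of the naming rules described in the quoted Javadoc (dot-joined domain, group and name; the redundant "metrics" part dropped from the group; tag keys turned into snake_case), here is a small self-contained sketch. It is not the code under review: `NamingSketch` is a hypothetical class, and its `clean` helper simply assumes that cleaning means lower-casing and replacing hyphens with the joiner, since the quoted diff does not show the real `clean` body.

```java
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class NamingSketch {

    // Same pattern as in the quoted diff: drops a ".metrics" component from the group.
    private static final Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)");

    /** Assumption: lower-case the input and replace hyphens with the given joiner. */
    public static String clean(String raw, String joiner) {
        return raw.toLowerCase(Locale.ROOT).replace("-", joiner);
    }

    /** Joins domain, cleaned group and cleaned metric name with dots. */
    public static String fullMetricName(String domain, String group, String name) {
        String cleanedGroup = GROUP_PATTERN.matcher(clean(group, ".")).replaceAll("");
        return domain + "." + cleanedGroup + "." + clean(name, ".");
    }

    /** Returns a copy of the tags with hyphenated keys converted to snake_case. */
    public static Map<String, String> cleanTags(Map<String, String> raw) {
        Map<String, String> cleaned = new TreeMap<>();
        for (Map.Entry<String, String> entry : raw.entrySet()) {
            cleaned.put(clean(entry.getKey(), "_"), entry.getValue());
        }
        return cleaned;
    }

    public static void main(String[] args) {
        // prints "org.apache.kafka.producer.connection.creation.rate"
        System.out.println(
                fullMetricName("org.apache.kafka", "producer-metrics", "connection-creation-rate"));
    }
}
```

With these assumptions, the group `producer-metrics` and the name `connection-creation-rate` yield the example name given in the Javadoc.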
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375099765 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat;>Metrics naming and format + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy() { +@Override +public MetricKey metricKey(MetricName metricName) { +Objects.requireNonNull(metricName, "metric name cannot be null"); +String group = metricName.group() == null ? "" : metricName.group(); +String rawName = metricName.name() == null ? "" : metricName.name(); + +return new MetricKey(fullMetricName(domain, group, rawName), +Collections.unmodifiableMap(cleanTags(metricName.tags(; +} + +@Override +public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) { +Objects.requireNonNull(derivedComponent, "derived component cannot be null"); +return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags()); +} +}; +} + +/** + * Creates a metric name given the domain, group, and name. The new String follows the following + * conventions and rules: + * + * + * domain is expected to be a host-name like value, e.g. 
{@code org.apache.kafka} + * group is cleaned of redundant words: "-metrics" + * the group and metric name is dot separated + * The name is created by joining the three components, e.g.: + * {@code org.apache.kafka.producer.connection.creation.rate} + * + */ +private static String fullMetricName(String domain, String group, String name) { +return domain ++ NAME_JOINER ++ cleanGroup(group) ++ NAME_JOINER ++ cleanMetric(name); +} + +/** + * This method maps a raw name to follow conventions and cleans up the result to be more legible: + * + * converts names to lower hyphen case conventions + * strips redundant parts of the metric name, such as -metrics + * normalizes artifacts of hyphen case to dot separated conversion + * + */ +private static String cleanGroup(String group) { +group = clean(group, NAME_JOINER); +return GROUP_PATTERN.matcher(group).replaceAll(""); +} + +private static String cleanMetric(String metric) { +return clean(metric, NAME_JOINER); +} + +/** + * Converts a tag name to match the telemetry naming conventions by converting snake_case. + * + * Kafka metrics have tags name in lower case separated by hyphens. Eg: total-errors + * + * @param raw the input map + * @return the new map with keys replaced by snake_case representations. + */ +private static Map cleanTags(Map raw) { +return raw.entrySet() +.stream() +.collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), Entry::getValue)); +} + +private static String clean(String raw, String joiner) { +Objects.requireNonNull(raw,
[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role
[ https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780554#comment-17780554 ] Jakub Scholz commented on KAFKA-14941: -- The idea with which I opened it was to: * Have it documented in the first place * If it is done in the Java code in the same way as, for example, the current flag indicating whether an option is read-only or not, that would be a plus, as it could then be used in other applications as well. > Document which configuration options are applicable only to processes with > broker role or controller role > - > > Key: KAFKA-14941 > URL: https://issues.apache.org/jira/browse/KAFKA-14941 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Priority: Major > > When running in KRaft mode, some of the configuration options are applicable > only to nodes with the broker process role and some are applicable only to > the nodes with the controller process role. It would be great if this > information was part of the documentation (e.g. in the [Broker > Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the > website), and even better if it was also part of the config classes so that it can be > used in situations when the configuration is generated dynamically, for > example to filter the options applicable to different nodes. This would allow > having configuration files with only the actually used configuration options > and, for example, help to reduce unnecessary restarts when rolling out new > configurations etc. > For some options, it seems clear and the Kafka node would refuse to start if > they are set - for example the configurations of the non-controller listeners > in controller-only nodes. For others, it seems a bit less clear (Does > the {{compression.type}} option apply to controller-only nodes? Or the > configurations for the offset topic? etc.). -- This message was sent by Atlassian Jira (v8.20.10#820010)
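To picture what "part of the config classes" could enable, here is a purely hypothetical sketch of filtering a configuration by process role. Nothing in it exists in Kafka today: `RoleFilterSketch`, the `Role` enum, the applicability map, and the `example.*` option names are all invented for illustration, and no claim is made about which real options belong to which role.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class RoleFilterSketch {

    /** Hypothetical process roles, mirroring the broker/controller split in KRaft mode. */
    public enum Role { BROKER, CONTROLLER }

    /**
     * Keeps only the options that apply to at least one of the node's roles.
     * Options without applicability metadata are kept, since nothing says they are unused.
     */
    public static Map<String, String> filterForRoles(Map<String, String> config,
                                                     Map<String, Set<Role>> applicability,
                                                     Set<Role> nodeRoles) {
        Map<String, String> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            Set<Role> roles = applicability.get(entry.getKey());
            if (roles == null || !Collections.disjoint(roles, nodeRoles)) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }
}
```

A tool that generates per-node configuration files could call `filterForRoles` with the node's roles and write out only the returned entries, which is the "configuration files with only the actually used options" use case from the issue description.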
[jira] [Commented] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools
[ https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780551#comment-17780551 ] Colin McCabe commented on KAFKA-14349: -- This was fixed as part of KAFKA-14351, but we forgot to close the JIRA. Closing now. > Support dynamically resizing the KRaft controller's thread pools > > > Key: KAFKA-14349 > URL: https://issues.apache.org/jira/browse/KAFKA-14349 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Priority: Major > Labels: 4.0-blocker, kip-500 > > Support dynamically resizing the KRaft controller's request handler and > network handler thread pools. See {{DynamicBrokerConfig.scala}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14369) Docs - KRAFT controller authentication example
[ https://issues.apache.org/jira/browse/KAFKA-14369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780549#comment-17780549 ] Colin McCabe commented on KAFKA-14369: -- Thanks [~dbove]. I agree that it would be helpful to have an example config file with non-PLAINTEXT auth. If you have one, please post it here. > Docs - KRAFT controller authentication example > -- > > Key: KAFKA-14369 > URL: https://issues.apache.org/jira/browse/KAFKA-14369 > Project: Kafka > Issue Type: Bug > Components: docs >Affects Versions: 3.3.1 >Reporter: Domenic Bove >Priority: Minor > Labels: kraft > > The [Kafka Listener docs > |https://kafka.apache.org/documentation/#listener_configuration]mention how > to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller > listener, but it is not a working example and I found that I was missing this > property: > {code:java} > sasl.mechanism.controller.protocol {code} > when attempting to do SASL_PLAINTEXT on the controller listener. I see that > property here: > [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol] > But nowhere else. > I wonder if a complete working example would be better. 
Here are my working > configs for sasl plain on the controller > {code:java} > process.roles=controller > listeners=CONTROLLER://:9093 > node.id=1 > controller.quorum.voters=1@localhost:9093 > controller.listener.names=CONTROLLER > listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT > listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule > required username="admin" password="admin-secret" user_admin="admin-secret" > user_alice="alice-secret"; > listener.name.controller.sasl.enabled.mechanisms=PLAIN > listener.name.controller.sasl.mechanism=PLAIN > sasl.enabled.mechanisms=PLAIN > sasl.mechanism.controller.protocol=PLAIN{code} > Or maybe just a callout of that property in the existing docs -- This message was sent by Atlassian Jira (v8.20.10#820010)
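A minimal sketch of the pitfall described in this issue, assuming only what the report and the broker config reference state: when a controller listener is mapped to a SASL protocol and `sasl.mechanism.controller.protocol` is left unset, it falls back to its documented default (GSSAPI), which does not match a PLAIN-only listener. The class and method names below are hypothetical and are not part of Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not a Kafka API: flags controller listeners that use
// SASL while sasl.mechanism.controller.protocol is not set explicitly.
class ControllerSaslCheck {
    static List<String> findProblems(Properties props) {
        // Parse listener.security.protocol.map, e.g. "CONTROLLER:SASL_PLAINTEXT".
        Map<String, String> protocolByListener = new HashMap<>();
        for (String entry : props.getProperty("listener.security.protocol.map", "").split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length == 2) {
                protocolByListener.put(parts[0].trim(), parts[1].trim());
            }
        }

        List<String> problems = new ArrayList<>();
        for (String name : props.getProperty("controller.listener.names", "").split(",")) {
            String listener = name.trim();
            String protocol = protocolByListener.get(listener);
            boolean usesSasl = protocol != null && protocol.startsWith("SASL_");
            if (usesSasl && props.getProperty("sasl.mechanism.controller.protocol") == null) {
                problems.add("Listener " + listener + " uses " + protocol
                        + " but sasl.mechanism.controller.protocol is not set");
            }
        }
        return problems;
    }
}
```

Run against the working configuration quoted above, the check reports nothing; removing the last property from that file makes it report the CONTROLLER listener.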
[jira] [Updated] (KAFKA-14369) Docs - KRAFT controller authentication example
[ https://issues.apache.org/jira/browse/KAFKA-14369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14369: - Labels: kraft (was: 4.0-blocker) > Docs - KRAFT controller authentication example > -- > > Key: KAFKA-14369 > URL: https://issues.apache.org/jira/browse/KAFKA-14369 > Project: Kafka > Issue Type: Bug > Components: docs >Affects Versions: 3.3.1 >Reporter: Domenic Bove >Priority: Minor > Labels: kraft > > The [Kafka Listener docs > |https://kafka.apache.org/documentation/#listener_configuration]mention how > to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller > listener, but it is not a working example and I found that I was missing this > property: > {code:java} > sasl.mechanism.controller.protocol {code} > when attempting to do SASL_PLAINTEXT on the controller listener. I see that > property here: > [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol] > But nowhere else. > I wonder if a complete working example would be better. Here are my working > configs for sasl plain on the controller > {code:java} > process.roles=controller > listeners=CONTROLLER://:9093 > node.id=1 > controller.quorum.voters=1@localhost:9093 > controller.listener.names=CONTROLLER > listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT > listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule > required username="admin" password="admin-secret" user_admin="admin-secret" > user_alice="alice-secret"; > listener.name.controller.sasl.enabled.mechanisms=PLAIN > listener.name.controller.sasl.mechanism=PLAIN > sasl.enabled.mechanisms=PLAIN > sasl.mechanism.controller.protocol=PLAIN{code} > Or maybe just a callout of that property in the existing docs -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names
[ https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14927: - Labels: (was: 4.0-blocker) > Prevent kafka-configs.sh from setting non-alphanumeric config key names > --- > > Key: KAFKA-14927 > URL: https://issues.apache.org/jira/browse/KAFKA-14927 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.3.2 >Reporter: Justin Daines >Assignee: Aman Singh >Priority: Minor > Fix For: 3.7.0 > > > Using {{kafka-configs}} should validate dynamic configurations before > applying. It is possible to send a file with invalid configurations. > For example a file containing the following: > {code:java} > { > "routes": { > "crn:///kafka=*": { > "management": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied" > }, > "describe": { > "allowed": "", > "denied": "confluent-audit-log-events-denied" > }, > "authentication": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authn" > }, > "authorize": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authz" > }, > "interbroker": { > "allowed": "", > "denied": "" > } > }, > "crn:///kafka=*/group=*": { > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > }, > "crn:///kafka=*/topic=*": { > "produce": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > } > }, > "destinations": { > "topics": { > "confluent-audit-log-events": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authn": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authz": { > "retention_ms": 777600 > }, > 
"confluent-audit-log-events_audit": { > "retention_ms": 777600 > } > } > }, > "default_topics": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "excluded_principals": [ > "User:schemaregistryUser", > "User:ANONYMOUS", > "User:appSA", > "User:admin", > "User:connectAdmin", > "User:connectorSubmitter", > "User:connectorSA", > "User:schemaregistryUser", > "User:ksqlDBAdmin", > "User:ksqlDBUser", > "User:controlCenterAndKsqlDBServer", > "User:controlcenterAdmin", > "User:restAdmin", > "User:appSA", > "User:clientListen", > "User:superUser" > ] > } {code} > {code:java} > kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers > --entity-default --alter --add-config-file audit-log.json {code} > Yields the following dynamic configs: > {code:java} > Default configs for brokers in the cluster are: > "destinations"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null} > "confluent-audit-log-events-denied-authn"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null} > "routes"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null} > "User=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null} > },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null} > "excluded_principals"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null} > "confluent-audit-log-events_audit"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null} > "authorize"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null} > "default_topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null} > "topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null} > ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null} > "interbroker"=null 
sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null} > "produce"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null} > "denied"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"denied"=null} >
[jira] [Commented] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names
[ https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780541#comment-17780541 ] Colin McCabe commented on KAFKA-14927: -- It looks like this change was committed. I will close the JIRA then. > Prevent kafka-configs.sh from setting non-alphanumeric config key names > --- > > Key: KAFKA-14927 > URL: https://issues.apache.org/jira/browse/KAFKA-14927 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.3.2 >Reporter: Justin Daines >Assignee: Aman Singh >Priority: Minor > Labels: 4.0-blocker > Fix For: 3.7.0 > > > Using {{kafka-configs}} should validate dynamic configurations before > applying. It is possible to send a file with invalid configurations. > For example a file containing the following: > {code:java} > { > "routes": { > "crn:///kafka=*": { > "management": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied" > }, > "describe": { > "allowed": "", > "denied": "confluent-audit-log-events-denied" > }, > "authentication": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authn" > }, > "authorize": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authz" > }, > "interbroker": { > "allowed": "", > "denied": "" > } > }, > "crn:///kafka=*/group=*": { > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > }, > "crn:///kafka=*/topic=*": { > "produce": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > } > }, > "destinations": { > "topics": { > "confluent-audit-log-events": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authn": { > 
"retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authz": { > "retention_ms": 777600 > }, > "confluent-audit-log-events_audit": { > "retention_ms": 777600 > } > } > }, > "default_topics": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "excluded_principals": [ > "User:schemaregistryUser", > "User:ANONYMOUS", > "User:appSA", > "User:admin", > "User:connectAdmin", > "User:connectorSubmitter", > "User:connectorSA", > "User:schemaregistryUser", > "User:ksqlDBAdmin", > "User:ksqlDBUser", > "User:controlCenterAndKsqlDBServer", > "User:controlcenterAdmin", > "User:restAdmin", > "User:appSA", > "User:clientListen", > "User:superUser" > ] > } {code} > {code:java} > kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers > --entity-default --alter --add-config-file audit-log.json {code} > Yields the following dynamic configs: > {code:java} > Default configs for brokers in the cluster are: > "destinations"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null} > "confluent-audit-log-events-denied-authn"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null} > "routes"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null} > "User=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null} > },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null} > "excluded_principals"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null} > "confluent-audit-log-events_audit"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null} > "authorize"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null} > "default_topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null} > "topics"=null sensitive=true > 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null} > ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null} > "interbroker"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null} > "produce"=null sensitive=true >
[jira] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names
[ https://issues.apache.org/jira/browse/KAFKA-14927 ] Colin McCabe deleted comment on KAFKA-14927: -- was (Author: cmccabe): It looks like this change was committed. I will close the JIRA then. > Prevent kafka-configs.sh from setting non-alphanumeric config key names > --- > > Key: KAFKA-14927 > URL: https://issues.apache.org/jira/browse/KAFKA-14927 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.3.2 >Reporter: Justin Daines >Assignee: Aman Singh >Priority: Minor > Labels: 4.0-blocker > Fix For: 3.7.0 > > > Using {{kafka-configs}} should validate dynamic configurations before > applying. It is possible to send a file with invalid configurations. > For example a file containing the following: > {code:java} > { > "routes": { > "crn:///kafka=*": { > "management": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied" > }, > "describe": { > "allowed": "", > "denied": "confluent-audit-log-events-denied" > }, > "authentication": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authn" > }, > "authorize": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authz" > }, > "interbroker": { > "allowed": "", > "denied": "" > } > }, > "crn:///kafka=*/group=*": { > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > }, > "crn:///kafka=*/topic=*": { > "produce": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > } > }, > "destinations": { > "topics": { > "confluent-audit-log-events": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authn": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authz": { 
> "retention_ms": 777600 > }, > "confluent-audit-log-events_audit": { > "retention_ms": 777600 > } > } > }, > "default_topics": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "excluded_principals": [ > "User:schemaregistryUser", > "User:ANONYMOUS", > "User:appSA", > "User:admin", > "User:connectAdmin", > "User:connectorSubmitter", > "User:connectorSA", > "User:schemaregistryUser", > "User:ksqlDBAdmin", > "User:ksqlDBUser", > "User:controlCenterAndKsqlDBServer", > "User:controlcenterAdmin", > "User:restAdmin", > "User:appSA", > "User:clientListen", > "User:superUser" > ] > } {code} > {code:java} > kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers > --entity-default --alter --add-config-file audit-log.json {code} > Yields the following dynamic configs: > {code:java} > Default configs for brokers in the cluster are: > "destinations"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null} > "confluent-audit-log-events-denied-authn"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null} > "routes"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null} > "User=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null} > },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null} > "excluded_principals"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null} > "confluent-audit-log-events_audit"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null} > "authorize"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null} > "default_topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null} > "topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null} > ]=null sensitive=true 
synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null} > "interbroker"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null} > "produce"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null} > "denied"=null sensitive=true >
[jira] [Updated] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names
[ https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14927: - Summary: Prevent kafka-configs.sh from setting non-alphanumeric config key names (was: Dynamic configs not validated when using kafka-configs and --add-config-file) > Prevent kafka-configs.sh from setting non-alphanumeric config key names > --- > > Key: KAFKA-14927 > URL: https://issues.apache.org/jira/browse/KAFKA-14927 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.3.2 >Reporter: Justin Daines >Assignee: Aman Singh >Priority: Minor > Labels: 4.0-blocker > Fix For: 3.7.0 > > > Using {{kafka-configs}} should validate dynamic configurations before > applying. It is possible to send a file with invalid configurations. > For example a file containing the following: > {code:java} > { > "routes": { > "crn:///kafka=*": { > "management": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied" > }, > "describe": { > "allowed": "", > "denied": "confluent-audit-log-events-denied" > }, > "authentication": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authn" > }, > "authorize": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authz" > }, > "interbroker": { > "allowed": "", > "denied": "" > } > }, > "crn:///kafka=*/group=*": { > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > }, > "crn:///kafka=*/topic=*": { > "produce": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > } > }, > "destinations": { > "topics": { > "confluent-audit-log-events": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied": { > "retention_ms": 777600 > 
}, > "confluent-audit-log-events-denied-authn": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authz": { > "retention_ms": 777600 > }, > "confluent-audit-log-events_audit": { > "retention_ms": 777600 > } > } > }, > "default_topics": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "excluded_principals": [ > "User:schemaregistryUser", > "User:ANONYMOUS", > "User:appSA", > "User:admin", > "User:connectAdmin", > "User:connectorSubmitter", > "User:connectorSA", > "User:schemaregistryUser", > "User:ksqlDBAdmin", > "User:ksqlDBUser", > "User:controlCenterAndKsqlDBServer", > "User:controlcenterAdmin", > "User:restAdmin", > "User:appSA", > "User:clientListen", > "User:superUser" > ] > } {code} > {code:java} > kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers > --entity-default --alter --add-config-file audit-log.json {code} > Yields the following dynamic configs: > {code:java} > Default configs for brokers in the cluster are: > "destinations"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null} > "confluent-audit-log-events-denied-authn"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null} > "routes"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null} > "User=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null} > },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null} > "excluded_principals"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null} > "confluent-audit-log-events_audit"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null} > "authorize"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null} > "default_topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null} > "topics"=null 
sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null} > ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null} > "interbroker"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null} > "produce"=null
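The odd keys in the output quoted above (`"routes"`, `"User`, `},`, `]`) are consistent with the JSON file being read as a Java properties file, where the key ends at the first `=`, `:` or whitespace. The sketch below reproduces that parsing with `java.util.Properties` and then applies a key-name check. The allowed-character pattern and the class name are assumptions made for illustration; they are not taken from the actual fix.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Hypothetical helper, not a Kafka API.
class ConfigKeyCheck {
    // Assumed rule: letters, digits, dot, underscore and hyphen only.
    static final Pattern VALID_KEY = Pattern.compile("[A-Za-z0-9._-]+");

    // Parses the text the way a properties file is parsed and returns the
    // keys that do not match VALID_KEY, in sorted order.
    static Set<String> invalidKeys(String fileContents) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<String> invalid = new TreeSet<>();
        for (String key : props.stringPropertyNames()) {
            if (!VALID_KEY.matcher(key).matches()) {
                invalid.add(key);
            }
        }
        return invalid;
    }
}
```

Feeding it a fragment of the JSON from the report yields keys such as `"routes"`, `"User` and `},`, all of which the pattern rejects, while an ordinary line such as `log.retention.ms=1000` passes.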
[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role
[ https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780540#comment-17780540 ] Colin McCabe commented on KAFKA-14941: -- I'm not sure that I totally understand the goal here. If the goal is to be able to dynamically change configurations, that does not require leaving the configuration out of the static broker or controller config file. The dynamic configuration always takes precedence. If the goal is to understand what the configuration does, the help text of the configuration should explain that. Can you explain a bit more about the goal? > Document which configuration options are applicable only to processes with > broker role or controller role > - > > Key: KAFKA-14941 > URL: https://issues.apache.org/jira/browse/KAFKA-14941 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Priority: Major > > When running in KRaft mode, some of the configuration options are applicable > only to nodes with the broker process role and some are applicable only to > nodes with the controller process role. It would be great if this > information was part of the documentation (e.g. in the [Broker > Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the > website), and also part of the config classes, so that it can be > used in situations where the configuration is dynamically configured, for > example to filter the options applicable to different nodes. This would allow > having configuration files with only the options that are actually used > and would, for example, help to reduce unnecessary restarts when rolling out new > configurations. > For some options, it seems clear, and the Kafka node would refuse to start if > they are set, for example the configurations of the non-controller listeners > on controller-only nodes. For others, it seems a bit less clear (Does the > {{compression.type}} option apply to controller-only nodes? 
Or the > configurations for the offset topic? etc.). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role
[ https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-14941: - Labels: (was: 4.0-blocker) > Document which configuration options are applicable only to processes with > broker role or controller role > - > > Key: KAFKA-14941 > URL: https://issues.apache.org/jira/browse/KAFKA-14941 > Project: Kafka > Issue Type: Improvement >Reporter: Jakub Scholz >Priority: Major > > When running in KRaft mode, some of the configuration options are applicable > only to nodes with the broker process role and some are applicable only to > nodes with the controller process role. It would be great if this > information was part of the documentation (e.g. in the [Broker > Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the > website), and also part of the config classes, so that it can be > used in situations where the configuration is dynamically configured, for > example to filter the options applicable to different nodes. This would allow > having configuration files with only the options that are actually used > and would, for example, help to reduce unnecessary restarts when rolling out new > configurations. > For some options, it seems clear, and the Kafka node would refuse to start if > they are set, for example the configurations of the non-controller listeners > on controller-only nodes. For others, it seems a bit less clear (Does the > {{compression.type}} option apply to controller-only nodes? Or the > configurations for the offset topic? etc.). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
xvrl commented on code in PR #14618: URL: https://github.com/apache/kafka/pull/14618#discussion_r1375083080 ## build.gradle: ## @@ -1342,6 +1348,14 @@ project(':clients') { implementation libs.lz4 implementation libs.snappy implementation libs.slf4jApi +implementation libs.opentelemetryProto + +// declare runtime libraries Review Comment: can we add a comment to explain which dependencies should be declared here and why? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
xvrl commented on code in PR #14618: URL: https://github.com/apache/kafka/pull/14618#discussion_r1375082874 ## build.gradle: ## @@ -1361,6 +1375,21 @@ project(':clients') { generator project(':generator') } + shadowJar { +archiveClassifier = null +// KIP-714: move shaded dependencies to a shaded location +relocate('io.opentelemetry', 'org.apache.kafka.shaded.io.opentelemetry') Review Comment: Can we limit the relocation to the more specific `io.opentelemetry.proto` package, since there shouldn't be any other OpenTelemetry classes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15489) split brain in KRaft cluster
[ https://issues.apache.org/jira/browse/KAFKA-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15489: - Labels: (was: 4.0-blocker) > split brain in KRaft cluster > - > > Key: KAFKA-15489 > URL: https://issues.apache.org/jira/browse/KAFKA-15489 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.5.1 >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > I found that in the current KRaft implementation, when a network partition happens > between the current controller leader and the other controller nodes, a > "split brain" issue occurs. Two leaders then exist in the > controller cluster, and two inconsistent sets of metadata are returned to > clients. > > *Root cause* > In > [KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote], > we said a voter will begin a new election under three conditions: > 1. If it fails to receive a FetchResponse from the current leader before > expiration of quorum.fetch.timeout.ms > 2. If it receives an EndQuorumEpoch request from the current leader > 3. If it fails to receive a majority of votes before expiration of > quorum.election.timeout.ms after declaring itself a candidate. > And that is exactly what the current KRaft implementation does. > > However, when the leader is isolated by a network partition, there is no > way for it to resign from the leadership and start a new election. So the > leader remains the leader even when all other nodes are unreachable, and > this makes the split brain issue possible. > Reading further in KIP-595, I found that we did consider this > situation and have a solution for it. 
In [this > section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout], > it says: > {quote}In the pull-based model, however, say a new leader has been elected > with a new epoch and everyone has learned about it except the old leader > (e.g. that leader was not in the voters anymore and hence not receiving the > BeginQuorumEpoch as well), then that old leader would not be notified by > anyone about the new leader / epoch and become a pure "zombie leader", as > there is no regular heartbeats being pushed from leader to the follower. This > could lead to stale information being served to the observers and clients > inside the cluster. > {quote} > {quote}To resolve this issue, we will piggy-back on the > "quorum.fetch.timeout.ms" config, such that if the leader did not receive > Fetch requests from a majority of the quorum for that amount of time, it > would begin a new election and start sending VoteRequest to voter nodes in > the cluster to understand the latest quorum. If it couldn't connect to any > known voter, the old leader shall keep starting new elections and bump the > epoch. > {quote} > > But this was never implemented in the current KRaft. > > *The flow is like this:* > 1. There are 3 controller nodes: A (leader), B (follower), C (follower). > 2. A network partition happens between [A] and [B, C]. > 3. B and C start a new election because the fetch timeout expired before they received a > fetch response from leader A. > 4. B (or C) is elected leader in a new epoch, while A is still the leader > in the old epoch. > 5. Broker D creates a topic "new", and the update goes to leader B. > 6. Broker E describes topic "new" but gets nothing, because it is connected to > the old leader A. 
> -- This message was sent by Atlassian Jira (v8.20.10#820010)
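The leader-side check that the quoted KIP-595 section describes can be sketched roughly as follows. This is an illustration under stated assumptions, not the KRaft implementation: the class name, method names and the idea of recording one timestamp per voter are all hypothetical, and `fetchTimeoutMs` merely stands in for `quorum.fetch.timeout.ms`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: a leader tracks when each other voter last fetched
// and gives up leadership once it can no longer count a majority.
class LeaderProgressCheck {
    private final int voterCount;
    private final long fetchTimeoutMs;
    private final Map<Integer, Long> lastFetchMs = new HashMap<>();

    LeaderProgressCheck(int localId, Set<Integer> voters, long fetchTimeoutMs, long nowMs) {
        this.voterCount = voters.size();
        this.fetchTimeoutMs = fetchTimeoutMs;
        // Start every other voter's timer when leadership begins.
        for (int voter : voters) {
            if (voter != localId) {
                lastFetchMs.put(voter, nowMs);
            }
        }
    }

    // Record a Fetch request from a known voter; other ids are ignored.
    void onFetch(int voterId, long nowMs) {
        if (lastFetchMs.containsKey(voterId)) {
            lastFetchMs.put(voterId, nowMs);
        }
    }

    // True when fewer than a majority of voters (the leader counts itself)
    // have been heard from within the fetch timeout.
    boolean shouldResign(long nowMs) {
        int majority = voterCount / 2 + 1;
        int reachable = 1; // the leader itself
        for (long fetchedAt : lastFetchMs.values()) {
            if (nowMs - fetchedAt < fetchTimeoutMs) {
                reachable++;
            }
        }
        return reachable < majority;
    }
}
```

In the A/B/C scenario above, leader A would stop receiving Fetch requests from both B and C after the partition, so `shouldResign` would turn true once the timeout elapses and A would stop acting as leader of the old epoch.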
[jira] [Updated] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-15513: - Labels: (was: 4.0-blocker) > KRaft cluster fails with SCRAM authentication enabled for control-plane > --- > > Key: KAFKA-15513 > URL: https://issues.apache.org/jira/browse/KAFKA-15513 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.6.0, 3.5.1 >Reporter: migruiz4 >Priority: Major > > We have observed a scenario where a KRaft cluster fails to bootstrap when > using SCRAM authentication for controller-to-controller communications. > The steps to reproduce are simple: > * Deploy (at least) 2 Kafka servers using latest version 3.5.1. > * Configure a KRaft cluster, where the controller listener uses > SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the > recommended in-line jaas config > '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}' > * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create > the SCRAM user. 
> When initialized, Controllers will fail to connect to each other with an > authentication error: > > {code:java} > [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: > Failed to send the following request due to authentication error: > ClientRequest(expectResponse=true, > callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075, > destination=0, correlationId=129, clientId=raft-client-1, > createdTimeMs=1690888364960, > requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', > topics=[TopicData(topicName='__cluster_metadata', > partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, > lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code} > Some additional details about the scenario that we tested out: > * Controller listener does work when configured with SASL+PLAIN > * The issue only affects the Controller listener, SCRAM users created using > the same method work for data-plane listeners and inter-broker listeners. 
> > Below you can find the exact configuration and command used to deploy: > * server.properties > {code:java} > listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093 > advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091 > listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT > num.network.threads=3 > num.io.threads=8 > socket.send.buffer.bytes=102400 > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > log.dirs=/bitnami/kafka/data > num.partitions=1 > num.recovery.threads.per.data.dir=1 > offsets.topic.replication.factor=1 > transaction.state.log.replication.factor=1 > transaction.state.log.min.isr=1 > log.retention.hours=168 > log.retention.check.interval.ms=30 > controller.listener.names=CONTROLLER > controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093 > inter.broker.listener.name=INTERNAL > node.id=0 > process.roles=controller,broker > sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 > sasl.mechanism.controller.protocol=SCRAM-SHA-512 > listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512 > listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule > required username="controller_user" password="controller_password";{code} > * kafka-storage.sh command > {code:java} > kafka-storage.sh format --config /path/to/server.properties > --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram > SCRAM-SHA-512=[name=controller_user,password=controller_password] {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780539#comment-17780539 ] Colin McCabe commented on KAFKA-15513: -- To be more concrete, you need to use the {{--add-scram}} argument to the {{kafka-storage.sh format}} command.
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
xvrl commented on PR #14618: URL: https://github.com/apache/kafka/pull/14618#issuecomment-1783559671 @apoorvmittal10 could we show a diff before/after this change of: - the pom - the content of the jar – excluding `org/apache/kafka/shaded/com/google/protobuf/` and `org/apache/kafka/shaded/io/opentelemetry/proto/` to make sure the only differences in the resulting artifact are those shaded classes?
[jira] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513 ] Colin McCabe deleted comment on KAFKA-15513: -- was (Author: cmccabe): Currently, you need to add the controller principal to `super.users` rather than relying on SCRAM to configure it. This is no different than how in ZK mode, you must have working ZK auth before you can configure Kafka. In the future, we will probably support configuring SCRAM prior to controller startup via the `kafka-format.sh` command. The mechanism is all there (in the form of the bootstrap file) but we haven't finished implementing it yet...
[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane
[ https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780537#comment-17780537 ] Colin McCabe commented on KAFKA-15513: -- Currently, you need to add the controller principal to `super.users` rather than relying on SCRAM to configure it. This is no different than how in ZK mode, you must have working ZK auth before you can configure Kafka. In the future, we will probably support configuring SCRAM prior to controller startup via the `kafka-format.sh` command. The mechanism is all there (in the form of the bootstrap file) but we haven't finished implementing it yet...
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
xvrl commented on code in PR #14618: URL: https://github.com/apache/kafka/pull/14618#discussion_r1375080740 ## build.gradle: ## @@ -1361,6 +1375,21 @@ project(':clients') { generator project(':generator') } + shadowJar { +archiveClassifier = null Review Comment: can we add a comment to explain why archiveClassifier is set to null?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375078778 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: Yeah, the hope is that we'd see more async patterns as we evolve Kafka, and the way it's currently implemented, we just say "execute callback on the _current_ thread pool", which is why the thread local is used -- this way we can implement a non-blocking wait-and-continue-on-current-thread-pool from any level without passing additional arguments through the whole stack. The same functionality could be implemented on other thread pools, e.g. if the group coordinator has its own thread pool, we could implement the same primitive there as well.
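For readers following the review thread, the thread-local "wrap" idea described in the comment above can be sketched in Java roughly as follows. This is a minimal illustration, not the Kafka implementation: `WrapSketch`, its `RequestLocal` stand-in, the `currentRequest` thread local, and the `deferred` queue (standing in for the request channel) are all hypothetical names introduced here.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class WrapSketch {
    // Hypothetical stand-in for RequestLocal (per-handler-thread scratch state).
    static final class RequestLocal {
        final String owner;
        RequestLocal(String owner) { this.owner = owner; }
    }

    // Identity of the request the current thread is handling; null off a handler thread.
    static final ThreadLocal<Object> currentRequest = new ThreadLocal<>();

    // Assumed stand-in for the request channel: callbacks waiting for a handler thread.
    static final Queue<Consumer<RequestLocal>> deferred = new ConcurrentLinkedQueue<>();

    // Binds the two-argument callback to a one-argument one, as in the diff's new signature.
    static <T> Consumer<T> wrap(BiConsumer<RequestLocal, T> fun, RequestLocal requestLocal) {
        Object captured = currentRequest.get();
        if (captured == null) {
            throw new IllegalStateException("wrap must be called on a request handler thread");
        }
        return value -> {
            if (currentRequest.get() == captured) {
                // Same thread, same request: run inline with the caller's RequestLocal.
                fun.accept(requestLocal, value);
            } else {
                // Otherwise queue it; whichever handler drains the queue supplies its own RequestLocal.
                deferred.add(local -> fun.accept(local, value));
            }
        };
    }
}
```

The point of the sketch is that the caller never passes the channel or the current request explicitly; both are picked up from thread-local state at wrap time, and the callback only ever sees a `RequestLocal` that belongs to the thread actually running it.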
Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]
xvrl commented on code in PR #14618: URL: https://github.com/apache/kafka/pull/14618#discussion_r1375080303 ## build.gradle: ## @@ -1380,7 +1409,9 @@ project(':clients') { } jar { +enabled false dependsOn createVersionFile +dependsOn 'shadowJar' from("$buildDir") { include "kafka/$buildVersionFileName" } Review Comment: can we make sure this build version file is still included in the final jar like before?
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375061797 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: As for just produce. Yes, this is the case now, but I think Artem was trying to create the wrap method as a "general" callback mechanism.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375060752 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: We pass the wrapped method which is a simpler type. `type AppendCallback = Map[TopicPartition, Errors] => Unit`
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
apoorvmittal10 commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1375060555 ## clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java: ## @@ -24,13 +24,14 @@ import org.apache.kafka.common.Reconfigurable; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.telemetry.ClientTelemetry; /** * A plugin interface to allow things to listen as new metrics are created so they can be reported. * * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. */ -public interface MetricsReporter extends Reconfigurable, AutoCloseable { +public interface MetricsReporter extends Reconfigurable, AutoCloseable, ClientTelemetry { Review Comment: @xvrl @junrao I agree as well; the checked-in code does not include the change to the MetricsReporter.java interface.
[jira] [Resolved] (KAFKA-15704) ControllerRegistrationRequest must set ZkMigrationReady field if appropriate
[ https://issues.apache.org/jira/browse/KAFKA-15704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-15704. -- Resolution: Fixed > ControllerRegistrationRequest must set ZkMigrationReady field if appropriate > > > Key: KAFKA-15704 > URL: https://issues.apache.org/jira/browse/KAFKA-15704 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Colin McCabe >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0
[jira] [Assigned] (KAFKA-15704) ControllerRegistrationRequest must set ZkMigrationReady field if appropriate
[ https://issues.apache.org/jira/browse/KAFKA-15704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe reassigned KAFKA-15704: Assignee: David Arthur > ControllerRegistrationRequest must set ZkMigrationReady field if appropriate > > > Key: KAFKA-15704 > URL: https://issues.apache.org/jira/browse/KAFKA-15704 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Colin McCabe >Assignee: David Arthur >Priority: Major > Fix For: 3.7.0
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1375057023 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { val requestChannel = threadRequestChannel.get() val currentRequest = threadCurrentRequest.get() if (requestChannel == null || currentRequest == null) { if (!bypassThreadCheck) throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.") - T => fun(T) + T => fun(requestLocal, T) } else { T => { -if (threadCurrentRequest.get() != null) { - // If the callback is actually executed on a request thread, we can directly execute +if (threadCurrentRequest.get() == currentRequest) { Review Comment: > Hmmm. I'm not sure it makes sense to try to pass the request channel and current in to every method we want to do a callback for. Currently, the callback is only needed for produce request in ReplicaManager. If you look at `KafkaApis.handleProduceRequest`, we already pass in both request channel and current request to `ReplicaManager.appendRecords` through `sendResponseCallback`. > As for 2 and short circuiting... Are you suggesting I move the wrap call into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method in there as well. That may be trickier with the typing. Yes. 
Currently, we already pass in `appendEntries` as a callback to `AddPartitionsToTxnManager`, right?
[PR] [KPERF-852] Rename method sendBrokerHeartbeat [kafka]
yuyli opened a new pull request, #14658: URL: https://github.com/apache/kafka/pull/14658 This is a PR to maintain consistency between AK and CE-KAFKA. Change: `sendBrokerHeartbeat` -> `sendBrokerHeartbeatToUnfenceBrokers` ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]
cmccabe commented on PR #14654: URL: https://github.com/apache/kafka/pull/14654#issuecomment-1783521988 committed, thanks!
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on PR #14629: URL: https://github.com/apache/kafka/pull/14629#issuecomment-1783521458 Following up with build failure here: https://github.com/apache/kafka/pull/14545#issuecomment-1783515553
Re: [PR] KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]
cmccabe closed pull request #14654: KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest URL: https://github.com/apache/kafka/pull/14654
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
junrao commented on code in PR #14575: URL: https://github.com/apache/kafka/pull/14575#discussion_r1375047867 ## clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java: ## @@ -24,13 +24,14 @@ import org.apache.kafka.common.Reconfigurable; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.telemetry.ClientTelemetry; /** * A plugin interface to allow things to listen as new metrics are created so they can be reported. * * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information. */ -public interface MetricsReporter extends Reconfigurable, AutoCloseable { +public interface MetricsReporter extends Reconfigurable, AutoCloseable, ClientTelemetry { Review Comment: I agree with Xavier. Also, the KIP didn't propose to change the `MetricsReporter` interface.
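One way to read the objection above is that the telemetry capability should stay a separate, opt-in interface rather than something every reporter inherits. The following Java sketch illustrates that pattern under that assumption only; `Reporter` and `Telemetry` are simplified, hypothetical stand-ins and do not reproduce the signatures of Kafka's `MetricsReporter` or `ClientTelemetry`.

```java
import java.util.ArrayList;
import java.util.List;

final class OptionalTelemetrySketch {
    // Simplified stand-in for the existing plugin interface; deliberately left unchanged.
    interface Reporter extends AutoCloseable {
        String name();
        @Override default void close() {}
    }

    // Hypothetical opt-in capability, kept separate so existing reporters need no changes.
    interface Telemetry {
        void exportMetrics(byte[] payload);
    }

    // An existing reporter that knows nothing about telemetry.
    static final class PlainReporter implements Reporter {
        public String name() { return "plain"; }
    }

    // A reporter that opts in by implementing both interfaces.
    static final class TelemetryReporter implements Reporter, Telemetry {
        final List<byte[]> received = new ArrayList<>();
        public String name() { return "telemetry"; }
        public void exportMetrics(byte[] payload) { received.add(payload); }
    }

    // Returns only the reporters that opted in to the extra interface.
    static List<Telemetry> telemetryCapable(List<? extends Reporter> reporters) {
        List<Telemetry> out = new ArrayList<>();
        for (Reporter r : reporters) {
            if (r instanceof Telemetry) {
                out.add((Telemetry) r);
            }
        }
        return out;
    }
}
```

With this shape, code that loads reporters can probe for the capability with `instanceof`, and reporters written against the original interface keep compiling without modification.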
Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]
jolshan commented on PR #14545: URL: https://github.com/apache/kafka/pull/14545#issuecomment-1783515553 Hey -- I think this broke the build for java 8.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375044842

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
Hmmm. I'm not sure it makes sense to try to pass the request channel and current request into every method we want to do a callback for.

As for (2) and short-circuiting: are you suggesting I move the wrap call into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method in there as well. That may be trickier with the typing.
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375025398

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
     val requestChannel = threadRequestChannel.get()
     val currentRequest = threadCurrentRequest.get()
     if (requestChannel == null || currentRequest == null) {
       if (!bypassThreadCheck)
         throw new IllegalStateException("Attempted to reschedule to request handler thread from non-request handler thread.")
-      T => fun(T)
+      T => fun(requestLocal, T)
     } else {
       T => {
-        if (threadCurrentRequest.get() != null) {
-          // If the callback is actually executed on a request thread, we can directly execute
+        if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
Ismael mentioned in https://github.com/apache/kafka/pull/9229#issuecomment-683352094 that thread locals are most useful when one doesn't control the code. So, I am wondering if we could get rid of the two ThreadLocals, `threadRequestChannel` and `threadCurrentRequest`, in `KafkaRequestHandler` introduced in KAFKA-14561.

The reason for the former is to obtain requestChannel. We could simply pass in requestChannel to `ReplicaManager.appendRecords`.

The reason for the latter is (1) to obtain currentRequest and (2) to make sure that the callback can be short-circuited if it's called on the same request handler thread. For (1), we could also pass in currentRequest to `ReplicaManager.appendRecords`. For (2), we could change the code such that `KafkaRequestHandler.wrap` is called only when the callback truly needs to be run from a different thread. Otherwise, we can just call the callback directly there.
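To make the short-circuit discussion concrete, here is a minimal Java sketch of the idea behind this PR: the wrapped callback takes the RequestLocal as an argument, so a deferred invocation can be handed the RequestLocal of whichever thread eventually runs it. Everything here is an assumption for illustration: `RequestLocalSketch`, `CallbackWrapSketch`, and the `QUEUED` stand-in for the request channel are hypothetical names, and comparing thread identity is a simplification of the `threadCurrentRequest.get() == currentRequest` check in the diff.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for kafka.server.RequestLocal (not the real class).
final class RequestLocalSketch {
    final String name;

    RequestLocalSketch(String name) {
        this.name = name;
    }
}

final class CallbackWrapSketch {
    // Stand-in for the request channel: callbacks waiting to be run by a handler thread.
    static final Queue<Consumer<RequestLocalSketch>> QUEUED = new ConcurrentLinkedQueue<>();

    static <T> Consumer<T> wrap(BiConsumer<RequestLocalSketch, T> fun,
                                RequestLocalSketch requestLocal,
                                Thread handlerThread) {
        return value -> {
            if (Thread.currentThread() == handlerThread) {
                // Still on the thread that issued the request: run directly with its RequestLocal.
                fun.accept(requestLocal, value);
            } else {
                // Another thread completed the work: defer, and let the handler thread that
                // drains the queue supply its own RequestLocal.
                QUEUED.add(local -> fun.accept(local, value));
            }
        };
    }
}
```

The point of the two-argument callback is that the deferred branch never captures the original thread's RequestLocal, which is the mistake the PR title describes.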
[jira] [Resolved] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Philip Nee resolved KAFKA-15550.
Resolution: Fixed

> OffsetsForTimes validation for negative timestamps in new consumer
>
> Key: KAFKA-15550
> URL: https://issues.apache.org/jira/browse/KAFKA-15550
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: consumer-threading-refactor, kip-848, kip-848-preview
>
> OffsetsForTimes api call should fail with _IllegalArgumentException_ if negative timestamps are provided as arguments. This will effectively exclude earliest and latest offsets as target times, keeping the current behaviour of the KafkaConsumer.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
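The validation described in this issue could look roughly like the sketch below. It is not the code from the fix: `OffsetsForTimesValidationSketch` and `validateTimestamps` are hypothetical names, and partitions are represented as plain strings instead of `TopicPartition` so the example is self-contained. As far as I know, Kafka's sentinel timestamps for "earliest" and "latest" are the negative values -2 and -1, which is why rejecting negatives also excludes them as target times.

```java
import java.util.Map;

final class OffsetsForTimesValidationSketch {
    // Rejects any negative target timestamp before a lookup would be issued.
    static void validateTimestamps(Map<String, Long> timestampsToSearch) {
        for (Map.Entry<String, Long> entry : timestampsToSearch.entrySet()) {
            if (entry.getValue() < 0) {
                throw new IllegalArgumentException("The target time for partition "
                        + entry.getKey() + " is " + entry.getValue()
                        + ". The target time cannot be negative.");
            }
        }
    }
}
```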
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
xvrl commented on code in PR #14620: URL: https://github.com/apache/kafka/pull/14620#discussion_r1375028442 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java: ## @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.MetricValueProvider; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeCount; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Frequencies; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.Min; +import org.apache.kafka.common.metrics.stats.Percentiles; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.metrics.stats.WindowedCount; +import org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * All metrics implement the {@link MetricValueProvider} interface. They are divided into + * two base types: + * + * + * {@link Gauge} + * {@link Measurable} + * + * + * {@link Gauge Gauges} can have any value but we only collect metrics with number values. 
+ * {@link Measurable Measurables} are divided into simple types with single values + * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link Rate}, + * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link Frequencies}, + * {@link Meter}, and {@link Percentiles}). + * + * + * + * We can safely assume that a {@link CumulativeCount count} always increases in steady state. It + * should be a bug if a count metric decreases. + * + * + * + * Total and Sum are treated as a monotonically increasing counter. The javadocs for Total metric type + * say "An un-windowed cumulative total maintained over all time.". The standalone Total metrics in + * the codebase seem to be cumulative metrics that will always increase. The Total metric underlying + * Meter type is mostly a Total of a Count metric. + * We can assume that a Total metric always increases (but it is not guaranteed as the sample values might be both + * negative or positive). + * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid counter-example. + * + * + * + * The Sum as it is a sample sum which is not a cumulative metric. It is converted to GAUGE_DOUBLE. + * + * + * + * The compound metrics are virtual metrics. They are composed of simple types or anonymous measurable types + * which are reported. A compound metric is never reported as-is. + * + * + * + * A Meter metric is always created with and reported as 2 KafkaExporter metrics: a rate and a + * count. For eg: org.apache.kafka.common.network.Selector has Meter metric for "connection-close" but it + * has to be created with a "connection-close-rate" metric of type rate and a "connection-close-total" + * metric of type total. So, we will never get a KafkaExporter metric with type Meter. + * + * + * + * Frequencies is created with a array of Frequency objects. When a Frequencies metric is registered, each + * member Frequency object is converted into an anonymous Measurable and registered. 
So, a Frequencies metric + * is reported with a set of measurables with name = Frequency.name(). As there is no way to figure out the + * compound type, each component measurables is converted to a GAUGE_DOUBLE. + *
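The mapping rules spelled out in the Javadoc above can be summarized in a small sketch. This is only an illustration under assumptions: `ExportKindSketch` and `MetricKindMappingSketch` are hypothetical names, the string keys are informal labels for the metric kinds the Javadoc discusses, treating a count as cumulative is inferred from the "always increases" remark, and the gauge default for everything else is my assumption.

```java
enum ExportKindSketch { CUMULATIVE_DOUBLE, GAUGE_DOUBLE }

final class MetricKindMappingSketch {
    // Maps an informal metric kind to the export type described in the Javadoc.
    static ExportKindSketch exportKindFor(String kind) {
        switch (kind) {
            case "count":   // expected to be monotonically increasing (assumed cumulative)
            case "total":   // "Total is converted to CUMULATIVE_DOUBLE"
                return ExportKindSketch.CUMULATIVE_DOUBLE;
            case "sum":       // sampled sum, not cumulative
            case "frequency": // component of a Frequencies compound metric
            default:          // assumption: anything else is reported as a gauge
                return ExportKindSketch.GAUGE_DOUBLE;
        }
    }
}
```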
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375026123

## clients/src/test/java/org/apache/kafka/common/telemetry/internals/MetricNamingConventionTest.java: ##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class MetricNamingConventionTest {

Review Comment:
can we add explicit tests for the metrics we have defined in the KIP?
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
xvrl commented on code in PR #14619: URL: https://github.com/apache/kafka/pull/14619#discussion_r1375025585 ## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.telemetry.internals; + +import org.apache.kafka.common.MetricName; + +import java.util.Collections; +import java.util.Locale; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * This class encapsulates naming and mapping conventions defined as part of + * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a> + */ +public class MetricNamingConvention { + +private static final String NAME_JOINER = "."; +private static final String TAG_JOINER = "_"; + +// remove metrics as it is redundant for telemetry metrics naming convention +private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)"); + +public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) { +return new MetricNamingStrategy() { +@Override +public MetricKey metricKey(MetricName metricName) { +Objects.requireNonNull(metricName, "metric name cannot be null"); +String group = metricName.group() == null ? "" : metricName.group(); +String rawName = metricName.name() == null ? "" : metricName.name(); + +return new MetricKey(fullMetricName(domain, group, rawName), +Collections.unmodifiableMap(cleanTags(metricName.tags()))); +} + +@Override +public MetricKey derivedMetricKey(MetricKey key, String derivedComponent) { +Objects.requireNonNull(derivedComponent, "derived component cannot be null"); +return new MetricKey(key.getName() + NAME_JOINER + derivedComponent, key.tags()); +} +}; +} + +/** + * Creates a metric name given the domain, group, and name. The new String follows the following + * conventions and rules: + * + * + * domain is expected to be a host-name like value, e.g. 
{@code org.apache.kafka} + * group is cleaned of redundant words: "-metrics" + * the group and metric name is dot separated + * The name is created by joining the three components, e.g.: + * {@code org.apache.kafka.producer.connection.creation.rate} + * + */ +private static String fullMetricName(String domain, String group, String name) { +return domain ++ NAME_JOINER ++ cleanGroup(group) ++ NAME_JOINER ++ cleanMetric(name); +} + +/** + * This method maps a raw name to follow conventions and cleans up the result to be more legible: + * + * converts names to lower hyphen case conventions + * strips redundant parts of the metric name, such as -metrics + * normalizes artifacts of hyphen case to dot separated conversion + * + */ +private static String cleanGroup(String group) { +group = clean(group, NAME_JOINER); +return GROUP_PATTERN.matcher(group).replaceAll(""); +} + +private static String cleanMetric(String metric) { +return clean(metric, NAME_JOINER); +} + +/** + * Converts a tag name to match the telemetry naming conventions by converting snake_case. + * + * Kafka metrics have tags name in lower case separated by hyphens. Eg: total-errors + * + * @param raw the input map + * @return the new map with keys replaced by snake_case representations. + */ +private static Map cleanTags(Map raw) { +return raw.entrySet() +.stream() +.collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), Entry::getValue)); +} + +private static String clean(String raw, String joiner) { +Objects.requireNonNull(raw, "metric data
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375023475

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+    private static final String NAME_JOINER = ".";
+    private static final String TAG_JOINER = "_";
+
+    // remove metrics as it is redundant for telemetry metrics naming convention
+    private final static Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)");
+
+    public static MetricNamingStrategy getClientTelemetryMetricNamingStrategy(String domain) {

Review Comment:
Maybe we can call it `prefix` instead of domain? I believe this class should be reusable to also expose broker, streams, etc. metrics in OpenTelemetry format one day, so I would suggest keeping it.
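For readers following the naming discussion, the convention in the diff (a prefix, the cleaned group, and the cleaned metric name joined by dots, with tag keys converted to snake_case) can be sketched as follows. This is a simplified illustration rather than the PR's implementation: the body of `clean` is truncated in the quoted diff, so the lower-casing and hyphen replacement here are assumptions, and `MetricNameSketch` is a hypothetical class name.

```java
import java.util.Locale;
import java.util.regex.Pattern;

final class MetricNameSketch {
    // Same pattern as GROUP_PATTERN in the diff: drops the redundant ".metrics" part.
    private static final Pattern GROUP_PATTERN = Pattern.compile("\\.(metrics)");

    // Assumed behaviour: lower-case the raw value and replace hyphens with the joiner.
    static String clean(String raw, String joiner) {
        return raw.toLowerCase(Locale.ROOT).replace("-", joiner);
    }

    static String fullMetricName(String prefix, String group, String name) {
        String cleanedGroup = GROUP_PATTERN.matcher(clean(group, ".")).replaceAll("");
        return prefix + "." + cleanedGroup + "." + clean(name, ".");
    }
}
```

With these assumptions, the group `producer-metrics` and the name `connection-creation-rate` under the prefix `org.apache.kafka` produce `org.apache.kafka.producer.connection.creation.rate`, which matches the example given in the class's Javadoc, and a tag key such as `client-id` becomes `client_id`.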
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375016328

## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
    * Wrap callback to schedule it on a request thread.
    * NOTE: this function must be called on a request thread.
    * @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler thread in case we need to call
+   *                     the callback function without queueing the callback request
    * @return Wrapped callback that would execute `fun` on a request thread
    */
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {

Review Comment:
I think ideally in a future change we'd get rid of passing RequestLocal as an argument and maybe make it a static thread local that could be accessed from the point of use rather than being passed through the whole stack. There are a couple of problems that contributed to this issue:
- the functions here are already written for asynchronous completion (because we wait for replication), and in such cases the convention is generally that the arguments of a function are not bound to the executing thread (i.e. rooted on the call stack or globally)
- the point of use was outside of the core change, so folks didn't look into the specifics of the RequestLocal semantics (i.e. even if appendEntries had been an explicit function as it now is, I'm not sure the problem would have been noticed).
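The "static thread local accessed from the point of use" alternative mentioned in this comment could look something like the sketch below. It is purely illustrative: `RequestLocalHolderSketch` and its nested `Scratch` type are hypothetical and do not correspond to anything in the Kafka code base. The only real API used is `java.lang.ThreadLocal`.

```java
final class RequestLocalHolderSketch {
    // Hypothetical per-thread scratch state, standing in for what a RequestLocal carries.
    static final class Scratch {
        final StringBuilder buffer = new StringBuilder();
    }

    // Each thread lazily gets its own Scratch instance on first access.
    private static final ThreadLocal<Scratch> CURRENT = ThreadLocal.withInitial(Scratch::new);

    // Called at the point of use, so nothing thread-bound has to travel through callback arguments.
    static Scratch current() {
        return CURRENT.get();
    }
}
```

Because the lookup happens on whichever thread executes the callback, a callback completed asynchronously on another handler thread would pick up that thread's instance automatically.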
Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]
xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375018518

## clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java: ##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {

Review Comment:
Maybe a more explicit class name would help here. Since the goal of this mapping is to map the metric names to something that fits OpenTelemetry conventions, maybe we can call this `OpenTelemetryMetricNamingConvention`?
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
xvrl commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1375015180

## clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java: ##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.telemetry;
+
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A {@link MetricsReporter} may implement this interface to indicate support for collecting client
+ * telemetry on the server side.
+ */
+@InterfaceStability.Evolving
+public interface ClientTelemetry {
+
+    /**
+     * Implemented by the broker {@link MetricsReporter} to provide a {@link ClientTelemetryReceiver}

Review Comment:
maybe it would be clearer to say that the broker calls this method to get the ClientTelemetryReceiver instance.
Re: [PR] MINOR: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]
wolfchimneyrock commented on PR #14635:
URL: https://github.com/apache/kafka/pull/14635#issuecomment-1783465122

> @wolfchimneyrock Thanks again for the PR. We verified and your suggestion makes sense. I have two asks:
>
> 1. Could you please file a bug and use the jira id in the PR?
> 2. Would it be possible to add a small unit test for the fix?

sure, I've applied for an apache JIRA account, and I'll start on a unit test.
Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]
xvrl commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1375013255

## clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java: ##
@@ -24,13 +24,14 @@
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.telemetry.ClientTelemetry;

 /**
  * A plugin interface to allow things to listen as new metrics are created so they can be reported.
  *
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Reconfigurable, AutoCloseable {
+public interface MetricsReporter extends Reconfigurable, AutoCloseable, ClientTelemetry {

Review Comment:
The MetricsReporter plugin is a plugin for either the server or the client side. However, the ClientTelemetry interface is only relevant for broker-side plugins.
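The design point in this thread, that a reporter should opt in to client telemetry instead of every `MetricsReporter` inheriting it, can be illustrated with a small sketch. All types below are hypothetical stand-ins (the real `MetricsReporter`, `ClientTelemetry`, and `ClientTelemetryReceiver` have richer signatures), and returning a `String` in place of a receiver object is a simplification.

```java
import java.util.Optional;

// Hypothetical stand-ins; the real interfaces live in the Kafka clients module.
interface ReporterSketch {
    String name();
}

interface ClientTelemetrySketch {
    String clientReceiver();
}

// A client-side reporter has no reason to know about client telemetry.
final class ClientOnlyReporter implements ReporterSketch {
    public String name() { return "client-only"; }
}

// A broker-side reporter opts in by also implementing the telemetry interface.
final class BrokerReporter implements ReporterSketch, ClientTelemetrySketch {
    public String name() { return "broker"; }
    public String clientReceiver() { return "receiver-for-" + name(); }
}

final class TelemetryDiscoverySketch {
    // The broker asks each configured reporter whether it supports client telemetry.
    static Optional<String> receiverOf(ReporterSketch reporter) {
        if (reporter instanceof ClientTelemetrySketch) {
            return Optional.of(((ClientTelemetrySketch) reporter).clientReceiver());
        }
        return Optional.empty();
    }
}
```

Keeping the capability in a separate interface means existing reporters compile unchanged, which is in line with the earlier remark that the KIP did not propose changing `MetricsReporter`.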
Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]
jolshan commented on PR #14627: URL: https://github.com/apache/kafka/pull/14627#issuecomment-1783457546 I will just watch the build now. Thanks @chb2ab
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on PR #14642: URL: https://github.com/apache/kafka/pull/14642#issuecomment-1783444304 @kirktrue @lucasbru @dajac - Thanks for taking the time to review this PR. I've addressed the recent comments. Let me know if there's anything unclear.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374998305

## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
                                         DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
                                         Importance.MEDIUM,
                                         ALLOW_AUTO_CREATE_TOPICS_DOC)
+                                .define(GROUP_PROTOCOL_CONFIG,
+                                        Type.STRING,
+                                        DEFAULT_GROUP_PROTOCOL,
+                                        ConfigDef.CaseInsensitiveValidString
+                                                .in(Utils.enumOptions(GroupProtocol.class)),
+                                        Importance.HIGH,
+                                        GROUP_PROTOCOL_DOC)
+                                .define(REMOTE_ASSIGNOR_CONFIG,
+                                        Type.STRING,
+                                        DEFAULT_REMOTE_ASSIGNOR,
+                                        Importance.MEDIUM,
+                                        REMOTE_ASSIGNOR_DOC)

Review Comment:
sorry - I thought you meant the whole block is off. corrected.
Re: [PR] MINOR: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]
dajac commented on PR #14635:
URL: https://github.com/apache/kafka/pull/14635#issuecomment-1783402130

@wolfchimneyrock Thanks again for the PR. We verified and your suggestion makes sense. I have two asks:
1) Could you please file a bug and use the jira id in the PR?
2) Would it be possible to add a small unit test for the fix?
Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]
chb2ab commented on PR #14627: URL: https://github.com/apache/kafka/pull/14627#issuecomment-1783399735 @jolshan np, I reverted that change.
Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]
dajac commented on PR #14589: URL: https://github.com/apache/kafka/pull/14589#issuecomment-1783398978 @dongnuo123 One of the builds is still red. You may have to merge trunk to get the latest changes.
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964660 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(); +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + +"support \"generic\" or \"consumer\". If \"consumer\" is specified, then the consumer group protocol will be " + +"used. Otherwise, the generic group protocol will be used."; + +/** +* group.remote.assignor +*/ +public static final String REMOTE_ASSIGNOR_CONFIG = "group.remote.assignor"; +public static final String DEFAULT_REMOTE_ASSIGNOR = null; +public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor to use. If no assignor is specified, " + Review Comment: +1
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964423 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig { public static final String HEARTBEAT_INTERVAL_MS_CONFIG = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG; private static final String HEARTBEAT_INTERVAL_MS_DOC = CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC; +/** + * group.protocol + */ +public static final String GROUP_PROTOCOL_CONFIG = "group.protocol"; +public static final String DEFAULT_GROUP_PROTOCOL = GroupProtocol.GENERIC.name().toLowerCase(); +public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol consumer should use. We currently " + Review Comment: +1
Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]
dajac commented on code in PR #14642: URL: https://github.com/apache/kafka/pull/14642#discussion_r1374963674 ## clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java: ## @@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_ALLOW_AUTO_CREATE_TOPICS, Importance.MEDIUM, ALLOW_AUTO_CREATE_TOPICS_DOC) +.define(GROUP_PROTOCOL_CONFIG, +Type.STRING, +DEFAULT_GROUP_PROTOCOL, +ConfigDef.CaseInsensitiveValidString + .in(Utils.enumOptions(GroupProtocol.class)), +Importance.HIGH, +GROUP_PROTOCOL_DOC) +.define(REMOTE_ASSIGNOR_CONFIG, +Type.STRING, +DEFAULT_REMOTE_ASSIGNOR, +Importance.MEDIUM, +REMOTE_ASSIGNOR_DOC) Review Comment: @philipnee It would be better to update the new code to follow the existing style. This is what we usually do…
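For readers following the `group.protocol` thread, here is a minimal, self-contained sketch of what case-insensitive validation against an enum's names amounts to. It does not use Kafka's `ConfigDef` or `Utils`; `GroupProtocolSketch`, `lowerCaseNames`, and `parse` are hypothetical names chosen for illustration, and the enum mirrors only the two values named in the doc string quoted in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical illustration only; not the code under review in the PR.
class GroupProtocolSketch {
    // Mirrors the two values named in GROUP_PROTOCOL_DOC ("generic" and "consumer").
    enum GroupProtocol { GENERIC, CONSUMER }

    // Lists the enum's constant names in lower case, e.g. ["generic", "consumer"].
    static <E extends Enum<E>> List<String> lowerCaseNames(Class<E> enumClass) {
        return Arrays.stream(enumClass.getEnumConstants())
                .map(e -> e.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    // Accepts any casing of a valid name and rejects everything else.
    static GroupProtocol parse(String value) {
        String normalized = value.toLowerCase(Locale.ROOT);
        List<String> allowed = lowerCaseNames(GroupProtocol.class);
        if (!allowed.contains(normalized)) {
            throw new IllegalArgumentException(
                    "Invalid value " + value + "; expected one of " + allowed);
        }
        return GroupProtocol.valueOf(normalized.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("Consumer"));
        System.out.println(parse("generic"));
    }
}
```

Normalizing with `Locale.ROOT` keeps the comparison independent of the JVM's default locale, which matters for names containing letters such as `I`.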
Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]
dajac commented on code in PR #14657: URL: https://github.com/apache/kafka/pull/14657#discussion_r1374949071 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1370,7 +1371,9 @@ public void scheduleUnloadOperation( tp, partitionEpoch, context.epoch ); } -}); +} else { +log.info("No unload required since broker was not a coordinator for the given partition " + tp); Review Comment: info level is too much here, no?
Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]
dajac commented on code in PR #14657: URL: https://github.com/apache/kafka/pull/14657#discussion_r1374948601 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1359,7 +1359,8 @@ public void scheduleUnloadOperation( log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch); Review Comment: We should perhaps rephrase this one.
Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]
lihaosky commented on code in PR #14426: URL: https://github.com/apache/kafka/pull/14426#discussion_r1374947218 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords( sharedTimeTracker.minTime = timestamp; // Skip next records if window has not closed -if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) { +if (sharedTimeTracker.minTime + windowsAfterIntervalMs + joinGraceMs >= sharedTimeTracker.streamTime) { Review Comment: I'm not sure about this part. If the `value` has right value and this is on right side, why don't we check with `windows.beforeMs` to see if it expires? Not sure why `windowsAfterIntervalMs` is always set to `windows.afterMs`.
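To make the condition in the quoted diff concrete, here is a small sketch of the window-close arithmetic, assuming the check reads exactly as in the added line (`minTime + windowsAfterIntervalMs + joinGraceMs >= streamTime`). `WindowCloseSketch` and `windowStillOpen` are hypothetical names, and the sketch deliberately does not answer the left/right-side question raised in the comment.

```java
// Hypothetical illustration of the comparison in the diff; not Kafka Streams code.
class WindowCloseSketch {
    // True while the window for the oldest buffered record may still receive a match,
    // i.e. stream time has not yet moved past minTime + afterMs + graceMs.
    static boolean windowStillOpen(long minTime, long afterMs, long graceMs, long streamTime) {
        return minTime + afterMs + graceMs >= streamTime;
    }

    public static void main(String[] args) {
        // Oldest record at t=100, after=50 ms, grace=10 ms: the boundary is t=160.
        System.out.println(windowStillOpen(100L, 50L, 10L, 160L));
        System.out.println(windowStillOpen(100L, 50L, 10L, 161L));
    }
}
```

Because the comparison uses `>=`, a stream time exactly on the boundary still counts as open, so non-joined records are only emitted once stream time strictly exceeds it.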
Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]
dajac commented on code in PR #14657: URL: https://github.com/apache/kafka/pull/14657#discussion_r1374948010 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java: ## @@ -1359,7 +1359,8 @@ public void scheduleUnloadOperation( log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch); scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> { -withContextOrThrow(tp, context -> { +CoordinatorContext context = coordinators.get(tp); +if (context != null) { if (context.epoch < partitionEpoch) { Review Comment: You need to look the context before you use it.
[PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]
rreddy-22 opened a new pull request, #14657: URL: https://github.com/apache/kafka/pull/14657 **Problem Statement:** When a new leader is elected for a `__consumer_offsets` partition, the followers are notified to unload the state. However, only the former leader is aware of it. The remaining follower prints out the following error: `ERROR [GroupCoordinator id=1] Execution of UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not the correct coordinator.. (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)` The error is technically correct and expected for the remaining follower, but it can be misleading. **Solution:** 1. When scheduleInternalOperation is called, the lambda function that is executed at the event runtime checks whether the context exists. 2. If the context doesn't exist, an exception was previously thrown; now we skip the operation and log an info message. A test was added to cover this case; all other tests still pass as expected.
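The two solution steps in the PR description above can be sketched as follows. This is a simplified stand-in under stated assumptions, not the `CoordinatorRuntime` code from the PR: `UnloadSketch`, its `Context` class, and the string return values are hypothetical, and scheduling, locking, and logging are left out so only the look-up-then-skip decision remains.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the runtime described in the PR; names are illustrative.
class UnloadSketch {
    // Minimal per-partition state; only the epoch matters for this sketch.
    static final class Context {
        final int epoch;
        Context(int epoch) { this.epoch = epoch; }
    }

    private final Map<String, Context> coordinators = new ConcurrentHashMap<>();

    void load(String tp, int epoch) {
        coordinators.put(tp, new Context(epoch));
    }

    // Returns a short description of the outcome so the behaviour is easy to check.
    String unload(String tp, int partitionEpoch) {
        Context context = coordinators.get(tp);
        if (context == null) {
            // This broker never hosted the coordinator: skip instead of throwing.
            return "skipped";
        }
        if (context.epoch < partitionEpoch) {
            coordinators.remove(tp);
            return "unloaded";
        }
        // Assumption: a request with a non-newer epoch is ignored.
        return "ignored";
    }

    public static void main(String[] args) {
        UnloadSketch runtime = new UnloadSketch();
        runtime.load("__consumer_offsets-0", 0);
        System.out.println(runtime.unload("__consumer_offsets-0", 1));
        System.out.println(runtime.unload("__consumer_offsets-1", 0));
    }
}
```

Returning early for a missing context turns the expected follower case into a no-op, while a genuinely loaded coordinator still goes through the epoch comparison.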
Re: [PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]
dongnuo123 commented on code in PR #14656: URL: https://github.com/apache/kafka/pull/14656#discussion_r1374903378 ## core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala: ## @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.JoinGroupRequest +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testLeaveGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testLeaveGroup() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testLeaveGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testLeaveGroup() + } + + private def testLeaveGroup(): Unit = { + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. 
+createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- ApiKeys.LEAVE_GROUP.oldestVersion() to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) { Review Comment: The tests with version < 3 won't pass until we merge https://github.com/apache/kafka/pull/14635
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jsancio commented on PR #14489: URL: https://github.com/apache/kafka/pull/14489#issuecomment-1783315545 @jolshan there are failing tests for this PR. Can you take a look when you have time?
Re: [PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]
dongnuo123 commented on code in PR #14656: URL: https://github.com/apache/kafka/pull/14656#discussion_r1374900874 ## core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala: ## @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.JoinGroupRequest +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class LeaveGroupRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testLeaveGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { +testLeaveGroup() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( +new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), +new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), +new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testLeaveGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { +testLeaveGroup() + } + + private def testLeaveGroup(): Unit = { + +// Creates the __consumer_offsets topics because it won't be created automatically +// in this test because it does not use FindCoordinator API. +createOffsetsTopic() + +// Create the topic. 
+createTopic( + topic = "foo", + numPartitions = 3 +) + +for (version <- ApiKeys.LEAVE_GROUP.oldestVersion() to ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinDynamicConsumerGroupWithOldProtocol("grp-1") + val (_, _) = joinStaticConsumerGroupWithOldProtocol("grp-2", "group-instance-id") Review Comment: I still can't add a static member and a dynamic member to the same group. The reason shouldn't be that the first member isn't rejoining the group, because the timeout already occurs while processing the second member's join request. Joining two consecutive static members, or two consecutive dynamic members, works. I wonder what's special about joining a static member and then a dynamic member, or a dynamic one and then a static one. @dajac
[PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]
dongnuo123 opened a new pull request, #14656: URL: https://github.com/apache/kafka/pull/14656 This PR is based on https://github.com/apache/kafka/pull/14589 and adds integration tests for the Heartbeat API and GroupLeave API. ### JIRA https://issues.apache.org/jira/browse/KAFKA-15705 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]
jolshan commented on PR #14627: URL: https://github.com/apache/kafka/pull/14627#issuecomment-1783300697 @chb2ab sorry for the confusion. I realized that marking the version unstable will probably cause issues if the ibp suggests using that version. Since the tagged fields are the only difference, let's remove the unstable version flag. Sorry for the back and forth. After that, I will check the tests again and hopefully this will be good to go. https://github.com/apache/kafka/pull/14627#discussion_r1373924152
[jira] [Assigned] (KAFKA-15574) Update states and transitions for membership manager state machine
[ https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reassigned KAFKA-15574: -- Assignee: Lianet Magrans (was: Kirk True) > Update states and transitions for membership manager state machine > -- > > Key: KAFKA-15574 > URL: https://issues.apache.org/jira/browse/KAFKA-15574 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Blocker > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > This task is to update the state machine so that it correctly acts as the > glue between the heartbeat request manager and the assignment reconciler. > The state machine will transition from one state to another as a response to > heartbeats, callback completion, errors, unsubscribing, and other external > events. A given transition may kick off one or more actions that are > implemented outside of the state machine. > Steps: > # Update the set of states in the code as [defined in the diagrams on the > wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine] > # Ensure the correct state transitions are performed as responses to > external input > # _Define_ any actions that should be taken as a result of the above > transitions, but defer the _implementation_ to separate Jiras/PRs as much as > possible -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15705) Add integration tests for Heartbeat API and GroupLeave API
[ https://issues.apache.org/jira/browse/KAFKA-15705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongnuo Lyu reassigned KAFKA-15705: --- Assignee: Dongnuo Lyu > Add integration tests for Heartbeat API and GroupLeave API > -- > > Key: KAFKA-15705 > URL: https://issues.apache.org/jira/browse/KAFKA-15705 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major
[jira] [Created] (KAFKA-15705) Add integration tests for Heartbeat API and GroupLeave API
Dongnuo Lyu created KAFKA-15705: --- Summary: Add integration tests for Heartbeat API and GroupLeave API Key: KAFKA-15705 URL: https://issues.apache.org/jira/browse/KAFKA-15705 Project: Kafka Issue Type: Sub-task Reporter: Dongnuo Lyu
Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]
dajac merged PR #14524: URL: https://github.com/apache/kafka/pull/14524
[jira] [Created] (KAFKA-15704) ControllerRegistrationRequest must set ZkMigrationReady field if appropriate
Colin McCabe created KAFKA-15704: Summary: ControllerRegistrationRequest must set ZkMigrationReady field if appropriate Key: KAFKA-15704 URL: https://issues.apache.org/jira/browse/KAFKA-15704 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Colin McCabe Fix For: 3.7.0
Re: [PR] MINOR Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]
cmccabe commented on code in PR #14654: URL: https://github.com/apache/kafka/pull/14654#discussion_r1374859694 ## core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala: ## @@ -66,7 +66,12 @@ object ZkMigrationIntegrationTest { } def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): Unit = { -Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, MetadataVersion.IBP_3_6_IV2).foreach { mv => +Seq( + MetadataVersion.IBP_3_4_IV0, + MetadataVersion.IBP_3_5_IV2, + MetadataVersion.IBP_3_6_IV2, + MetadataVersion.latest() Review Comment: was skipping `MetadataVersion.IBP_3_7_IV0` intentional here?
Re: [PR] KAFKA-15355: Message schema changes [kafka]
cmccabe commented on PR #14290: URL: https://github.com/apache/kafka/pull/14290#issuecomment-1783251459 Thanks for the PR, @soarez ! Do you have revisions ready for this one or do you want one of us to jump in and fix these comments like we talked about earlier? I feel like it's really close now
Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]
jolshan commented on code in PR #14629: URL: https://github.com/apache/kafka/pull/14629#discussion_r1374844889 ## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ## @@ -53,25 +53,27 @@ object KafkaRequestHandler { * Wrap callback to schedule it on a request thread. * NOTE: this function must be called on a request thread. * @param fun Callback function to execute + * @param requestLocal The RequestLocal for the current request handler thread in case we need to call + * the callback function without queueing the callback request * @return Wrapped callback that would execute `fun` on a request thread */ - def wrap[T](fun: T => Unit): T => Unit = { + def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = { Review Comment: That seems reasonable to me.
Re: [PR] KAFKA-15355: Message schema changes [kafka]
cmccabe commented on code in PR #14290: URL: https://github.com/apache/kafka/pull/14290#discussion_r1374843847 ## clients/src/main/resources/common/message/AssignReplicasToDirsRequest.json: ## @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 73, + "type": "request", + "listeners": ["controller"], + "name": "AssignReplicasToDirsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, +{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", + "about": "The epoch of the requesting broker" }, +{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [ + { "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" }, + { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ +{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", Review Comment: This should be by ID, not name. 
## clients/src/main/resources/common/message/AssignReplicasToDirsRequest.json: ## @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 73, + "type": "request", + "listeners": ["controller"], + "name": "AssignReplicasToDirsRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, +{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": "-1", + "about": "The epoch of the requesting broker" }, +{ "name": "Directories", "type": "[]DirectoryData", "versions": "0+", "fields": [ + { "name": "Id", "type": "uuid", "versions": "0+", "about": "The ID of the directory" }, + { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [ +{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", Review Comment: This should be by topic ID, not name.
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
mjsax commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1374842688 ## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java: ## @@ -346,6 +346,23 @@ public static Function getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), byteArray); } +@SuppressWarnings({"unchecked", "rawtypes"}) +public static Function getDeserializeValue2(final StateSerdes serdes, + final StateStore wrapped, + final boolean isDSLStore ) { +final Serde valueSerde = serdes.valueSerde(); +final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) || isDSLStore; +final Deserializer deserializer; +if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) { Review Comment: As discussed in person: for `RangeQuery`, `KeyValueToTimestampedKeyValueIteratorAdapter` needs to wrap the provided `RocksDBRangeIterator` to translate from the plain-byte[] format that we receive from the inner store to the timestamped-byte[] format that the upper layer expects.
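The wrapping idea in the review comment above can be sketched with a plain `Iterator` adapter. This is an illustration under explicit assumptions, not the Kafka Streams adapter itself: it assumes the timestamped format is an 8-byte timestamp prefix followed by the plain value, and that an unknown timestamp is written as `-1`. `TimestampedAdapterSketch`, `toTimestampedFormat`, and `adapt` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;

// Hypothetical illustration; the byte layout below is an assumption of this sketch.
class TimestampedAdapterSketch {
    // Assumed placeholder for "no timestamp known".
    static final long NO_TIMESTAMP = -1L;

    // Prepends an 8-byte timestamp to a plain value; null stays null.
    static byte[] toTimestampedFormat(byte[] plainValue) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(NO_TIMESTAMP)
                .put(plainValue)
                .array();
    }

    // Wraps an iterator over plain values so callers see timestamped values.
    static Iterator<byte[]> adapt(Iterator<byte[]> plain) {
        return new Iterator<byte[]>() {
            @Override
            public boolean hasNext() {
                return plain.hasNext();
            }

            @Override
            public byte[] next() {
                return toTimestampedFormat(plain.next());
            }
        };
    }

    public static void main(String[] args) {
        Iterator<byte[]> it = adapt(Collections.singletonList(new byte[] {1, 2}).iterator());
        System.out.println(it.next().length);
    }
}
```

Converting lazily inside `next()` keeps the adapter a thin view over the inner iterator, so no values are copied until the upper layer actually reads them.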