Re: [PR] MINOR: fix typo and comment [kafka]

2023-10-27 Thread via GitHub


dengziming merged PR #14650:
URL: https://github.com/apache/kafka/pull/14650


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rename log dir UUIDs [kafka]

2023-10-27 Thread via GitHub


dengziming commented on code in PR #14517:
URL: https://github.com/apache/kafka/pull/14517#discussion_r1375160304


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class DirectoryId {
+
+    /**
+     * A UUID that is used to identify new or unknown dir assignments.
+     */
+    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+
+    /**
+     * A UUID that is used to represent unspecified offline dirs.
+     */
+    public static final Uuid LOST = new Uuid(0L, 1L);
+
+    /**
+     * A UUID that is used to represent an unspecified log directory
+     * that is expected to have been previously selected to host an
+     * associated replica. This contrasts with {@code UNASSIGNED},
+     * which is associated with (typically new) replicas that may not
+     * yet have been placed in any log directory.
+     */
+    public static final Uuid MIGRATING = new Uuid(0L, 2L);
+
+    /**
+     * The set of reserved UUIDs that will never be returned by the random method.
+     */
+    public static final Set<Uuid> RESERVED;
+
+    static {
+        HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
+        // The first 100 UUIDs are reserved for future use.
+        for (long i = 0L; i < 100L; i++) {
+            reserved.add(new Uuid(0L, i));
+        }
+        RESERVED = Collections.unmodifiableSet(reserved);
+    }
+
+    /**
+     * Static factory to generate a directory ID.
+     *
+     * This will not generate a reserved UUID (first 100), or one whose string
+     * representation starts with a dash ("-").
+     */
+    public static Uuid random() {
+        Uuid uuid = Uuid.randomUuid();
+        while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {

Review Comment:
   We don't need `uuid.toString().startsWith("-")` here, right?
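
   For context: judging by this question, `Uuid.randomUuid()` itself already re-draws values whose string form starts with a dash, which would make the second check redundant. The method above is cut off at the review anchor; a sketch of how the loop plausibly continues (an assumption, not the PR's actual code):

   ```java
   public static Uuid random() {
       Uuid uuid = Uuid.randomUuid();
       // Re-draw until the UUID is outside the reserved range and its
       // base64 string form does not start with "-".
       while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
           uuid = Uuid.randomUuid();
       }
       return uuid;
   }
   ```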



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-27 Thread via GitHub


CalvinConfluent commented on PR #14603:
URL: https://github.com/apache/kafka/pull/14603#issuecomment-1783656823

   Found a weird UT failure; rebased onto trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding OpenTelemetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375136169


##
build.gradle:
##
@@ -1361,6 +1375,21 @@ project(':clients') {
 generator project(':generator')
   }
 
+  shadowJar {
+archiveClassifier = null

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding OpenTelemetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375136109


##
build.gradle:
##
@@ -1342,6 +1348,14 @@ project(':clients') {
 implementation libs.lz4
 implementation libs.snappy
 implementation libs.slf4jApi
+implementation libs.opentelemetryProto
+
+// declare runtime libraries

Review Comment:
   Done.



##
build.gradle:
##
@@ -1361,6 +1375,21 @@ project(':clients') {
 generator project(':generator')
   }
 
+  shadowJar {
+archiveClassifier = null
+// KIP-714: move shaded dependencies to a shaded location
+relocate('io.opentelemetry', 'org.apache.kafka.shaded.io.opentelemetry')

Review Comment:
   Done.
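
   One quick way to sanity-check the relocation from code (the class name below is a plausible example from the opentelemetry-proto artifact, used purely for illustration; any class known to live in the shaded tree works):

   ```java
   public class ShadingCheck {
       public static void main(String[] args) throws Exception {
           // Throws ClassNotFoundException if the relocation did not happen.
           Class<?> c = Class.forName(
               "org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData");
           System.out.println("Relocated class found: " + c.getName());
       }
   }
   ```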



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding OpenTelemetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on PR #14618:
URL: https://github.com/apache/kafka/pull/14618#issuecomment-1783642480

   > @apoorvmittal10 could we show a diff before/after this change of:
   > 
   > * the pom
   > * the content of the jar – excluding 
`org/apache/kafka/shaded/com/google/protobuf/` and 
`org/apache/kafka/shaded/io/opentelemetry/proto/`
   > 
   > to make sure the only difference in the resulting artifact are those 
shaded classes?
   
   @xvrl Please find the details below: AK `trunk` branch build vs the `changes` in this PR. Do you think we should relocate the `*.proto` files too? I didn't do that, as I felt moving the classes should be sufficient.
   
   ```
   ➜  3.7.0-SNAPSHOT-trunk> jar -tf kafka-clients-3.7.0-SNAPSHOT.jar > jar_tf_output
   ➜  3.7.0-SNAPSHOT-changes> jar -tf kafka-clients-3.7.0-SNAPSHOT.jar | grep -v org/apache/kafka/shaded/ > jar_tf_output
   ```
   
   ```
   git diff 3.7.0-SNAPSHOT-trunk/jar_tf_output 3.7.0-SNAPSHOT-changes/jar_tf_output
   ```
   ![Screenshot 2023-10-28 at 1 40 09 AM](https://github.com/apache/kafka/assets/2861565/b0638949-fae8-4568-a901-d736767e46f3)
   
   ```
   git diff 3.7.0-SNAPSHOT-trunk/pom.xml 3.7.0-SNAPSHOT-changes/pom.xml
   ```
   
   ![Screenshot 2023-10-28 at 1 42 06 AM](https://github.com/apache/kafka/assets/2861565/a981319b-fccc-4f5e-9305-92e6b4908446)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding OpenTelemetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375136205


##
build.gradle:
##
@@ -1380,7 +1409,9 @@ project(':clients') {
   }
 
   jar {
+enabled false
 dependsOn createVersionFile
+dependsOn 'shadowJar'
 from("$buildDir") {
 include "kafka/$buildVersionFileName"
 }

Review Comment:
   Thanks for pointing that out; added the other files too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-27 Thread ASF GitHub Bot (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780569#comment-17780569 ]

ASF GitHub Bot commented on KAFKA-15653:


ijuma commented on code in PR #564:
URL: https://github.com/apache/kafka-site/pull/564#discussion_r1375131837


##
36/upgrade.html:
##
@@ -116,6 +116,12 @@ Notable changes in 3
 For more information about the early access tiered storage feature, please check
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage">KIP-405</a> and
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes">Tiered Storage Early Access Release Note</a>.
 
+Transaction partition verification (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense">KIP-890</a>)
+has been added to data partitions to prevent hanging transactions. Workloads with compression can experience InvalidRecordExceptions and UnknownServerExceptions.
+For workloads with compression, transaction.partition.verification.enable should be set to false. Note that the default for 3.6 is true.

Review Comment:
   I'd just say "This feature can be disabled by setting..." (instead of "For workloads with compression...").



##
36/upgrade.html:
##
@@ -116,6 +116,12 @@ Notable changes in 3
 For more information about the early access tiered storage feature, please check
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage">KIP-405</a> and
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Tiered+Storage+Early+Access+Release+Notes">Tiered Storage Early Access Release Note</a>.
 
+Transaction partition verification (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense">KIP-890</a>)
+has been added to data partitions to prevent hanging transactions. Workloads with compression can experience InvalidRecordExceptions and UnknownServerExceptions.
+For workloads with compression, transaction.partition.verification.enable should be set to false. Note that the default for 3.6 is true.
+The configuration can also be updated dynamically and is applied to the broker.
+This will be fixed in a future release. See <a href="https://issues.apache.org/jira/browse/KAFKA-15653">KAFKA-15653</a> for more details.

Review Comment:
   Can we say it will be fixed in 3.6.1?
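
   For readers applying the mitigation above: the verification flag is a dynamic broker config, so it can be changed without a restart. A minimal sketch using the Java Admin client (the bootstrap address is a placeholder; this is an illustration, not part of the change):

   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AlterConfigOp;
   import org.apache.kafka.clients.admin.ConfigEntry;
   import org.apache.kafka.common.config.ConfigResource;

   import java.util.Collection;
   import java.util.Collections;
   import java.util.Map;
   import java.util.Properties;

   public class DisableTxnVerification {
       public static void main(String[] args) throws Exception {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092"); // placeholder
           try (Admin admin = Admin.create(props)) {
               // An empty resource name targets the cluster-wide broker default.
               ConfigResource allBrokers = new ConfigResource(ConfigResource.Type.BROKER, "");
               AlterConfigOp disable = new AlterConfigOp(
                   new ConfigEntry("transaction.partition.verification.enable", "false"),
                   AlterConfigOp.OpType.SET);
               Map<ConfigResource, Collection<AlterConfigOp>> updates =
                   Collections.singletonMap(allBrokers, Collections.singletonList(disable));
               admin.incrementalAlterConfigs(updates).all().get();
           }
       }
   }
   ```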





> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Justine Olshan
>Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> 

[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-27 Thread ASF GitHub Bot (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17780567#comment-17780567 ]

ASF GitHub Bot commented on KAFKA-15653:


jolshan opened a new pull request, #564:
URL: https://github.com/apache/kafka-site/pull/564

   KAFKA-15653 can be painful for folks with compression. Adding a note about 
the issue and how to mitigate it.




> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Justine Olshan
>Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-27 Thread via GitHub


ijuma commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375128035


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java:
##
@@ -55,20 +63,111 @@ class TxnPartitionEntry {
 .thenComparingInt(ProducerBatch::producerEpoch)
 .thenComparingInt(ProducerBatch::baseSequence);
 
-TxnPartitionEntry() {
+TxnPartitionEntry(TopicPartition topicPartition) {
+this.topicPartition = topicPartition;
 this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
 this.nextSequence = 0;
-this.lastAckedSequence = 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
 this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
 this.inflightBatchesBySequence = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
 }
 
-void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+ProducerIdAndEpoch producerIdAndEpoch() {
+return producerIdAndEpoch;
+}
+
+int nextSequence() {
+return nextSequence;
+}
+
+OptionalLong lastAckedOffset() {
+if (lastAckedOffset != ProduceResponse.INVALID_OFFSET)
+return OptionalLong.of(lastAckedOffset);
+return OptionalLong.empty();
+}
+
+OptionalInt lastAckedSequence() {
+if (lastAckedSequence != 
TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
+return OptionalInt.of(lastAckedSequence);
+return OptionalInt.empty();
+}
+
+boolean hasInflightBatches() {
+return !inflightBatchesBySequence.isEmpty();
+}
+
+ProducerBatch nextBatchBySequence() {
+return inflightBatchesBySequence.isEmpty() ? null : 
inflightBatchesBySequence.first();
+}
+
+void incrementSequence(int increment) {
+this.nextSequence = 
DefaultRecordBatch.incrementSequence(this.nextSequence, increment);
+}
+
+void addInflightBatch(ProducerBatch batch) {
+inflightBatchesBySequence.add(batch);
+}
+
+void setLastAckedOffset(long lastAckedOffset) {
+this.lastAckedOffset = lastAckedOffset;
+}
+
+void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) {
+final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+resetSequenceNumbers(inFlightBatch -> {
+inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value);
+sequence.value += inFlightBatch.recordCount;
+});
+producerIdAndEpoch = newProducerIdAndEpoch;
+nextSequence = sequence.value;
+lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
+}
+
+void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) {
+decrementSequence(recordCount);
+resetSequenceNumbers(inFlightBatch -> {
+if (inFlightBatch.baseSequence() < baseSequence)
+return;
+
+int newSequence = inFlightBatch.baseSequence() - recordCount;
+if (newSequence < 0)
+throw new IllegalStateException("Sequence number for batch 
with sequence " + inFlightBatch.baseSequence()
++ " for partition " + topicPartition + " is going to 
become negative: " + newSequence);
+
+inFlightBatch.resetProducerState(new 
ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), 
newSequence);
+});
+}
+
+int maybeUpdateLastAckedSequence(int sequence) {

Review Comment:
   I tried to put non-private methods first. No particular reason for these two; I moved them above `adjustSequencesDueToFailedBatch`.
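
   As an aside: the `DefaultRecordBatch.incrementSequence` call in the diff exists because producer sequence numbers wrap around at `Integer.MAX_VALUE`. A self-contained sketch of that wrap-around arithmetic (an illustration, not the actual Kafka implementation):

   ```java
   public class SequenceMath {
       // Sequences live in [0, Integer.MAX_VALUE] and wrap back to 0, so the
       // increment is effectively computed modulo (Integer.MAX_VALUE + 1L).
       static int incrementSequence(int sequence, int increment) {
           if (sequence > Integer.MAX_VALUE - increment)
               return increment - (Integer.MAX_VALUE - sequence) - 1;
           return sequence + increment;
       }

       public static void main(String[] args) {
           System.out.println(incrementSequence(Integer.MAX_VALUE, 1)); // 0
           System.out.println(incrementSequence(5, 3));                 // 8
       }
   }
   ```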



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-27 Thread via GitHub


ijuma commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375127610


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3449,7 +3449,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(new Metrics(t
 transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
 
 assertFalse(transactionManager.hasInflightBatches(tp1));
-assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+assertEquals(1, transactionManager.sequenceNumber(tp1));
+assertEquals(1, transactionManager.sequenceNumber(tp1));

Review Comment:
   Good catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-27 Thread via GitHub


ijuma commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375127506


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -778,22 +778,22 @@ public void testIdempotenceWithMultipleInflights() throws 
Exception {
 prepareAndReceiveInitProducerId(producerId, Errors.NONE);
 assertTrue(transactionManager.hasProducerId());
 
-assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+assertEquals(0, transactionManager.sequenceNumber(tp0));

Review Comment:
   I can't think of a good reason.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375122091


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -778,22 +778,22 @@ public void testIdempotenceWithMultipleInflights() throws 
Exception {
 prepareAndReceiveInitProducerId(producerId, Errors.NONE);
 assertTrue(transactionManager.hasProducerId());
 
-assertEquals(0, transactionManager.sequenceNumber(tp0).longValue());
+assertEquals(0, transactionManager.sequenceNumber(tp0));

Review Comment:
   why were these longs in the first place? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375121838


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -3449,7 +3449,8 @@ MAX_REQUEST_SIZE, ACKS_ALL, MAX_RETRIES, new 
SenderMetricsRegistry(new Metrics(t
 transactionManager.handleCompletedBatch(tp1b3, t1b3Response);
 
 assertFalse(transactionManager.hasInflightBatches(tp1));
-assertEquals(1, transactionManager.sequenceNumber(tp1).intValue());
+assertEquals(1, transactionManager.sequenceNumber(tp1));
+assertEquals(1, transactionManager.sequenceNumber(tp1));

Review Comment:
   nit: duplicated line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] refactor: introduce internal StoreFactory [kafka]

2023-10-27 Thread via GitHub


agavra opened a new pull request, #14659:
URL: https://github.com/apache/kafka/pull/14659

   ### Overview
   
   This PR sets up the necessary prerequisites to respect configurations such as `dsl.default.store.type` and `dsl.store.suppliers.class` (introduced in #14648) without requiring them to be passed in to `StreamsBuilder#new(TopologyConfig)`, passing them only into `new KafkaStreams(...)`.
   
   It essentially makes `StoreBuilder` an external-only API; internally, the `StoreFactory` equivalent is used. In a future PR, we will replace `KeyValueStoreMaterializer` with an implementation of `StoreFactory` that creates the store builder only after `configure()` is called.
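
   To make the intended shape concrete, here is a minimal, self-contained sketch of the deferred-configuration idea (the `StoreBuilder`/`StoreFactory` stand-ins below are simplified assumptions for illustration, not the actual Streams interfaces):

   ```java
   import java.util.Map;

   // Stand-in for the public API: a builder that is fully configured up front.
   interface StoreBuilder<T> {
       T build();
       String name();
   }

   // Internal contract: store creation can be deferred until configs are known.
   interface StoreFactory<T> {
       void configure(Map<String, Object> streamsConfig);
       T build();
       String name();
   }

   // Adapts an already-configured StoreBuilder to the StoreFactory contract.
   final class StoreBuilderWrapper<T> implements StoreFactory<T> {
       private final StoreBuilder<T> builder;

       StoreBuilderWrapper(StoreBuilder<T> builder) {
           this.builder = builder;
       }

       @Override
       public void configure(Map<String, Object> streamsConfig) {
           // Nothing to defer: the wrapped builder ignores late-bound configs
           // such as dsl.default.store.type.
       }

       @Override
       public T build() {
           return builder.build();
       }

       @Override
       public String name() {
           return builder.name();
       }
   }
   ```

   A future `StoreFactory` implementation could instead choose the concrete store supplier inside `configure()`, which is what lets configs passed only to `new KafkaStreams(...)` take effect.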
   
   ### Testing
   
   There is no change in functionality for this PR
   
   ### Review Guide
   
   1. Start with looking at `StoreFactory` and read the JavaDocs, this is an 
interface representing what used to be 
`InternalTopologyBuilder.StateStoreFactory`
   2. Look at how `StoreBuilderWrapper` is what used to be the implementation 
of `InternalTopologyBuilder.StateStoreFactory`
   3. Note that `InternalTopologyBuilder#addStateStore` now takes in a `StoreFactory` instead of a `StoreBuilder` everywhere the DSL is used.
   4. The rest is piping that change around and wrapping `StoreBuilder` with 
`StoreBuilderWrapper`. In a future PR all `StoreBuilderWrappers` in the DSL 
will be replaced with a new one that respects the configurations passed in to 
`new KafkaStreams`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Push down logic from TransactionManager to TxnPartitionEntry [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14591:
URL: https://github.com/apache/kafka/pull/14591#discussion_r1375118370


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TxnPartitionEntry.java:
##
@@ -55,20 +63,111 @@ class TxnPartitionEntry {
 .thenComparingInt(ProducerBatch::producerEpoch)
 .thenComparingInt(ProducerBatch::baseSequence);
 
-TxnPartitionEntry() {
+TxnPartitionEntry(TopicPartition topicPartition) {
+this.topicPartition = topicPartition;
 this.producerIdAndEpoch = ProducerIdAndEpoch.NONE;
 this.nextSequence = 0;
-this.lastAckedSequence = 
TransactionManager.NO_LAST_ACKED_SEQUENCE_NUMBER;
+this.lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
 this.lastAckedOffset = ProduceResponse.INVALID_OFFSET;
 this.inflightBatchesBySequence = new 
TreeSet<>(PRODUCER_BATCH_COMPARATOR);
 }
 
-void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
+ProducerIdAndEpoch producerIdAndEpoch() {
+return producerIdAndEpoch;
+}
+
+int nextSequence() {
+return nextSequence;
+}
+
+OptionalLong lastAckedOffset() {
+if (lastAckedOffset != ProduceResponse.INVALID_OFFSET)
+return OptionalLong.of(lastAckedOffset);
+return OptionalLong.empty();
+}
+
+OptionalInt lastAckedSequence() {
+if (lastAckedSequence != 
TxnPartitionEntry.NO_LAST_ACKED_SEQUENCE_NUMBER)
+return OptionalInt.of(lastAckedSequence);
+return OptionalInt.empty();
+}
+
+boolean hasInflightBatches() {
+return !inflightBatchesBySequence.isEmpty();
+}
+
+ProducerBatch nextBatchBySequence() {
+return inflightBatchesBySequence.isEmpty() ? null : 
inflightBatchesBySequence.first();
+}
+
+void incrementSequence(int increment) {
+this.nextSequence = 
DefaultRecordBatch.incrementSequence(this.nextSequence, increment);
+}
+
+void addInflightBatch(ProducerBatch batch) {
+inflightBatchesBySequence.add(batch);
+}
+
+void setLastAckedOffset(long lastAckedOffset) {
+this.lastAckedOffset = lastAckedOffset;
+}
+
+void startSequencesAtBeginning(ProducerIdAndEpoch newProducerIdAndEpoch) {
+final PrimitiveRef.IntRef sequence = PrimitiveRef.ofInt(0);
+resetSequenceNumbers(inFlightBatch -> {
+inFlightBatch.resetProducerState(newProducerIdAndEpoch, 
sequence.value);
+sequence.value += inFlightBatch.recordCount;
+});
+producerIdAndEpoch = newProducerIdAndEpoch;
+nextSequence = sequence.value;
+lastAckedSequence = NO_LAST_ACKED_SEQUENCE_NUMBER;
+}
+
+void adjustSequencesDueToFailedBatch(long baseSequence, int recordCount) {
+decrementSequence(recordCount);
+resetSequenceNumbers(inFlightBatch -> {
+if (inFlightBatch.baseSequence() < baseSequence)
+return;
+
+int newSequence = inFlightBatch.baseSequence() - recordCount;
+if (newSequence < 0)
+throw new IllegalStateException("Sequence number for batch 
with sequence " + inFlightBatch.baseSequence()
++ " for partition " + topicPartition + " is going to 
become negative: " + newSequence);
+
+inFlightBatch.resetProducerState(new 
ProducerIdAndEpoch(inFlightBatch.producerId(), inFlightBatch.producerEpoch()), 
newSequence);
+});
+}
+
+int maybeUpdateLastAckedSequence(int sequence) {

Review Comment:
   nit: for these two methods: in TransactionManager, they were listed before 
adjustSequencesDueToFailedBatch. Not a big deal, but was curious if we wanted 
the ordering to be consistent.  (Looks like we are pretty consistent with the 
TxnPartitionMap ordering)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]

2023-10-27 Thread via GitHub


mumrah commented on code in PR #14654:
URL: https://github.com/apache/kafka/pull/14654#discussion_r1375115830


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -66,7 +66,12 @@ object ZkMigrationIntegrationTest {
   }
 
   def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): 
Unit = {
-Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, 
MetadataVersion.IBP_3_6_IV2).foreach { mv =>
+Seq(
+  MetadataVersion.IBP_3_4_IV0,
+  MetadataVersion.IBP_3_5_IV2,
+  MetadataVersion.IBP_3_6_IV2,
+  MetadataVersion.latest()

Review Comment:
   No, but we should add it so it doesn't get skipped over in the future



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100926


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/MetricNamingConventionTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class MetricNamingConventionTest {

Review Comment:
   Added, thanks for the suggestion.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100801


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {

Review Comment:
   I have changed the name to `TelemetryMetricNamingConvention`, as that is how we generally refer to it in the KIP. Let me know if that works, or if we still feel the name should be more specific, as you suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375099944


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy 
getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy() {
+@Override
+public MetricKey metricKey(MetricName metricName) {
+Objects.requireNonNull(metricName, "metric name cannot be 
null");
+String group = metricName.group() == null ? "" : 
metricName.group();
+String rawName = metricName.name() == null ? "" : 
metricName.name();
+
+return new MetricKey(fullMetricName(domain, group, rawName),
+Collections.unmodifiableMap(cleanTags(metricName.tags())));
+}
+
+@Override
+public MetricKey derivedMetricKey(MetricKey key, String 
derivedComponent) {
+Objects.requireNonNull(derivedComponent, "derived component 
cannot be null");
+return new MetricKey(key.getName() + NAME_JOINER + 
derivedComponent, key.tags());
+}
+};
+}
+
+/**
+ * Creates a metric name given the domain, group, and name. The new String 
follows the following
+ * conventions and rules:
+ *
+ * 
+ *   domain is expected to be a host-name like value, e.g. {@code 
org.apache.kafka}
+ *   group is cleaned of redundant words: "-metrics"
+ *   the group and metric name is dot separated
+ *   The name is created by joining the three components, e.g.:
+ * {@code org.apache.kafka.producer.connection.creation.rate}
+ * 
+ */
+private static String fullMetricName(String domain, String group, String 
name) {
+return domain
++ NAME_JOINER
++ cleanGroup(group)
++ NAME_JOINER
++ cleanMetric(name);
+}
+
+/**
+ * This method maps a raw name to follow conventions and cleans up the 
result to be more legible:
+ * 
+ *   converts names to lower hyphen case conventions

Review Comment:
   Corrected the comment, my bad.



##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/MetricNamingConventionTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import 

Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100330


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy 
getClientTelemetryMetricNamingStrategy(String domain) {

Review Comment:
   Renamed `domain` -> `prefix`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100148


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy 
getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy() {

Review Comment:
   Done, added test case as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375100061


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy 
getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy() {
+@Override
+public MetricKey metricKey(MetricName metricName) {
+Objects.requireNonNull(metricName, "metric name cannot be 
null");
+String group = metricName.group() == null ? "" : 
metricName.group();

Review Comment:
   Removed, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375099869


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy 
getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy() {
+@Override
+public MetricKey metricKey(MetricName metricName) {
+Objects.requireNonNull(metricName, "metric name cannot be 
null");
+String group = metricName.group() == null ? "" : 
metricName.group();
+String rawName = metricName.name() == null ? "" : 
metricName.name();
+
+return new MetricKey(fullMetricName(domain, group, rawName),
+Collections.unmodifiableMap(cleanTags(metricName.tags())));
+}
+
+@Override
+public MetricKey derivedMetricKey(MetricKey key, String 
derivedComponent) {
+Objects.requireNonNull(derivedComponent, "derived component 
cannot be null");
+return new MetricKey(key.getName() + NAME_JOINER + 
derivedComponent, key.tags());
+}
+};
+}
+
+/**
+ * Creates a metric name given the domain, group, and name. The new String 
follows the following
+ * conventions and rules:
+ *
+ * 
+ *   domain is expected to be a host-name like value, e.g. {@code 
org.apache.kafka}
+ *   group is cleaned of redundant words: "-metrics"
+ *   the group and metric name is dot separated
+ *   The name is created by joining the three components, e.g.:
+ * {@code org.apache.kafka.producer.connection.creation.rate}
+ * 
+ */
+private static String fullMetricName(String domain, String group, String 
name) {
+return domain
++ NAME_JOINER
++ cleanGroup(group)
++ NAME_JOINER
++ cleanMetric(name);
+}
+
+/**
+ * This method maps a raw name to follow conventions and cleans up the 
result to be more legible:
+ * 
+ *   converts names to lower hyphen case conventions
+ *   strips redundant parts of the metric name, such as -metrics
+ *   normalizes artifacts of hyphen case to dot separated conversion
+ * 
+ */
+private static String cleanGroup(String group) {
+group = clean(group, NAME_JOINER);
+return GROUP_PATTERN.matcher(group).replaceAll("");
+}
+
+private static String cleanMetric(String metric) {
+return clean(metric, NAME_JOINER);
+}
+
+/**
+ * Converts a tag name to match the telemetry naming conventions by 
converting to snake_case.

Review Comment:
   Done.
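
As an aside for readers of the archive: the convention documented in the
Javadoc above can be illustrated with a small standalone sketch. This is not
the Kafka implementation, only a hedged approximation of the described
mapping (lower-case, hyphens to dots, the redundant ".metrics" group suffix
removed, parts joined with dots):

{code:java}
import java.util.Locale;

public class MetricNameSketch {

    // Approximates fullMetricName() from the diff above.
    static String fullMetricName(String domain, String group, String name) {
        return domain + "." + clean(group).replaceAll("\\.metrics", "") + "." + clean(name);
    }

    // Approximates clean(raw, NAME_JOINER): lower-case, hyphen -> dot.
    static String clean(String raw) {
        return raw.toLowerCase(Locale.ROOT).replace("-", ".");
    }

    public static void main(String[] args) {
        // Prints: org.apache.kafka.producer.connection.creation.rate
        System.out.println(fullMetricName("org.apache.kafka", "producer-metrics", "connection-creation-rate"));
    }
}
{code}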



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375099765


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics
 naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy 
getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy() {
+@Override
+public MetricKey metricKey(MetricName metricName) {
+Objects.requireNonNull(metricName, "metric name cannot be 
null");
+String group = metricName.group() == null ? "" : 
metricName.group();
+String rawName = metricName.name() == null ? "" : 
metricName.name();
+
+return new MetricKey(fullMetricName(domain, group, rawName),
+Collections.unmodifiableMap(cleanTags(metricName.tags())));
+}
+
+@Override
+public MetricKey derivedMetricKey(MetricKey key, String 
derivedComponent) {
+Objects.requireNonNull(derivedComponent, "derived component 
cannot be null");
+return new MetricKey(key.getName() + NAME_JOINER + 
derivedComponent, key.tags());
+}
+};
+}
+
+/**
+ * Creates a metric name given the domain, group, and name. The new String 
follows the following
+ * conventions and rules:
+ *
+ * 
+ *   domain is expected to be a host-name like value, e.g. {@code 
org.apache.kafka}
+ *   group is cleaned of redundant words: "-metrics"
+ *   the group and metric name is dot separated
+ *   The name is created by joining the three components, e.g.:
+ * {@code org.apache.kafka.producer.connection.creation.rate}
+ * 
+ */
+private static String fullMetricName(String domain, String group, String 
name) {
+return domain
++ NAME_JOINER
++ cleanGroup(group)
++ NAME_JOINER
++ cleanMetric(name);
+}
+
+/**
+ * This method maps a raw name to follow conventions and cleans up the 
result to be more legible:
+ * 
+ *   converts names to lower hyphen case conventions
+ *   strips redundant parts of the metric name, such as -metrics
+ *   normalizes artifacts of hyphen case to dot separated conversion
+ * 
+ */
+private static String cleanGroup(String group) {
+group = clean(group, NAME_JOINER);
+return GROUP_PATTERN.matcher(group).replaceAll("");
+}
+
+private static String cleanMetric(String metric) {
+return clean(metric, NAME_JOINER);
+}
+
+/**
+ * Converts a tag name to match the telemetry naming conventions by 
converting to snake_case.
+ * 
+ * Kafka metrics have tag names in lower case separated by hyphens. Eg: 
total-errors
+ *
+ * @param raw the input map
+ * @return the new map with keys replaced by snake_case representations.
+ */
+private static Map cleanTags(Map raw) {
+return raw.entrySet()
+.stream()
+.collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), 
Entry::getValue));
+}
+
+private static String clean(String raw, String joiner) {
+Objects.requireNonNull(raw, 

[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-10-27 Thread Jakub Scholz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780554#comment-17780554
 ] 

Jakub Scholz commented on KAFKA-14941:
--

The idea with which I opened it was to:
 * Have it documented in the first place
 * If it is also expressed in the Java code, in the same way as, for example, 
the current flag indicating whether an option is read-only, that would be a 
plus, as it could then be used by other applications as well.

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> the nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), but also part of the config classes, so that it can be used when 
> the configuration is generated dynamically, for example to filter the 
> options applicable to different nodes. This would allow having configuration 
> files with only the configuration options actually used and, for example, 
> help to reduce unnecessary restarts when rolling out new configurations, etc.
> For some options, it seems clear, and the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> in controller-only nodes. For others, it seems a bit less clear (does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offset topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14349) Support dynamically resizing the KRaft controller's thread pools

2023-10-27 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780551#comment-17780551
 ] 

Colin McCabe commented on KAFKA-14349:
--

This was fixed as part of KAFKA-14351, but we forgot to close the JIRA. Closing 
now.

> Support dynamically resizing the KRaft controller's thread pools
> 
>
> Key: KAFKA-14349
> URL: https://issues.apache.org/jira/browse/KAFKA-14349
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: 4.0-blocker, kip-500
>
> Support dynamically resizing the KRaft controller's request handler and 
> network handler thread pools. See {{DynamicBrokerConfig.scala}}.
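
For context, the analogous broker-side resize already works at runtime via
kafka-configs. A sketch of that mechanism (the bootstrap address, broker id
and thread count here are illustrative, not taken from this issue):

{code:java}
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config num.io.threads=16
{code}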



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14369) Docs - KRAFT controller authentication example

2023-10-27 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780549#comment-17780549
 ] 

Colin McCabe commented on KAFKA-14369:
--

Thanks [~dbove]. I agree that it would be helpful to have an example config 
file with non-PLAINTEXT auth. If you have one, please post it here.

> Docs - KRAFT controller authentication example
> --
>
> Key: KAFKA-14369
> URL: https://issues.apache.org/jira/browse/KAFKA-14369
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Affects Versions: 3.3.1
>Reporter: Domenic Bove
>Priority: Minor
>  Labels: kraft
>
> The [Kafka Listener docs 
> |https://kafka.apache.org/documentation/#listener_configuration]mention how 
> to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller 
> listener, but it is not a working example and I found that I was missing this 
> property: 
> {code:java}
> sasl.mechanism.controller.protocol {code}
> when attempting to do SASL_PLAINTEXT on the controller listener. I see that 
> property here: 
> [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol]
> But nowhere else. 
> I wonder if a complete working example would be better. Here are my working 
> configs for sasl plain on the controller
> {code:java}
> process.roles=controller
> listeners=CONTROLLER://:9093 
> node.id=1
> controller.quorum.voters=1@localhost:9093
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT
> listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="admin" password="admin-secret" user_admin="admin-secret" 
> user_alice="alice-secret";
> listener.name.controller.sasl.enabled.mechanisms=PLAIN
> listener.name.controller.sasl.mechanism=PLAIN
> sasl.enabled.mechanisms=PLAIN
> sasl.mechanism.controller.protocol=PLAIN{code}
> Or maybe just a callout of that property in the existing docs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14369) Docs - KRAFT controller authentication example

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14369:
-
Labels: kraft  (was: 4.0-blocker)

> Docs - KRAFT controller authentication example
> --
>
> Key: KAFKA-14369
> URL: https://issues.apache.org/jira/browse/KAFKA-14369
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Affects Versions: 3.3.1
>Reporter: Domenic Bove
>Priority: Minor
>  Labels: kraft
>
> The [Kafka Listener docs 
> |https://kafka.apache.org/documentation/#listener_configuration]mention how 
> to handle kafka protocols (other than PLAINTEXT) on the KRAFT controller 
> listener, but it is not a working example and I found that I was missing this 
> property: 
> {code:java}
> sasl.mechanism.controller.protocol {code}
> when attempting to do SASL_PLAINTEXT on the controller listener. I see that 
> property here: 
> [https://kafka.apache.org/documentation/#brokerconfigs_sasl.mechanism.controller.protocol]
> But nowhere else. 
> I wonder if a complete working example would be better. Here are my working 
> configs for sasl plain on the controller
> {code:java}
> process.roles=controller
> listeners=CONTROLLER://:9093 
> node.id=1
> controller.quorum.voters=1@localhost:9093
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT
> listener.name.controller.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="admin" password="admin-secret" user_admin="admin-secret" 
> user_alice="alice-secret";
> listener.name.controller.sasl.enabled.mechanisms=PLAIN
> listener.name.controller.sasl.mechanism=PLAIN
> sasl.enabled.mechanisms=PLAIN
> sasl.mechanism.controller.protocol=PLAIN{code}
> Or maybe just a callout of that property in the existing docs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14927:
-
Labels:   (was: 4.0-blocker)

> Prevent kafka-configs.sh from setting non-alphanumeric config key names
> ---
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: Aman Singh
>Priority: Minor
> Fix For: 3.7.0
>
>
> Using {{kafka-configs}} should validate dynamic configurations before 
> applying. It is possible to send a file with invalid configurations. 
> For example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
>   "default_topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
>   "topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
>   ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
>   "interbroker"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
>   "produce"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null}
>   "denied"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"denied"=null}
>   

[jira] [Commented] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names

2023-10-27 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780541#comment-17780541
 ] 

Colin McCabe commented on KAFKA-14927:
--

It looks like this change was committed. I will close the JIRA then.
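
For readers: the fix amounts to rejecting malformed key names up front. A
minimal sketch of such a check follows; the exact permitted character set is
an assumption for illustration, not the committed patch:

{code:java}
import java.util.regex.Pattern;

public class ConfigKeyCheck {

    // Assumed whitelist: alphanumerics plus '.', '_' and '-'.
    private static final Pattern VALID_KEY = Pattern.compile("[a-zA-Z0-9._-]+");

    static void validate(String key) {
        if (!VALID_KEY.matcher(key).matches()) {
            throw new IllegalArgumentException("Invalid config key name: " + key);
        }
    }

    public static void main(String[] args) {
        validate("compression.type");  // passes
        validate("\"routes\"");        // throws: a quoted JSON fragment, as seen below
    }
}
{code}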

> Prevent kafka-configs.sh from setting non-alphanumeric config key names
> ---
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: Aman Singh
>Priority: Minor
>  Labels: 4.0-blocker
> Fix For: 3.7.0
>
>
> Using {{kafka-configs}} should validate dynamic configurations before 
> applying. It is possible to send a file with invalid configurations. 
> For example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
>   "default_topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
>   "topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
>   ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
>   "interbroker"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
>   "produce"=null sensitive=true 
> 

[jira] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names

2023-10-27 Thread Colin McCabe (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14927 ]


Colin McCabe deleted comment on KAFKA-14927:
--

was (Author: cmccabe):
It looks like this change was committed. I will close the JIRA then.

> Prevent kafka-configs.sh from setting non-alphanumeric config key names
> ---
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: Aman Singh
>Priority: Minor
>  Labels: 4.0-blocker
> Fix For: 3.7.0
>
>
> Using {{kafka-configs}} should validate dynamic configurations before 
> applying. It is possible to send a file with invalid configurations. 
> For example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
>   "default_topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
>   "topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
>   ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
>   "interbroker"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
>   "produce"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"produce"=null}
>   "denied"=null sensitive=true 
> 

[jira] [Updated] (KAFKA-14927) Prevent kafka-configs.sh from setting non-alphanumeric config key names

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14927:
-
Summary: Prevent kafka-configs.sh from setting non-alphanumeric config key 
names  (was: Dynamic configs not validated when using kafka-configs and 
--add-config-file)

> Prevent kafka-configs.sh from setting non-alphanumeric config key names
> ---
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: Aman Singh
>Priority: Minor
>  Labels: 4.0-blocker
> Fix For: 3.7.0
>
>
> Using {{kafka-configs}} should validate dynamic configurations before 
> applying. It is possible to send a file with invalid configurations. 
> For example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
>   "default_topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
>   "topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
>   ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
>   "interbroker"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
>   "produce"=null 

[jira] [Commented] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-10-27 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780540#comment-17780540
 ] 

Colin McCabe commented on KAFKA-14941:
--

I'm not sure that I totally understand the goal here.

If the goal is to be able to dynamically change configurations, that does not 
require leaving the configuration out of the static broker or controller config 
file. The dynamic configuration always takes precedence.

If the goal is to understand what the configuration does, the help text of the 
configuration should explain that.

Can you explain a bit more about the goal?
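
(For reference, the precedence point can be shown with the AdminClient. This
is a hedged sketch: the bootstrap address and the chosen key are illustrative,
and an empty resource name selects the cluster-wide broker default.)

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class DynamicDefaultExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(props)) {
            // "" selects the default (cluster-wide) broker resource.
            ConfigResource brokerDefault = new ConfigResource(ConfigResource.Type.BROKER, "");
            AlterConfigOp set = new AlterConfigOp(
                new ConfigEntry("compression.type", "producer"), AlterConfigOp.OpType.SET);
            // This dynamic value takes precedence over the same key in server.properties.
            admin.incrementalAlterConfigs(
                Collections.singletonMap(brokerDefault, Collections.singletonList(set))).all().get();
        }
    }
}
{code}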

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> the nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), but also part of the config classes, so that it can be used when 
> the configuration is generated dynamically, for example to filter the 
> options applicable to different nodes. This would allow having configuration 
> files with only the configuration options actually used and, for example, 
> help to reduce unnecessary restarts when rolling out new configurations, etc.
> For some options, it seems clear, and the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> in controller-only nodes. For others, it seems a bit less clear (does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offset topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14941) Document which configuration options are applicable only to processes with broker role or controller role

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-14941:
-
Labels:   (was: 4.0-blocker)

> Document which configuration options are applicable only to processes with 
> broker role or controller role
> -
>
> Key: KAFKA-14941
> URL: https://issues.apache.org/jira/browse/KAFKA-14941
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jakub Scholz
>Priority: Major
>
> When running in KRaft mode, some of the configuration options are applicable 
> only to nodes with the broker process role and some are applicable only to 
> the nodes with the controller process role. It would be great if this 
> information was part of the documentation (e.g. in the [Broker 
> Configs|https://kafka.apache.org/documentation/#brokerconfigs] table on the 
> website), but also part of the config classes, so that it can be used when 
> the configuration is generated dynamically, for example to filter the 
> options applicable to different nodes. This would allow having configuration 
> files with only the configuration options actually used and, for example, 
> help to reduce unnecessary restarts when rolling out new configurations, etc.
> For some options, it seems clear, and the Kafka node would refuse to start if 
> they are set - for example the configurations of the non-controller listeners 
> in controller-only nodes. For others, it seems a bit less clear (does the 
> {{compression.type}} option apply to controller-only nodes? Or the 
> configurations for the offset topic? etc.).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375083080


##
build.gradle:
##
@@ -1342,6 +1348,14 @@ project(':clients') {
 implementation libs.lz4
 implementation libs.snappy
 implementation libs.slf4jApi
+implementation libs.opentelemetryProto
+
+// declare runtime libraries

Review Comment:
   can we add a comment to explain which dependencies should be declared here?



##
build.gradle:
##
@@ -1342,6 +1348,14 @@ project(':clients') {
 implementation libs.lz4
 implementation libs.snappy
 implementation libs.slf4jApi
+implementation libs.opentelemetryProto
+
+// declare runtime libraries

Review Comment:
   can we add a comment to explain which dependencies should be declared here 
and why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375082874


##
build.gradle:
##
@@ -1361,6 +1375,21 @@ project(':clients') {
 generator project(':generator')
   }
 
+  shadowJar {
+archiveClassifier = null
+// KIP-714: move shaded dependencies to a shaded location
+relocate('io.opentelemetry', 'org.apache.kafka.shaded.io.opentelemetry')

Review Comment:
   can we limit the relocation to the more specific `io.opentelemetry.proto` 
package, since there shouldn't be any other opentelemetry classes?
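
   A sketch of the narrower relocation being suggested (a hypothetical line
   for the same shadowJar block, not a committed change):

   relocate('io.opentelemetry.proto', 'org.apache.kafka.shaded.io.opentelemetry.proto')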



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15489) split brain in KRaft cluster

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15489:
-
Labels:   (was: 4.0-blocker)

> split brain in KRaft cluster 
> -
>
> Key: KAFKA-15489
> URL: https://issues.apache.org/jira/browse/KAFKA-15489
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> I found that in the current KRaft implementation, when a network partition 
> happens between the current controller leader and the other controller nodes, 
> the "split brain" issue will happen. It causes 2 leaders to exist in the 
> controller cluster, and 2 inconsistent sets of metadata to be returned to the 
> clients.
>  
> *Root cause*
> In 
> [KIP-595|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-Vote],
>  we said a voter will begin a new election under three conditions:
> 1. If it fails to receive a FetchResponse from the current leader before 
> expiration of quorum.fetch.timeout.ms
> 2. If it receives a EndQuorumEpoch request from the current leader
> 3. If it fails to receive a majority of votes before expiration of 
> quorum.election.timeout.ms after declaring itself a candidate.
> And that's exactly what the current KRaft implementation does.
>  
> However, when the leader is isolated by the network partition, there's no 
> way for it to resign from the leadership and start a new election. So the 
> leader will always be the leader even though all other nodes are down. And 
> this makes the split brain issue possible.
> When reading further in KIP-595, I found we indeed considered this 
> situation and have a solution for it. In [this 
> section|https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-LeaderProgressTimeout],
>  it said:
> {quote}In the pull-based model, however, say a new leader has been elected 
> with a new epoch and everyone has learned about it except the old leader 
> (e.g. that leader was not in the voters anymore and hence not receiving the 
> BeginQuorumEpoch as well), then that old leader would not be notified by 
> anyone about the new leader / epoch and become a pure "zombie leader", as 
> there is no regular heartbeats being pushed from leader to the follower. This 
> could lead to stale information being served to the observers and clients 
> inside the cluster.
> {quote}
> {quote}To resolve this issue, we will piggy-back on the 
> "quorum.fetch.timeout.ms" config, such that if the leader did not receive 
> Fetch requests from a majority of the quorum for that amount of time, it 
> would begin a new election and start sending VoteRequest to voter nodes in 
> the cluster to understand the latest quorum. If it couldn't connect to any 
> known voter, the old leader shall keep starting new elections and bump the 
> epoch.
> {quote}
>  
> But we missed this implementation in current KRaft.
>  
> *The flow is like this:*
> 1. 3 controller nodes, A(leader), B(follower), C(follower)
> 2. a network partition happens between [A] and [B, C].
> 3. B and C start a new election since the fetch timeout expires before they 
> receive a fetch response from leader A.
> 4. B (or C) is elected as leader in a new epoch, while A is still the leader 
> in the old epoch.
> 5. broker D creates a topic "new", and the update goes to leader B.
> 6. broker E describes topic "new", but gets nothing because it is connected 
> to the old leader A.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
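
For readers following along: the missing piece described in the issue above
is a progress check on the leader itself. A minimal sketch of the idea (not
KafkaRaftClient code; names and structure are illustrative):

{code:java}
import java.util.Collection;

class LeaderProgressCheck {

    private final long fetchTimeoutMs; // quorum.fetch.timeout.ms

    LeaderProgressCheck(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    /**
     * lastFollowerFetchMs holds, per follower voter, the time of its last
     * Fetch request. If the leader (counting itself) cannot see a majority
     * of the voters within the fetch timeout, it should resign and start a
     * new election instead of serving stale metadata as a "zombie leader".
     */
    boolean shouldResign(Collection<Long> lastFollowerFetchMs, int voterCount, long nowMs) {
        long reachable = 1 + lastFollowerFetchMs.stream()
                .filter(t -> nowMs - t <= fetchTimeoutMs)
                .count();
        return reachable < voterCount / 2 + 1; // no majority of the quorum
    }
}
{code}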


[jira] [Updated] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-15513:
-
Labels:   (was: 4.0-blocker)

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-10-27 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780539#comment-17780539
 ] 

Colin McCabe commented on KAFKA-15513:
--

To be more concrete, you need to use the {{--add-scram}} argument to the 
{{kafka-storage.sh format}} command.

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>  Labels: 4.0-blocker
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15668: Adding Opentelmetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on PR #14618:
URL: https://github.com/apache/kafka/pull/14618#issuecomment-1783559671

   @apoorvmittal10 could we show a diff before/after this change of:
   
   - the pom
   - the content of the jar – excluding 
`org/apache/kafka/shaded/com/google/protobuf/` and 
`org/apache/kafka/shaded/io/opentelemetry/proto/`
   
   to make sure the only differences in the resulting artifact are those shaded 
classes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-10-27 Thread Colin McCabe (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-15513 ]


Colin McCabe deleted comment on KAFKA-15513:
--

was (Author: cmccabe):
Currently, you need to add the controller principal to `super.users` rather 
than relying on SCRAM to configure it. This is no different than how in ZK 
mode, you must have working ZK auth before you can configure Kafka.

In the future, we will probably support configuring SCRAM prior to controller 
startup via the `kafka-storage.sh format` command. The mechanism is all there (in the 
form of the bootstrap file) but we haven't finished implementing it yet...

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>  Labels: 4.0-blocker
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15513) KRaft cluster fails with SCRAM authentication enabled for control-plane

2023-10-27 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780537#comment-17780537
 ] 

Colin McCabe commented on KAFKA-15513:
--

Currently, you need to add the controller principal to `super.users` rather 
than relying on SCRAM to configure it. This is no different than how in ZK 
mode, you must have working ZK auth before you can configure Kafka.

In the future, we will probably support configuring SCRAM prior to controller 
startup via the `kafka-storage.sh format` command. The mechanism is all there (in the 
form of the bootstrap file) but we haven't finished implementing it yet...
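
(A hypothetical server.properties fragment for the workaround described
above; the principal name is illustrative and must match the controller's
SCRAM user:)

{code:java}
super.users=User:controller_user
{code}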

> KRaft cluster fails with SCRAM authentication enabled for control-plane
> ---
>
> Key: KAFKA-15513
> URL: https://issues.apache.org/jira/browse/KAFKA-15513
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.6.0, 3.5.1
>Reporter: migruiz4
>Priority: Major
>  Labels: 4.0-blocker
>
> We have observed a scenario where a KRaft cluster fails to bootstrap when 
> using SCRAM authentication for controller-to-controller communications.
> The steps to reproduce are simple:
>  * Deploy (at least) 2 Kafka servers using latest version 3.5.1.
>  * Configure a KRaft cluster, where the controller listener uses 
> SASL_PLAINTEXT + SCRAM-SHA-256 or SCRAM-SHA-512. In my case, I'm using the 
> recommended in-line jaas config 
> '{{{}listener.name..scram-sha-512.sasl.jaas.config{}}}'
>  * Run 'kafka-storage.sh' in both nodes using option '--add-scram' to create 
> the SCRAM user.
> When initialized, Controllers will fail to connect to each other with an 
> authentication error:
>  
> {code:java}
> [2023-08-01 11:12:45,295] ERROR [kafka-1-raft-outbound-request-thread]: 
> Failed to send the following request due to authentication error: 
> ClientRequest(expectResponse=true, 
> callback=kafka.raft.KafkaNetworkChannel$$Lambda$687/0x7f27d443fc60@2aba6075,
>  destination=0, correlationId=129, clientId=raft-client-1, 
> createdTimeMs=1690888364960, 
> requestBuilder=VoteRequestData(clusterId='abcdefghijklmnopqrstug', 
> topics=[TopicData(topicName='__cluster_metadata', 
> partitions=[PartitionData(partitionIndex=0, candidateEpoch=4, candidateId=1, 
> lastOffsetEpoch=0, lastOffset=0)])])) (kafka.raft.RaftSendThread) {code}
> Some additional details about the scenario that we tested out:
>  *  Controller listener does work when configured with SASL+PLAIN
>  * The issue only affects the Controller listener, SCRAM users created using 
> the same method work for data-plane listeners and inter-broker listeners.
>  
> Below you can find the exact configuration and command used to deploy:
>  * server.properties
> {code:java}
> listeners=INTERNAL://:9092,CLIENT://:9091,CONTROLLER://:9093
> advertised.listeners=INTERNAL://kafka-0:9092,CLIENT://:9091
> listener.security.protocol.map=INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,CONTROLLER:SASL_PLAINTEXT
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/bitnami/kafka/data
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
> log.retention.hours=168
> log.retention.check.interval.ms=30
> controller.listener.names=CONTROLLER
> controller.quorum.voters=0@kafka-0:9093,1@kafka-1:9093
> inter.broker.listener.name=INTERNAL
> node.id=0
> process.roles=controller,broker
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
> sasl.mechanism.controller.protocol=SCRAM-SHA-512
> listener.name.controller.sasl.enabled.mechanisms=SCRAM-SHA-512
> listener.name.controller.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="controller_user" password="controller_password";{code}
>  * kafka-storage.sh command
> {code:java}
> kafka-storage.sh format --config /path/to/server.properties 
> --ignore-formatted --cluster-id abcdefghijklmnopqrstuv --add-scram 
> SCRAM-SHA-512=[name=controller_user,password=controller_password] {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15668: Adding OpenTelemetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375080740


##
build.gradle:
##
@@ -1361,6 +1375,21 @@ project(':clients') {
 generator project(':generator')
   }
 
+  shadowJar {
+archiveClassifier = null

Review Comment:
   can we add a comment to explain why archiveClassifier is set to null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375078778


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Yeah, the hope is that we'd see more async patterns as we evolve Kafka, and 
the way it's currently implemented, we just say "execute callback on the 
_current_ thread pool", which is why the thread local is used -- this way we 
can implement a non-blocking wait-and-continue-on-current-thread-pool from any 
level without passing additional arguments through the whole stack.  The same 
functionality could be implemented on other thread pools, e.g. if the group 
coordinator has its own thread pool, we could implement the same primitive 
there as well.
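   
   A rough sketch of that primitive, with made-up names rather than the actual 
Kafka API:
   
   ```java
   import java.util.concurrent.Executor;
   import java.util.function.Consumer;
   
   // Sketch: each request-handler thread publishes its pool in a thread local,
   // so any layer can build a "continue on the current pool" callback without
   // threading an Executor argument through the whole call stack.
   final class CurrentPoolContinuation {
       private static final ThreadLocal<Executor> CURRENT_POOL = new ThreadLocal<>();
   
       // Called once by each pooled thread when it starts.
       static void bind(Executor pool) {
           CURRENT_POOL.set(pool);
       }
   
       // Wrap a callback so it resumes on the pool of the wrapping thread.
       static <T> Consumer<T> continueOnCurrentPool(Consumer<T> callback) {
           Executor pool = CURRENT_POOL.get(); // captured on the wrapping thread
           if (pool == null)
               return callback; // not on a pooled thread: run inline
           return result -> pool.execute(() -> callback.accept(result));
       }
   }
   ```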



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15668: Adding OpenTelemetry shadowed library (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14618:
URL: https://github.com/apache/kafka/pull/14618#discussion_r1375080303


##
build.gradle:
##
@@ -1380,7 +1409,9 @@ project(':clients') {
   }
 
   jar {
+enabled false
 dependsOn createVersionFile
+dependsOn 'shadowJar'
 from("$buildDir") {
 include "kafka/$buildVersionFileName"
 }

Review Comment:
   can we make sure this build version file is still included in the final jar 
like before?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375061797


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   As for just produce. Yes, this is the case now, but I think Artem was trying 
to create the wrap method as a "general" callback mechanism.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375060752


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   We pass the wrapped method which is a simpler type.  `type AppendCallback = 
Map[TopicPartition, Errors] => Unit`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-27 Thread via GitHub


apoorvmittal10 commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1375060555


##
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java:
##
@@ -24,13 +24,14 @@
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.telemetry.ClientTelemetry;
 
 /**
  * A plugin interface to allow things to listen as new metrics are created so 
they can be reported.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Reconfigurable, AutoCloseable {
+public interface MetricsReporter extends Reconfigurable, AutoCloseable, 
ClientTelemetry {

Review Comment:
   @xvrl @junrao I agree as well, and the checked-in code no longer has the change 
to the MetricsReporter.java interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15704) ControllerRegistrationRequest must set ZkMigrationReady field if appropriate

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-15704.
--
Resolution: Fixed

> ControllerRegistrationRequest must set ZkMigrationReady field if appropriate
> 
>
> Key: KAFKA-15704
> URL: https://issues.apache.org/jira/browse/KAFKA-15704
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15704) ControllerRegistrationRequest must set ZkMigrationReady field if appropriate

2023-10-27 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-15704:


Assignee: David Arthur

> ControllerRegistrationRequest must set ZkMigrationReady field if appropriate
> 
>
> Key: KAFKA-15704
> URL: https://issues.apache.org/jira/browse/KAFKA-15704
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Colin McCabe
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375057023


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   > Hmmm. I'm not sure it makes sense to try to pass the request channel and 
current request into every method we want to do a callback for.
   
   Currently, the callback is only needed for produce request in 
ReplicaManager. If you look at `KafkaApis.handleProduceRequest`, we already 
pass in both request channel and current request to 
`ReplicaManager.appendRecords` through `sendResponseCallback`.
   
   > As for 2 and short circuiting... Are you suggesting I move the wrap call 
into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method 
in there as well. That may be trickier with the typing.
   
   Yes. Currently, we already pass in `appendEntries` as a callback to 
`AddPartitionsToTxnManager`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [KPERF-852] Rename method sendBrokerHeartbeat [kafka]

2023-10-27 Thread via GitHub


yuyli opened a new pull request, #14658:
URL: https://github.com/apache/kafka/pull/14658

   This is a PR to maintain consistency between AK and CE-KAFKA.
   Change:
   `sendBrokerHeartbeat` -> `sendBrokerHeartbeatToUnfenceBrokers`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]

2023-10-27 Thread via GitHub


cmccabe commented on PR #14654:
URL: https://github.com/apache/kafka/pull/14654#issuecomment-1783521988

   committed, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on PR #14629:
URL: https://github.com/apache/kafka/pull/14629#issuecomment-1783521458

   Following up on the build failure here: 
https://github.com/apache/kafka/pull/14545#issuecomment-1783515553


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15704: Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]

2023-10-27 Thread via GitHub


cmccabe closed pull request #14654: KAFKA-15704: Set missing ZkMigrationReady 
field on ControllerRegistrationRequest
URL: https://github.com/apache/kafka/pull/14654


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-27 Thread via GitHub


junrao commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1375047867


##
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java:
##
@@ -24,13 +24,14 @@
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.telemetry.ClientTelemetry;
 
 /**
  * A plugin interface to allow things to listen as new metrics are created so 
they can be reported.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Reconfigurable, AutoCloseable {
+public interface MetricsReporter extends Reconfigurable, AutoCloseable, 
ClientTelemetry {

Review Comment:
   I agree with Xavier. Also, the KIP didn't propose to change the 
`MetricsReporter` interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15605: Fix topic deletion handling during ZK migration [kafka]

2023-10-27 Thread via GitHub


jolshan commented on PR #14545:
URL: https://github.com/apache/kafka/pull/14545#issuecomment-1783515553

   Hey -- I think this broke the build for Java 8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375044842


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Hmmm. I'm not sure it makes sense to try to pass the request channel and 
current request into every method we want to do a callback for.
   
   As for 2 and short circuiting... Are you suggesting I move the wrap call 
into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method 
in there as well. That may be trickier with the typing. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375025398


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Ismael mentioned in 
https://github.com/apache/kafka/pull/9229#issuecomment-683352094 that thread 
locals are most useful when one doesn't control the code. So, I am wondering if 
we could get rid of the two ThreadLocal: `threadRequestChannel` and 
`threadCurrentRequest` in `KafkaRequestHandler` introduced in KAFKA-14561.  The 
reason for the former is to obtain requestChannel. We could simply pass in 
requestChannel to `ReplicaManager.appendRecords`. The reason for the latter is 
(1) to obtain currentRequest and (2) to make sure that the callback can be 
short-circuited if it's called on the same request handler thread. For (1), we 
could also pass in currentRequest to `ReplicaManager.appendRecords`. For (2), 
we could change the code such that `KafkaRequestHandler.wrap` is called only 
when the callback truly needs to be run from a different thread. Otherwise, we 
can just call the callback directly there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer

2023-10-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15550.

Resolution: Fixed

> OffsetsForTimes validation for negative timestamps in new consumer
> --
>
> Key: KAFKA-15550
> URL: https://issues.apache.org/jira/browse/KAFKA-15550
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, kip-848-preview
>
> OffsetsForTimes api call should fail with _IllegalArgumentException_ if 
> negative timestamps are provided as arguments. This will effectively exclude 
> earliest and latest offsets as target times, keeping the current behaviour of 
> the KafkaConsumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14620:
URL: https://github.com/apache/kafka/pull/14620#discussion_r1375028442


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/KafkaMetricsCollector.java:
##
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeCount;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Percentiles;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+import 
org.apache.kafka.common.telemetry.internals.LastValueTracker.InstantAndValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * All metrics implement the {@link MetricValueProvider} interface. They are 
divided into
+ * two base types:
+ *
+ * <ul>
+ *   <li>{@link Gauge}</li>
+ *   <li>{@link Measurable}</li>
+ * </ul>
+ *
+ * {@link Gauge Gauges} can have any value but we only collect metrics with 
number values.
+ * {@link Measurable Measurables} are divided into simple types with single 
values
+ * ({@link Avg}, {@link CumulativeCount}, {@link Min}, {@link Max}, {@link 
Rate},
+ * {@link SimpleRate}, and {@link CumulativeSum}) and compound types ({@link 
Frequencies},
+ * {@link Meter}, and {@link Percentiles}).
+ *
+ * <p>
+ *
+ * We can safely assume that a {@link CumulativeCount count} always increases 
in steady state. It
+ * should be a bug if a count metric decreases.
+ *
+ * <p>
+ *
+ * Total and Sum are treated as a monotonically increasing counter. The 
javadocs for Total metric type
+ * say "An un-windowed cumulative total maintained over all time.". The 
standalone Total metrics in
+ * the codebase seem to be cumulative metrics that will always increase. The 
Total metric underlying
+ * Meter type is mostly a Total of a Count metric.
+ * We can assume that a Total metric always increases (but it is not 
guaranteed as the sample values might be both
+ * negative or positive).
+ * For now, Total is converted to CUMULATIVE_DOUBLE unless we find a valid 
counter-example.
+ *
+ * <p>
+ *
+ * The Sum is a sample sum, which is not a cumulative metric. It is 
converted to GAUGE_DOUBLE.
+ *
+ * <p>
+ *
+ * The compound metrics are virtual metrics. They are composed of simple types 
or anonymous measurable types
+ * which are reported. A compound metric is never reported as-is.
+ *
+ * <p>
+ *
+ * A Meter metric is always created with and reported as 2 KafkaExporter 
metrics: a rate and a
+ * count. For example, org.apache.kafka.common.network.Selector has a Meter metric 
for "connection-close" but it
+ * has to be created with a "connection-close-rate" metric of type rate and a 
"connection-close-total"
+ * metric of type total. So, we will never get a KafkaExporter metric with 
type Meter.
+ *
+ * <p>
+ *
+ * Frequencies is created with an array of Frequency objects. When a 
Frequencies metric is registered, each
+ * member Frequency object is converted into an anonymous Measurable and 
registered. So, a Frequencies metric
+ * is reported with a set of measurables with name = Frequency.name(). As 
there is no way to figure out the
+ * compound type, each component measurable is converted to a GAUGE_DOUBLE.
+ *

Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375026123


##
clients/src/test/java/org/apache/kafka/common/telemetry/internals/MetricNamingConventionTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class MetricNamingConventionTest {

Review Comment:
   can we add explicit tests for the metrics we have defined in the KIP?
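   
   For instance, a hedged sketch of one such test, assuming the mapping behaves as 
the class javadoc in this PR describes (and that MetricNamingStrategy lives in the 
same internals package):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   import java.util.Collections;
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.telemetry.internals.MetricNamingConvention;
   import org.apache.kafka.common.telemetry.internals.MetricNamingStrategy;
   import org.junit.jupiter.api.Test;
   
   public class KipDefinedMetricNamesTest {
   
       @Test
       public void producerConnectionCreationRateFollowsKipNaming() {
           MetricName name = new MetricName("connection-creation-rate",
               "producer-metrics", "", Collections.emptyMap());
           MetricNamingStrategy<MetricName> strategy = MetricNamingConvention
               .getClientTelemetryMetricNamingStrategy("org.apache.kafka");
           // "-metrics" is stripped from the group and hyphens become dots,
           // matching the javadoc example in this PR.
           assertEquals("org.apache.kafka.producer.connection.creation.rate",
               strategy.metricKey(name).getName());
       }
   }
   ```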



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375025585


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics
 naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy<MetricName> 
getClientTelemetryMetricNamingStrategy(String domain) {
+return new MetricNamingStrategy<MetricName>() {
+@Override
+public MetricKey metricKey(MetricName metricName) {
+Objects.requireNonNull(metricName, "metric name cannot be 
null");
+String group = metricName.group() == null ? "" : 
metricName.group();
+String rawName = metricName.name() == null ? "" : 
metricName.name();
+
+return new MetricKey(fullMetricName(domain, group, rawName),
+Collections.unmodifiableMap(cleanTags(metricName.tags())));
+}
+
+@Override
+public MetricKey derivedMetricKey(MetricKey key, String 
derivedComponent) {
+Objects.requireNonNull(derivedComponent, "derived component 
cannot be null");
+return new MetricKey(key.getName() + NAME_JOINER + 
derivedComponent, key.tags());
+}
+};
+}
+
+/**
+ * Creates a metric name given the domain, group, and name. The new String 
follows the following
+ * conventions and rules:
+ *
+ * <ul>
+ *   <li>domain is expected to be a host-name like value, e.g. {@code 
org.apache.kafka}</li>
+ *   <li>group is cleaned of redundant words: "-metrics"</li>
+ *   <li>the group and metric name is dot separated</li>
+ *   <li>The name is created by joining the three components, e.g.:
+ * {@code org.apache.kafka.producer.connection.creation.rate}</li>
+ * </ul>
+ */
+private static String fullMetricName(String domain, String group, String 
name) {
+return domain
++ NAME_JOINER
++ cleanGroup(group)
++ NAME_JOINER
++ cleanMetric(name);
+}
+
+/**
+ * This method maps a raw name to follow conventions and cleans up the 
result to be more legible:
+ * <ul>
+ *   <li>converts names to lower hyphen case conventions</li>
+ *   <li>strips redundant parts of the metric name, such as -metrics</li>
+ *   <li>normalizes artifacts of hyphen case to dot separated conversion</li>
+ * </ul>
+ */
+private static String cleanGroup(String group) {
+group = clean(group, NAME_JOINER);
+return GROUP_PATTERN.matcher(group).replaceAll("");
+}
+
+private static String cleanMetric(String metric) {
+return clean(metric, NAME_JOINER);
+}
+
+/**
+ * Converts a tag name to match the telemetry naming conventions by 
converting snake_case.
+ * <p>
+ * Kafka metrics have tag names in lower case separated by hyphens, e.g. 
total-errors
+ *
+ * @param raw the input map
+ * @return the new map with keys replaced by snake_case representations.
+ */
+private static Map<String, String> cleanTags(Map<String, String> raw) {
+return raw.entrySet()
+.stream()
+.collect(Collectors.toMap(s -> clean(s.getKey(), TAG_JOINER), 
Entry::getValue));
+}
+
+private static String clean(String raw, String joiner) {
+Objects.requireNonNull(raw, "metric data 

Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375023475


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics
 naming and format</a>
+ */
+public class MetricNamingConvention {
+
+private static final String NAME_JOINER = ".";
+private static final String TAG_JOINER = "_";
+
+// remove metrics as it is redundant for telemetry metrics naming 
convention
+private final static Pattern GROUP_PATTERN = 
Pattern.compile("\\.(metrics)");
+
+public static MetricNamingStrategy<MetricName> 
getClientTelemetryMetricNamingStrategy(String domain) {

Review Comment:
   Maybe we can call it `prefix` instead of domain?
   
   I believe this class should be reusable to also expose broker, streams, etc. 
metrics in OpenTelemetry format one day, so I would suggest keeping it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1375016328


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {

Review Comment:
   I think ideally in a future change we'd get rid of passing RequestLocal as 
an argument and maybe make it a static thread local that could be accessed from 
the point of use rather than being passed through the whole stack.
   
   There are a couple of problems that contributed to this issue:
   - the functions here are already written for asynchronous completion 
(because we wait for replication) and in such cases generally the convention is 
that the arguments of a function are not bound to the executing thread (i.e. 
rooted on the call stack or globally)
   - the point of use was outside of the core change, so folks didn't look into 
the specifics of the RequestLocal semantics (i.e. even if appendEntries had been an 
explicit function as it now is, I'm not sure the problem would have been noticed).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15669: Implement telemetry metric naming strategy (KIP-714) [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14619:
URL: https://github.com/apache/kafka/pull/14619#discussion_r1375018518


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricNamingConvention.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates naming and mapping conventions defined as part of
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat">Metrics
 naming and format</a>
+ */
+public class MetricNamingConvention {

Review Comment:
   maybe a more explicit class name would help here. Since the goal of this 
mapping is to map the metric names to something that fits OpenTelemetry 
conventions, maybe we can call this OpenTelemetryMetricNamingConvention?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1375015180


##
clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.telemetry;
+
+import org.apache.kafka.common.metrics.MetricsContext;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * A {@link MetricsReporter} may implement this interface to indicate support 
for collecting client
+ * telemetry on the server side.
+ */
+@InterfaceStability.Evolving
+public interface ClientTelemetry {
+
+/**
+ * Implemented by the broker {@link MetricsReporter} to provide a {@link 
ClientTelemetryReceiver}

Review Comment:
   maybe it would be clearer to say that the broker calls this method to get 
the ClientTelemetryReceiver instance.
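   
   To make that concrete, a hedged sketch of a broker-side plugin (assuming the 
interface's single method is clientReceiver(), per KIP-714):
   
   ```java
   import java.util.List;
   import java.util.Map;
   import org.apache.kafka.common.metrics.KafkaMetric;
   import org.apache.kafka.common.metrics.MetricsReporter;
   import org.apache.kafka.server.telemetry.ClientTelemetry;
   import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
   
   // The broker detects the ClientTelemetry mix-in on a configured reporter
   // and calls clientReceiver() to obtain the receiver for pushed payloads.
   public class ExampleTelemetryReporter implements MetricsReporter, ClientTelemetry {
   
       @Override
       public ClientTelemetryReceiver clientReceiver() {
           return (context, payload) -> {
               // Forward the serialized client metrics somewhere; elided here.
           };
       }
   
       @Override public void configure(Map<String, ?> configs) { }
       @Override public void init(List<KafkaMetric> metrics) { }
       @Override public void metricChange(KafkaMetric metric) { }
       @Override public void metricRemoval(KafkaMetric metric) { }
       @Override public void close() { }
   }
   ```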



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]

2023-10-27 Thread via GitHub


wolfchimneyrock commented on PR #14635:
URL: https://github.com/apache/kafka/pull/14635#issuecomment-1783465122

   > @wolfchimneyrock Thanks again for the PR. We verified and your suggestion 
makes sense. I have two asks:
   > 
   > 1. Could you please file a bug and use the JIRA id in the PR?
   > 2. Would it be possible to add a small unit test for the fix?
   
   Sure, I've applied for an Apache JIRA account, and I'll start on a unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-27 Thread via GitHub


xvrl commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1375013255


##
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java:
##
@@ -24,13 +24,14 @@
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.telemetry.ClientTelemetry;
 
 /**
  * A plugin interface to allow things to listen as new metrics are created so 
they can be reported.
  * 
  * <p>
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Reconfigurable, AutoCloseable {
+public interface MetricsReporter extends Reconfigurable, AutoCloseable, 
ClientTelemetry {

Review Comment:
   the MetricsReporter plugin is a plugin for either server or client side. 
However the ClientTelemetry plugin is only relevant for broker side plugins



##
clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java:
##
@@ -24,13 +24,14 @@
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.telemetry.ClientTelemetry;
 
 /**
  * A plugin interface to allow things to listen as new metrics are created so 
they can be reported.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
-public interface MetricsReporter extends Reconfigurable, AutoCloseable {
+public interface MetricsReporter extends Reconfigurable, AutoCloseable, 
ClientTelemetry {

Review Comment:
   the MetricsReporter plugin is a plugin for either server or client side. 
However the ClientTelemetry interface is only relevant for broker side plugins



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]

2023-10-27 Thread via GitHub


jolshan commented on PR #14627:
URL: https://github.com/apache/kafka/pull/14627#issuecomment-1783457546

   I will just watch the build now. Thanks @chb2ab 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1783444304

   @kirktrue @lucasbru @dajac - Thanks for taking the time to review this PR. I've 
addressed the recent comments.  Let me know if there's anything unclear.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


philipnee commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374998305


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   Sorry - I thought you meant the whole block was off. Corrected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: LeaveGroupResponse v0 - v2 loses its member under certain error conditions [kafka]

2023-10-27 Thread via GitHub


dajac commented on PR #14635:
URL: https://github.com/apache/kafka/pull/14635#issuecomment-1783402130

   @wolfchimneyrock Thanks again for the PR. We verified and your suggestion 
makes sense. I have two asks:
   1) Could you please file a bug and use the JIRA id in the PR?
   2) Would it be possible to add a small unit test for the fix?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]

2023-10-27 Thread via GitHub


chb2ab commented on PR #14627:
URL: https://github.com/apache/kafka/pull/14627#issuecomment-1783399735

   @jolshan np, I reverted that change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-27 Thread via GitHub


dajac commented on PR #14589:
URL: https://github.com/apache/kafka/pull/14589#issuecomment-1783398978

   @dongnuo123 One of the builds is still red. You may have to merge trunk to 
get the latest changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964660


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase();
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
the consumer should use.  We currently " +
+"support \"generic\" or \"consumer\". If \"consumer\" is specified, 
then the consumer group protocol will be " +
+"used.  Otherwise, the generic group protocol will be used.";
+
+/**
+* group.remote.assignor
+*/
+public static final String REMOTE_ASSIGNOR_CONFIG = 
"group.remote.assignor";
+public static final String DEFAULT_REMOTE_ASSIGNOR = null;
+public static final String REMOTE_ASSIGNOR_DOC = "The server-side assignor 
to use. If no assignor is specified, " +

Review Comment:
   +1
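   
   As context for these new configs, a hedged usage sketch (the assignor value is a 
placeholder, not a name this PR defines):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   
   public class GroupProtocolExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
           // Opt in to the new consumer group protocol; the default is "generic".
           props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
           // Optional server-side assignor; the default is null, letting the broker choose.
           props.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, "placeholder-assignor");
   
           try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
               // subscribe() / poll() as usual
           }
       }
   }
   ```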



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374964423


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,23 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase();
+public static final String GROUP_PROTOCOL_DOC = "The rebalance protocol 
consumer should use.  We currently " +

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1374963674


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -610,6 +628,18 @@ public class ConsumerConfig extends AbstractConfig {
 DEFAULT_ALLOW_AUTO_CREATE_TOPICS,
 Importance.MEDIUM,
 ALLOW_AUTO_CREATE_TOPICS_DOC)
+.define(GROUP_PROTOCOL_CONFIG,
+Type.STRING,
+DEFAULT_GROUP_PROTOCOL,
+ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(GroupProtocol.class)),
+Importance.HIGH,
+GROUP_PROTOCOL_DOC)
+.define(REMOTE_ASSIGNOR_CONFIG,
+Type.STRING,
+DEFAULT_REMOTE_ASSIGNOR,
+Importance.MEDIUM,
+REMOTE_ASSIGNOR_DOC)

Review Comment:
   @philipnee It would be better to update the new code to follow the existing 
style. This is what we usually do…
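
   Style question aside, the validator in this hunk restricts the value to 
the enum's names, case-insensitively. A minimal sketch of the same idea with 
the options spelled out, using only the ConfigDef API already shown in the 
diff above:

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class GroupProtocolValidatorSketch {
    static final ConfigDef DEF = new ConfigDef()
        .define("group.protocol",
                Type.STRING,
                "generic",
                // Accepts "generic" or "consumer" in any letter case.
                ConfigDef.CaseInsensitiveValidString.in("generic", "consumer"),
                Importance.HIGH,
                "The rebalance protocol the consumer should use.");
}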




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14657:
URL: https://github.com/apache/kafka/pull/14657#discussion_r1374949071


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1370,7 +1371,9 @@ public void scheduleUnloadOperation(
 tp, partitionEpoch, context.epoch
 );
 }
-});
+} else {
+log.info("No unload required since broker was not a 
coordinator for the given partition " + tp);

Review Comment:
   info level is too much here, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14657:
URL: https://github.com/apache/kafka/pull/14657#discussion_r1374948601


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1359,7 +1359,8 @@ public void scheduleUnloadOperation(
 log.info("Scheduling unloading of metadata for {} with epoch {}", tp, 
partitionEpoch);

Review Comment:
   We should perhaps rephrase this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15417 flip joinSpuriousLookBackTimeMs and emit non-joined items [kafka]

2023-10-27 Thread via GitHub


lihaosky commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1374947218


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -228,7 +231,7 @@ private void emitNonJoinedOuterRecords(
 sharedTimeTracker.minTime = timestamp;
 
 // Skip next records if window has not closed
-if (timestamp + joinAfterMs + joinGraceMs >= 
sharedTimeTracker.streamTime) {
+if (sharedTimeTracker.minTime + windowsAfterIntervalMs  + 
joinGraceMs >= sharedTimeTracker.streamTime) {

Review Comment:
   I'm not sure about this part. If the `value` has the right-side value and 
this is on the right side, why don't we check with `windows.beforeMs` to see 
if it expires? I'm not sure why `windowsAfterIntervalMs` is always set to 
`windows.afterMs`.
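
   To make the concern concrete, here is a standalone sketch of the 
window-close check under discussion. The names are stand-ins for the fields 
in KStreamKStreamJoin, not the actual implementation:

// Illustration only: whether a pending outer-join record can still find a
// match. A left record can be joined by right records arriving up to afterMs
// later; a right record by left records up to beforeMs later. Grace extends
// both windows.
final class WindowCloseSketch {
    static boolean windowStillOpen(long recordTimestamp,
                                   boolean isLeftSide,
                                   long beforeMs,
                                   long afterMs,
                                   long graceMs,
                                   long streamTime) {
        long waitMs = isLeftSide ? afterMs : beforeMs;
        return recordTimestamp + waitMs + graceMs >= streamTime;
    }
}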



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]

2023-10-27 Thread via GitHub


dajac commented on code in PR #14657:
URL: https://github.com/apache/kafka/pull/14657#discussion_r1374948010


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1359,7 +1359,8 @@ public void scheduleUnloadOperation(
 log.info("Scheduling unloading of metadata for {} with epoch {}", tp, 
partitionEpoch);
 
 scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + 
partitionEpoch + ")", tp, () -> {
-withContextOrThrow(tp, context -> {
+CoordinatorContext context = coordinators.get(tp);
+if (context != null) {
 if (context.epoch < partitionEpoch) {

Review Comment:
   You need to lock the context before you use it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]

2023-10-27 Thread via GitHub


rreddy-22 opened a new pull request, #14657:
URL: https://github.com/apache/kafka/pull/14657

   **Problem Statement:**
   When a new leader is elected for a __consumer_offsets partition, the 
followers are notified to unload the state. However, only the former leader 
actually holds that state. The remaining followers print out the following 
error:
   `ERROR [GroupCoordinator id=1] Execution of 
UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not 
the correct coordinator.. 
(org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)`
   The error is technically correct and expected for the remaining followers, 
but it can be misleading.
   
   **Solution:**
   1. When scheduleInternalOperation is called, the lambda that runs at event 
execution time now checks whether a context exists for the partition (see the 
sketch below).
   2. If the context doesn't exist, instead of throwing an exception as 
before, we skip the operation and log an info message.
   
   A test was added to cover this case; all other tests still pass as 
expected.
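
   A condensed sketch of the resulting pattern (names trimmed for 
illustration; the locking step follows the review note earlier in this 
thread, and the exact CoordinatorContext locking API is an assumption):

// Illustration only, not the actual CoordinatorRuntime code.
void unload(TopicPartition tp, int partitionEpoch) {
    CoordinatorContext context = coordinators.get(tp);
    if (context == null) {
        // This broker was never the coordinator for tp, so there is
        // nothing to unload and no reason to log an error.
        log.info("No unload required; not a coordinator for {}.", tp);
        return;
    }
    context.lock.lock();  // assumed per-context lock
    try {
        if (context.epoch < partitionEpoch) {
            // transition the coordinator state to CLOSED and remove
            // the context from the coordinators map
        }
    } finally {
        context.lock.unlock();
    }
}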
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]

2023-10-27 Thread via GitHub


dongnuo123 commented on code in PR #14656:
URL: https://github.com/apache/kafka/pull/14656#discussion_r1374903378


##
core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala:
##
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.JoinGroupRequest
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class LeaveGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testLeaveGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testLeaveGroup()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testLeaveGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+testLeaveGroup()
+  }
+
+  private def testLeaveGroup(): Unit = {
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- ApiKeys.LEAVE_GROUP.oldestVersion() to 
ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {

Review Comment:
   The tests with version < 3 won't pass until we merge 
https://github.com/apache/kafka/pull/14635



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-27 Thread via GitHub


jsancio commented on PR #14489:
URL: https://github.com/apache/kafka/pull/14489#issuecomment-1783315545

   @jolshan there are failing tests for this PR. Can you take a look when you 
have time?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]

2023-10-27 Thread via GitHub


dongnuo123 commented on code in PR #14656:
URL: https://github.com/apache/kafka/pull/14656#discussion_r1374900874


##
core/src/test/scala/unit/kafka/server/LeaveGroupRequestTest.scala:
##
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.JoinGroupRequest
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class LeaveGroupRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testLeaveGroupWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+testLeaveGroup()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testLeaveGroupWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+testLeaveGroup()
+  }
+
+  private def testLeaveGroup(): Unit = {
+
+// Creates the __consumer_offsets topics because it won't be created 
automatically
+// in this test because it does not use FindCoordinator API.
+createOffsetsTopic()
+
+// Create the topic.
+createTopic(
+  topic = "foo",
+  numPartitions = 3
+)
+
+for (version <- ApiKeys.LEAVE_GROUP.oldestVersion() to 
ApiKeys.LEAVE_GROUP.latestVersion(isUnstableApiEnabled)) {
+
+  // Join the consumer group. Note that we don't heartbeat here so we must 
use
+  // a session long enough for the duration of the test.
+  val (memberId1, _) = joinDynamicConsumerGroupWithOldProtocol("grp-1")
+  val (_, _) = joinStaticConsumerGroupWithOldProtocol("grp-2", 
"group-instance-id")

Review Comment:
   I still can't add a static member and a dynamic member to the same group. 
The cause shouldn't be the first member failing to rejoin the group, because 
the timeout already occurs while processing the second member's join request.
   
   Joining two consecutive static members, or two consecutive dynamic members, 
works. I wonder what's special about joining a static member and then a 
dynamic one, or a dynamic one and then a static one. @dajac 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15705: Add integration tests for Heartbeat API and GroupLeave API [kafka]

2023-10-27 Thread via GitHub


dongnuo123 opened a new pull request, #14656:
URL: https://github.com/apache/kafka/pull/14656

   This PR is based on https://github.com/apache/kafka/pull/14589, adding 
integration tests for the Heartbeat API and GroupLeave API.
   
   ### JIRA
   https://issues.apache.org/jira/browse/KAFKA-15705
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15661: KIP-951: protocol changes [kafka]

2023-10-27 Thread via GitHub


jolshan commented on PR #14627:
URL: https://github.com/apache/kafka/pull/14627#issuecomment-1783300697

   @chb2ab sorry for the confusion. I realized that marking the version 
unstable will probably cause issues if the IBP suggests using that version. 
Since the tagged fields are the only difference, let's remove the unstable 
version flag. Sorry for the back and forth. After that, I will check the tests 
again and hopefully this will be good to go.
   
   https://github.com/apache/kafka/pull/14627#discussion_r1373924152


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15574) Update states and transitions for membership manager state machine

2023-10-27 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-15574:
--

Assignee: Lianet Magrans  (was: Kirk True)

> Update states and transitions for membership manager state machine
> --
>
> Key: KAFKA-15574
> URL: https://issues.apache.org/jira/browse/KAFKA-15574
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Blocker
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> This task is to update the state machine so that it correctly acts as the 
> glue between the heartbeat request manager and the assignment reconciler.
> The state machine will transition from one state to another as a response to 
> heartbeats, callback completion, errors, unsubscribing, and other external 
> events. A given transition may kick off one or more actions that are 
> implemented outside of the state machine.
> Steps:
>  # Update the set of states in the code as [defined in the diagrams on the 
> wiki|https://cwiki.apache.org/confluence/display/KAFKA/Consumer+rebalance#Consumerrebalance-RebalanceStateMachine]
>  # Ensure the correct state transitions are performed as responses to 
> external input
>  # _Define_ any actions that should be taken as a result of the above 
> transitions, but defer the _implementation_ to separate Jiras/PRs as much as 
> possible



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15705) Add integration tests for Heartbeat API and GroupLeave API

2023-10-27 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-15705:
---

Assignee: Dongnuo Lyu

> Add integration tests for Heartbeat API and GroupLeave API
> --
>
> Key: KAFKA-15705
> URL: https://issues.apache.org/jira/browse/KAFKA-15705
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15705) Add integration tests for Heartbeat API and GroupLeave API

2023-10-27 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-15705:
---

 Summary: Add integration tests for Heartbeat API and GroupLeave API
 Key: KAFKA-15705
 URL: https://issues.apache.org/jira/browse/KAFKA-15705
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-27 Thread via GitHub


dajac merged PR #14524:
URL: https://github.com/apache/kafka/pull/14524


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15704) ControllerRegistrationRequest must set ZkMigrationReady field if appropriate

2023-10-27 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-15704:


 Summary: ControllerRegistrationRequest must set ZkMigrationReady 
field if appropriate
 Key: KAFKA-15704
 URL: https://issues.apache.org/jira/browse/KAFKA-15704
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Colin McCabe
 Fix For: 3.7.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR Set missing ZkMigrationReady field on ControllerRegistrationRequest [kafka]

2023-10-27 Thread via GitHub


cmccabe commented on code in PR #14654:
URL: https://github.com/apache/kafka/pull/14654#discussion_r1374859694


##
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##
@@ -66,7 +66,12 @@ object ZkMigrationIntegrationTest {
   }
 
   def zkClustersForAllMigrationVersions(clusterGenerator: ClusterGenerator): 
Unit = {
-Seq(MetadataVersion.IBP_3_4_IV0, MetadataVersion.IBP_3_5_IV2, 
MetadataVersion.IBP_3_6_IV2).foreach { mv =>
+Seq(
+  MetadataVersion.IBP_3_4_IV0,
+  MetadataVersion.IBP_3_5_IV2,
+  MetadataVersion.IBP_3_6_IV2,
+  MetadataVersion.latest()

Review Comment:
   was skipping `MetadataVersion.IBP_3_7_IV0` intentional here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-27 Thread via GitHub


cmccabe commented on PR #14290:
URL: https://github.com/apache/kafka/pull/14290#issuecomment-1783251459

   Thanks for the PR, @soarez! Do you have revisions ready for this one, or do 
you want one of us to jump in and fix these comments like we talked about 
earlier? I feel like it's really close now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-27 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1374844889


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {

Review Comment:
   That seems reasonable to me.
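
   Re-sketched as a self-contained Java toy, purely to illustrate the 
signature change above (RequestLocal here is a stand-in class, not 
kafka.server.RequestLocal):

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class WrapSketch {
    // Stand-in for kafka.server.RequestLocal.
    static class RequestLocal {
        final String owner;
        RequestLocal(String owner) { this.owner = owner; }
    }

    // The callback now also receives a RequestLocal, and the wrapper captures
    // the current handler thread's instance, so the callback can be invoked
    // without queueing and still use the correct thread's state.
    static <T> Consumer<T> wrap(BiConsumer<RequestLocal, T> fun, RequestLocal requestLocal) {
        return arg -> fun.accept(requestLocal, arg);
    }

    public static void main(String[] args) {
        RequestLocal handlerLocal = new RequestLocal("request-handler-1");
        Consumer<Integer> callback =
            wrap((local, value) -> System.out.println(local.owner + " handled " + value), handlerLocal);
        callback.accept(42);  // prints "request-handler-1 handled 42"
    }
}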



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-27 Thread via GitHub


cmccabe commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1374843847


##
clients/src/main/resources/common/message/AssignReplicasToDirsRequest.json:
##
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 73,
+  "type": "request",
+  "listeners": ["controller"],
+  "name": "AssignReplicasToDirsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
+  "about": "The ID of the requesting broker" },
+{ "name": "BrokerEpoch", "type": "int64", "versions": "0+", "default": 
"-1",
+  "about": "The epoch of the requesting broker" },
+{ "name": "Directories", "type":  "[]DirectoryData", "versions": "0+", 
"fields":  [
+  { "name":  "Id", "type": "uuid", "versions": "0+", "about": "The ID of 
the directory" },
+  { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
+{ "name":  "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",

Review Comment:
   This should be by topic ID, not name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-27 Thread via GitHub


mjsax commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1374842688


##
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java:
##
@@ -346,6 +346,23 @@ public static  Function 
getDeserializeValue(final StateSerdes deserializer.deserialize(serdes.topic(), 
byteArray);
 }
 
+@SuppressWarnings({"unchecked", "rawtypes"})
+public static  Function getDeserializeValue2(final 
StateSerdes serdes,
+   final 
StateStore wrapped,
+   final boolean 
isDSLStore ) {
+final Serde valueSerde = serdes.valueSerde();
+final boolean timestamped = WrappedStateStore.isTimestamped(wrapped) 
|| isDSLStore;
+final Deserializer deserializer;
+if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {

Review Comment:
   As discussed in person: for `RangeQuery`, 
`KeyValueToTimestampedKeyValueIteratorAdapter` needs to wrap the provided 
`RocksDBRangeIterator` to translate from the plain-byte[] format we receive 
from the inner store to the timestamped-byte[] format that the upper layers 
expect.
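
   For context, the translation described here, assuming the usual Streams 
timestamped layout of an 8-byte timestamp followed by the plain value bytes, 
could look roughly like this per record:

import java.nio.ByteBuffer;

// Sketch: convert a plain value from the inner store into the
// timestamped-byte[] layout the upper IQv2 layer expects.
public class TimestampedFormatSketch {
    static byte[] toTimestampedFormat(byte[] plainValue, long timestamp) {
        if (plainValue == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES + plainValue.length)
                .putLong(timestamp)   // 8-byte timestamp prefix
                .put(plainValue)
                .array();
    }
}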



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


