Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]

2023-10-30 Thread via GitHub


ijuma commented on code in PR #14671:
URL: https://github.com/apache/kafka/pull/14671#discussion_r1377053376


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -42,29 +42,27 @@ public class DirectoryId {
 public static final Uuid MIGRATING = new Uuid(0L, 2L);
 
 /**
- * The set of reserved UUIDs that will never be returned by the random method.
+ * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100,
+ * or if its string representation starts with a dash ("-").
  */
-public static final Set<Uuid> RESERVED;
-
-static {
-    HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-    // The first 100 UUIDs are reserved for future use.
-    for (long i = 0L; i < 100L; i++) {
-        reserved.add(new Uuid(0L, i));
-    }
-    RESERVED = Collections.unmodifiableSet(reserved);
+public boolean isReserved(Uuid uuid) {
+    return uuid.toString().startsWith("-") ||

Review Comment:
   It's wasteful to generate the string to verify this - isn't there a cheaper 
way to do it?
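   For illustration, a string-free check is possible because the first Base64 character of `Uuid#toString()` is taken from the top six bits of the most significant long, and `-` is index 62 in the URL-safe Base64 alphabet. A minimal sketch (not necessarily what the PR ended up doing; `startsWithDash` is a hypothetical helper name):
   
   ```java
   import org.apache.kafka.common.Uuid;
   
   final class DirectoryIdSketch {
       // True iff uuid.toString() would begin with '-': the first Base64 character
       // encodes the top six bits of the most significant long, and '-' is index 62
       // in the URL-safe alphabet.
       static boolean startsWithDash(Uuid uuid) {
           return (uuid.getMostSignificantBits() >>> 58) == 62;
       }
   
       // Reserved check without building any string.
       static boolean isReserved(Uuid uuid) {
           return startsWithDash(uuid)
               || (uuid.getMostSignificantBits() == 0 && uuid.getLeastSignificantBits() < 100);
       }
   }
   ```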



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-30 Thread via GitHub


CalvinConfluent commented on PR #14593:
URL: https://github.com/apache/kafka/pull/14593#issuecomment-1786438703

   Failing tests are irrelevant. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]

2023-10-30 Thread via GitHub


hudeqi commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1377023677


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig,
 
 // Third delete the logs and checkpoint.
 val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+// `tieredEnabledPartitions` already excludes internal topics
+val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled()))

Review Comment:
   The calculation of `tieredEnabledPartitions` cannot be placed after the if {} block, because `logManager.asyncDelete` removes the corresponding unifiedLog from `currentLogs` in `logManager`, which would make the calculation of `tieredEnabledPartitions` incorrect. I have, however, merged the two filter conditions.
   1. The calculation of `tieredEnabledPartitions` already excludes internal topics; see https://github.com/apache/kafka/blob/8f8ad6db384ce30e8a5d848d3ed826a3f7a54dfe/core/src/main/scala/kafka/log/UnifiedLog.scala#L1887
   2. Added.
   @divijvaidya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15686) Consumer should be able to detect network problem

2023-10-30 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781206#comment-17781206
 ] 

Philip Nee commented on KAFKA-15686:


[~ihavenoem...@163.com] - What is the use case of knowing which broker is down? If telemetry is set up for your Kafka cluster, you should be able to tell by looking at the broker-side metrics. It would be helpful to articulate a clear use case to help the community understand the rationale behind the ask. Thanks.

> Consumer should be able to detect network problem
> -
>
> Key: KAFKA-15686
> URL: https://issues.apache.org/jira/browse/KAFKA-15686
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Jiahongchao
>Priority: Minor
>
> When we call the poll method on a consumer, it returns normally even if some
> partitions do not have a leader.
> What should we do to detect such failures? Currently we have to check the logs
> to find out about broker connection problems.
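For context, a minimal sketch of how a client can already approximate such a check with the public metadata API (illustrative only, not something proposed in this ticket; the topic name and bootstrap address are assumptions):

{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LeaderlessPartitionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() fetches cluster metadata; PartitionInfo#leader() is
            // null (or an empty node) when the partition currently has no leader.
            List<PartitionInfo> partitions = consumer.partitionsFor("my-topic"); // assumed topic
            for (PartitionInfo p : partitions) {
                if (p.leader() == null || p.leader().isEmpty()) {
                    System.out.println("No leader for partition " + p.partition());
                }
            }
        }
    }
}
{code}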



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15686) Consumer should be able to detect network problem

2023-10-30 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781206#comment-17781206
 ] 

Philip Nee edited comment on KAFKA-15686 at 10/31/23 4:03 AM:
--

[~ihavenoem...@163.com] - What is the use case of knowing which broker is down from the client's perspective? If telemetry is set up for your Kafka cluster, you should be able to tell by looking at the broker-side metrics. It would be helpful to articulate a clear use case to help the community understand the rationale behind the ask. Thanks.


was (Author: JIRAUSER283568):
[~ihavenoem...@163.com] - What is the use case of knowing which broker is down? 
If telemetry is set up for your kafka cluster, you should be able to tell by 
looking at the broker side metrics. It would be helpful to articulate a clear 
case to help community to understand the rationale behind the ask. Thanks.

> Consumer should be able to detect network problem
> -
>
> Key: KAFKA-15686
> URL: https://issues.apache.org/jira/browse/KAFKA-15686
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Jiahongchao
>Priority: Minor
>
> When we call the poll method on a consumer, it returns normally even if some
> partitions do not have a leader.
> What should we do to detect such failures? Currently we have to check the logs
> to find out about broker connection problems.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]

2023-10-30 Thread via GitHub


hudeqi commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1377023677


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig,
 
 // Third delete the logs and checkpoint.
 val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+// `tieredEnabledPartitions` already excludes internal topics
+val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled()))

Review Comment:
   The calculation of `tieredEnabledPartitions` cannot be placed after the if {} block, because `logManager.asyncDelete` removes the corresponding unifiedLog from `currentLogs` in `logManager`, which would make the calculation of `tieredEnabledPartitions` incorrect.
   1. The calculation of `tieredEnabledPartitions` already excludes internal topics; see https://github.com/apache/kafka/blob/8f8ad6db384ce30e8a5d848d3ed826a3f7a54dfe/core/src/main/scala/kafka/log/UnifiedLog.scala#L1887
   2. Added.
   @divijvaidya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]

2023-10-30 Thread via GitHub


hudeqi commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1377023677


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig,
 
 // Third delete the logs and checkpoint.
 val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+// `tieredEnabledPartitions` already excludes internal topics
+val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled()))

Review Comment:
   The calculation of `tieredEnabledPartitions` cannot be placed after the if {} block, because `logManager.asyncDelete` removes the corresponding unifiedLog from `currentLogs` in `logManager`, which would make the calculation of `tieredEnabledPartitions` incorrect.
   1. The calculation of `tieredEnabledPartitions` already excludes internal topics; see [here](https://github.com/apache/kafka/blob/8f8ad6db384ce30e8a5d848d3ed826a3f7a54dfe/core/src/main/scala/kafka/log/UnifiedLog.scala#L1887)
   2. Added.
   @divijvaidya 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15432:RLM Stop partitions should not be invoked for non-tiered storage topics [kafka]

2023-10-30 Thread via GitHub


hudeqi commented on code in PR #14667:
URL: https://github.com/apache/kafka/pull/14667#discussion_r1377020873


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -630,13 +630,17 @@ class ReplicaManager(val config: KafkaConfig,
 
 // Third delete the logs and checkpoint.
 val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+// `tieredEnabledPartitions` already excludes internal topics
+val tieredEnabledPartitions = partitionsToStop.filter(sp => logManager.getLog(sp.topicPartition).exists(unifiedLog => unifiedLog.remoteLogEnabled()))
 if (partitionsToDelete.nonEmpty) {
   // Delete the logs and checkpoint.
   logManager.asyncDelete(partitionsToDelete, isStray = false, (tp, e) => errorMap.put(tp, e))
 }
 remoteLogManager.foreach { rlm =>
   // exclude the partitions with offline/error state

Review Comment:
   updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-10-30 Thread via GitHub


kirktrue commented on PR #14672:
URL: https://github.com/apache/kafka/pull/14672#issuecomment-1786396441

   @philipnee Can you add the `ctr` tag, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove ambiguous constructor [kafka]

2023-10-30 Thread via GitHub


philipnee commented on PR #14598:
URL: https://github.com/apache/kafka/pull/14598#issuecomment-1786392041

   Hi @kirktrue @cadonna - Thank you for taking the time to review the PR.  I 
don't have a very strong opinion about the constructor, but I do want other 
refactor changes so let me know if there are any questions/concerns on those!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-30 Thread via GitHub


philipnee commented on PR #14642:
URL: https://github.com/apache/kafka/pull/14642#issuecomment-1786388893

   Hi @dajac - Thank you for the comments.  I addressed them in the latest 
commits. Would you mind reviewing the changes again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14858: Handle exceptions thrown from Connector::taskConfigs in Connect's standalone mode [kafka]

2023-10-30 Thread via GitHub


github-actions[bot] commented on PR #13530:
URL: https://github.com/apache/kafka/pull/13530#issuecomment-1786384406

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-30 Thread via GitHub


philipnee commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1377011485


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Value object that contains the name and tags for a Metric.
+ */
+public class MetricKey implements MetricKeyable {
+
+private final String name;
+private final Map<String, String> tags;
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the telemetry metric name of 
the metric (the final name
+ * under which this metric is emitted).
+ */
+public MetricKey(String name) {
+this(name, null);
+}
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the .converted. name of the 
metric (the final name
+ * under which this metric is emitted).
+ * @param tags mapping of tag keys to values.
+ */
+public MetricKey(String name, Map<String, String> tags) {
+this.name = Objects.requireNonNull(name);
+this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap();
+}
+
+public MetricKey(MetricName metricName) {
+this(metricName.name(), metricName.tags());
+}
+
+@Override
+public MetricKey key() {
+return this;
+}
+
+public String getName() {

Review Comment:
   @mjsax - I mentioned this in our team meeting today. A clear style guide seems to be missing because different committers seem to do things differently. I do want to take a crack at it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15281: Implement the groupMetadata Consumer API [kafka]

2023-10-30 Thread via GitHub


kirktrue opened a new pull request, #14672:
URL: https://github.com/apache/kafka/pull/14672

   This implements the `Consumer.groupMetadata()` API by means of an event 
passed to and fulfilled in the consumer network I/O thread. The application 
thread will block  until this event is processed in the background thread.
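   As a rough sketch of the pattern described above (class and method names here are illustrative, not the actual event classes in the PR): the application thread enqueues an event that carries a future and blocks on it, and the background network I/O thread completes the future when it processes the event.
   
   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.LinkedBlockingQueue;
   
   final class GroupMetadataEvent {
       // Completed by the background thread; the value type is a placeholder here,
       // the real API returns ConsumerGroupMetadata.
       final CompletableFuture<String> future = new CompletableFuture<>();
   }
   
   final class GroupMetadataBridge {
       private final BlockingQueue<GroupMetadataEvent> queue = new LinkedBlockingQueue<>();
   
       // Application thread: enqueue the event and block until it is fulfilled.
       String groupMetadata() throws Exception {
           GroupMetadataEvent event = new GroupMetadataEvent();
           queue.put(event);
           return event.future.get();
       }
   
       // Background (network I/O) thread: take one event and complete its future.
       void processOne() throws InterruptedException {
           GroupMetadataEvent event = queue.take();
           event.future.complete("group-metadata-placeholder");
       }
   }
   ```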


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14671:
URL: https://github.com/apache/kafka/pull/14671#discussion_r1376940646


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -42,29 +42,27 @@ public class DirectoryId {
 public static final Uuid MIGRATING = new Uuid(0L, 2L);
 
 /**
- * The set of reserved UUIDs that will never be returned by the random method.
+ * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100,
+ * or if its string representation starts with a dash ("-").
  */
-public static final Set<Uuid> RESERVED;
-
-static {
-    HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-    // The first 100 UUIDs are reserved for future use.
-    for (long i = 0L; i < 100L; i++) {
-        reserved.add(new Uuid(0L, i));
-    }
-    RESERVED = Collections.unmodifiableSet(reserved);
+public boolean isReserved(Uuid uuid) {
+    return uuid.toString().startsWith("-") ||
+        (uuid.getMostSignificantBits() == 0 &&
+            uuid.getLeastSignificantBits() < 100);
 }
 
 /**
  * Static factory to generate a directory ID.
  *
- * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
+ * This will not generate a reserved UUID (first 100), or one whose string representation
+ * starts with a dash ("-")
  */
 public static Uuid random() {
-    Uuid uuid = Uuid.randomUuid();
-    while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
-        uuid = Uuid.randomUuid();
+    while (true) {
+        Uuid uuid = Uuid.randomUuid();

Review Comment:
   Doesn't the Uuid class also ensure this? 
   
   ```
   public static Uuid randomUuid() {
       Uuid uuid = unsafeRandomUuid();
       while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
           uuid = unsafeRandomUuid();
       }
       return uuid;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14671:
URL: https://github.com/apache/kafka/pull/14671#discussion_r1376940840


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -42,29 +42,27 @@ public class DirectoryId {
 public static final Uuid MIGRATING = new Uuid(0L, 2L);
 
 /**
- * The set of reserved UUIDs that will never be returned by the random method.
+ * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100,
+ * or if its string representation starts with a dash ("-").
  */
-public static final Set<Uuid> RESERVED;
-
-static {
-    HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-    // The first 100 UUIDs are reserved for future use.
-    for (long i = 0L; i < 100L; i++) {
-        reserved.add(new Uuid(0L, i));
-    }
-    RESERVED = Collections.unmodifiableSet(reserved);
+public boolean isReserved(Uuid uuid) {
+    return uuid.toString().startsWith("-") ||
+        (uuid.getMostSignificantBits() == 0 &&
+            uuid.getLeastSignificantBits() < 100);
 }
 
 /**
  * Static factory to generate a directory ID.
  *
- * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
+ * This will not generate a reserved UUID (first 100), or one whose string representation
+ * starts with a dash ("-")
  */
 public static Uuid random() {
-    Uuid uuid = Uuid.randomUuid();
-    while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
-        uuid = Uuid.randomUuid();
+    while (true) {
+        Uuid uuid = Uuid.randomUuid();
+        if (isReserved(uuid)) {

Review Comment:
   shouldn't this be not reserved?  



##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -42,29 +42,27 @@ public class DirectoryId {
 public static final Uuid MIGRATING = new Uuid(0L, 2L);
 
 /**
- * The set of reserved UUIDs that will never be returned by the random method.
+ * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100,
+ * or if its string representation starts with a dash ("-").
  */
-public static final Set<Uuid> RESERVED;
-
-static {
-    HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-    // The first 100 UUIDs are reserved for future use.
-    for (long i = 0L; i < 100L; i++) {
-        reserved.add(new Uuid(0L, i));
-    }
-    RESERVED = Collections.unmodifiableSet(reserved);
+public boolean isReserved(Uuid uuid) {
+    return uuid.toString().startsWith("-") ||
+        (uuid.getMostSignificantBits() == 0 &&
+            uuid.getLeastSignificantBits() < 100);
 }
 
 /**
  * Static factory to generate a directory ID.
  *
- * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
+ * This will not generate a reserved UUID (first 100), or one whose string representation
+ * starts with a dash ("-")
  */
 public static Uuid random() {
-    Uuid uuid = Uuid.randomUuid();
-    while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
-        uuid = Uuid.randomUuid();
+    while (true) {
+        Uuid uuid = Uuid.randomUuid();
+        if (isReserved(uuid)) {

Review Comment:
   shouldn't this be not reserved?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: small optimization for DirectoryId.random [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14671:
URL: https://github.com/apache/kafka/pull/14671#discussion_r1376940646


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -42,29 +42,27 @@ public class DirectoryId {
 public static final Uuid MIGRATING = new Uuid(0L, 2L);
 
 /**
- * The set of reserved UUIDs that will never be returned by the random method.
+ * @return true if the given ID is reserved. An ID is reserved if it is one of the first 100,
+ * or if its string representation starts with a dash ("-").
  */
-public static final Set<Uuid> RESERVED;
-
-static {
-    HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-    // The first 100 UUIDs are reserved for future use.
-    for (long i = 0L; i < 100L; i++) {
-        reserved.add(new Uuid(0L, i));
-    }
-    RESERVED = Collections.unmodifiableSet(reserved);
+public boolean isReserved(Uuid uuid) {
+    return uuid.toString().startsWith("-") ||
+        (uuid.getMostSignificantBits() == 0 &&
+            uuid.getLeastSignificantBits() < 100);
 }
 
 /**
  * Static factory to generate a directory ID.
  *
- * This will not generate a reserved UUID (first 100), or one whose string representation starts with a dash ("-")
+ * This will not generate a reserved UUID (first 100), or one whose string representation
+ * starts with a dash ("-")
  */
 public static Uuid random() {
-    Uuid uuid = Uuid.randomUuid();
-    while (RESERVED.contains(uuid) || uuid.toString().startsWith("-")) {
-        uuid = Uuid.randomUuid();
+    while (true) {
+        Uuid uuid = Uuid.randomUuid();

Review Comment:
   Doesn't the Uuid class also ensure this? 
   
   ```
   public static Uuid randomUuid() {
       Uuid uuid = unsafeRandomUuid();
       while (uuid.equals(METADATA_TOPIC_ID) || uuid.equals(ZERO_UUID) || uuid.toString().startsWith("-")) {
           uuid = unsafeRandomUuid();
       }
       return uuid;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15281) Implement the groupMetadata Consumer API

2023-10-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15281:
-

Assignee: Kirk True

> Implement the groupMetadata Consumer API
> 
>
> Key: KAFKA-15281
> URL: https://issues.apache.org/jira/browse/KAFKA-15281
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-preview
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The threading refactor project needs to implement the {{groupMetadata()}} API 
> call once support for the KIP-848 protocol is implemented.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-30 Thread via GitHub


kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1376925327


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -321,20 +320,6 @@ public class ConsumerConfig extends AbstractConfig {
  */
 static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
 
-/**
- * internal.throw.on.fetch.stable.offset.unsupported
- * Whether or not the consumer should throw when the new stable offset feature is supported.
- * If set to true then the client shall crash upon hitting it.
- * The purpose of this flag is to prevent unexpected broker downgrade which makes
- * the offset fetch protection against pending commit invalid. The safest approach
- * is to fail fast to avoid introducing correctness issue.
- *
- *
- * Note: this is an internal configuration and could be changed in the future in a backward incompatible way
- *
- */
-static final String THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED = "internal.throw.on.fetch.stable.offset.unsupported";
-

Review Comment:
   I moved this to `ConsumerUtils` because `LegacyKafkaConsumer` and 
`AsyncKafkaConsumer` couldn't access it via package-level visibility since 
they're in the `internals` sub-package.
   
   @dajac—will this be considered a breaking change? I assumed not because 
`THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED` isn't public.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -250,7 +255,7 @@ public PrototypeAsyncConsumer(final Time time,
 // no coordinator will be constructed for the default (null) group id
 if (!groupId.isPresent()) {
 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-//config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
+config.ignore(ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);

Review Comment:
   Moving the package-level variable and method from `ConsumerConfig` also 
fixes this issue.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -982,23 +987,6 @@ private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
 }
 }
 
-// This is here temporary as we don't have public access to the ConsumerConfig in this module.
-public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
-                                                             Deserializer<?> keyDeserializer,
-                                                             Deserializer<?> valueDeserializer) {
-// validate deserializer configuration, if the passed deserializer instance is null, the user must explicitly set a valid deserializer configuration value
-Map<String, Object> newConfigs = new HashMap<>(configs);
-if (keyDeserializer != null)
-newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-else if (newConfigs.get(KEY_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(KEY_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-if (valueDeserializer != null)
-newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-else if (newConfigs.get(VALUE_DESERIALIZER_CLASS_CONFIG) == null)
-throw new ConfigException(VALUE_DESERIALIZER_CLASS_CONFIG, null, "must be non-null.");
-return newConfigs;
-}
-

Review Comment:
   The original version of this method as it appeared in `ConsumerConfig` was 
moved to `ConsumerUtils` so now it can be used from here too  



##
tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java:
##
@@ -430,8 +430,9 @@ public void testConsume(final long prodTimeMs) throws Throwable {
 () -> log.info("offsetsForTime = {}", offsetsForTime.result));
 // Whether or not offsetsForTimes works, beginningOffsets and endOffsets
 // should work.
-consumer.beginningOffsets(timestampsToSearch.keySet());
-consumer.endOffsets(timestampsToSearch.keySet());
+Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(timestampsToSearch.keySet());
+Map<TopicPartition, Long> endingOffsets = consumer.endOffsets(timestampsToSearch.keySet());
+log.trace("beginningOffsets: {}, endingOffsets: {}", beginningOffsets, endingOffsets);

Review Comment:
   This is super annoying.
   
   I started getting errors from SpotBugs because the offsets methods were 
called but the return value was being ignored. This is a very brute force way 
of silencing the checker. I could not find a clean way to ignore the warning.
   
   I also don't know why this is suddenly being caught by SpotBugs. From its 
perspective, nothing has changed in the `KafkaConsumer` API, right?




Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-30 Thread via GitHub


mjsax commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1376921687


##
clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.telemetry.internals;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Value object that contains the name and tags for a Metric.
+ */
+public class MetricKey implements MetricKeyable {
+
+private final String name;
+private final Map<String, String> tags;
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the telemetry metric name of 
the metric (the final name
+ * under which this metric is emitted).
+ */
+public MetricKey(String name) {
+this(name, null);
+}
+
+/**
+ * Create a {@code MetricKey}
+ *
+ * @param name metric name. This should be the .converted. name of the 
metric (the final name
+ * under which this metric is emitted).
+ * @param tags mapping of tag keys to values.
+ */
+public MetricKey(String name, Map<String, String> tags) {
+this.name = Objects.requireNonNull(name);
+this.tags = tags != null ? Collections.unmodifiableMap(tags) : Collections.emptyMap();
+}
+
+public MetricKey(MetricName metricName) {
+this(metricName.name(), metricName.tags());
+}
+
+@Override
+public MetricKey key() {
+return this;
+}
+
+public String getName() {

Review Comment:
   There is no checkstyle rule that would fail the build -- it's basically up to everybody (especially committers) to keep an eye out for it.
   
   The guidelines (https://kafka.apache.org/coding-guide) are unfortunately not super clear about it (too Scala-centric), but I recently double-checked with a few other committers and there was agreement that not using `get` is the rule.
   
   > Avoid getters and setters - stick to plain vals or vars instead. If (later 
on) you require a custom setter (or getter) for a var named myVar then add a 
shadow var myVar_underlying and override the setter (def myVar_=) and the 
getter (def myVar = myVar_underlying).
   
   Wanna help with a PR to make it more explicit on the guidelines docs?
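   For the Java side, the agreed convention looks roughly like this (illustrative snippet, not taken from the PR):
   
   ```java
   public class MetricKeyExample {
       private final String name;
   
       public MetricKeyExample(String name) {
           this.name = name;
       }
   
       // Preferred Kafka style: a plain accessor named after the field.
       public String name() {
           return name;
       }
   
       // Discouraged for new code: a JavaBeans-style getter.
       // public String getName() { return name; }
   }
   ```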



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: small optimization for DirectoryId.random [kafka]

2023-10-30 Thread via GitHub


cmccabe opened a new pull request, #14671:
URL: https://github.com/apache/kafka/pull/14671

   DirectoryId.random doesn't need to instantiate the first 100 IDs to check if 
an ID is one of them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-30 Thread via GitHub


junrao merged PR #14603:
URL: https://github.com/apache/kafka/pull/14603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781155#comment-17781155
 ] 

Colin McCabe commented on KAFKA-15754:
--

{quote}
I was wondering if there is any good reason for using a Base64 URL encoder and 
not just the RFC4648 (not URL safe) which uses the common Base64 alphabet not 
containing the "-".
{quote}

At one point, I did raise the question of why dash was used to serialize Kafka 
Uuids. But by the time I did so we were already using it in a few places so the 
question was not relevant. We're not going to change Uuid serialization now.

I think the general rationale was that dash and underscore were friendlier than 
slash and plus sign. But that's debatable, of course. Slash, at least, is not 
filesystem-safe.
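For reference, a small illustration of the difference between the two encoders being discussed (arbitrary example bytes, not Kafka code):

{code:java}
import java.util.Base64;

public class Base64AlphabetDemo {
    public static void main(String[] args) {
        // 0xFB starts with the six bits 111110 = 62, i.e. the 63rd alphabet entry.
        byte[] bytes = new byte[]{(byte) 0xFB, 0x00, 0x00};

        // URL-safe alphabet (what Uuid#toString() uses): index 62 is '-', 63 is '_'.
        System.out.println(Base64.getUrlEncoder().withoutPadding().encodeToString(bytes)); // -wAA
        // Standard RFC 4648 alphabet: index 62 is '+', 63 is '/'.
        System.out.println(Base64.getEncoder().withoutPadding().encodeToString(bytes));    // +wAA
    }
}
{code}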

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when the toString is called on the Uuid instance, it's 
> going to do a Base64 encoding on the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code is using an URL (safe) encoder which, taking a 
> look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which as you can see includes the "-" character.
> So despite the current Uuid.randomUuid is avoiding the generation of a UUID 
> containing a dash, the Base64 encoding operation can return a final UUID 
> starting with the dash instead.
>  
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet 
> not containing the "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781152#comment-17781152
 ] 

Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:21 PM:
-

You can run this code yourself if you are curious. Here it is. You will need 
bash 4 or better. (my version is {{GNU bash, version 5.2.15(1)-release 
(aarch64-apple-darwin21.6.0)}})

{code}
#!/usr/bin/env bash

declare -A IDS_PER_INITIAL_LETTER
for ((i = 0; i < 10000; i++)); do
    ./kafka-storage.sh random-uuid > /tmp/out 2> /dev/null
    FIRST_LETTER=$(head -c 1 /tmp/out)
    IDS_PER_INITIAL_LETTER[$FIRST_LETTER]=$((IDS_PER_INITIAL_LETTER[$FIRST_LETTER]+1))
done

for k in "${!IDS_PER_INITIAL_LETTER[@]}"; do
    echo "IDs starting with $k : ${IDS_PER_INITIAL_LETTER[$k]}"
done
{code}


was (Author: cmccabe):
You can run this code yourself if you are curious. Here it is. You will need 
bash 4 or better. (my version is `GNU bash, version 5.2.15(1)-release 
(aarch64-apple-darwin21.6.0)`)

{code}
#!/usr/bin/env bash

declare -A IDS_PER_INITIAL_LETTER
for ((i = 0; i < 10000; i++)); do
    ./kafka-storage.sh random-uuid > /tmp/out 2> /dev/null
    FIRST_LETTER=$(head -c 1 /tmp/out)
    IDS_PER_INITIAL_LETTER[$FIRST_LETTER]=$((IDS_PER_INITIAL_LETTER[$FIRST_LETTER]+1))
done

for k in "${!IDS_PER_INITIAL_LETTER[@]}"; do
    echo "IDs starting with $k : ${IDS_PER_INITIAL_LETTER[$k]}"
done
{code}

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when the toString is called on the Uuid instance, it's 
> going to do a Base64 encoding on the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code is using an URL (safe) encoder which, taking a 
> look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which as you can see includes the "-" character.
> So despite the current Uuid.randomUuid is avoiding the generation of a UUID 
> containing a dash, the Base64 encoding operation can return a final UUID 
> starting with the dash instead.
>  
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet 
> not containing the "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781153#comment-17781153
 ] 

Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:20 PM:
-

I am closing this JIRA because {{kafka-storage.sh random-uuid}} can not, in 
fact, generate uuids starting with {{-}}.

You can see this via analysis of the code or by just running it as I did


was (Author: cmccabe):
I am closing this JIRA because {{kafka-storage.sh random-uuid}} can not, in 
fact, generate uuids starting with '-'

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when the toString is called on the Uuid instance, it's 
> going to do a Base64 encoding on the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code is using an URL (safe) encoder which, taking a 
> look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which as you can see includes the "-" character.
> So despite the current Uuid.randomUuid is avoiding the generation of a UUID 
> containing a dash, the Base64 encoding operation can return a final UUID 
> starting with the dash instead.
>  
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet 
> not containing the "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781153#comment-17781153
 ] 

Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:20 PM:
-

I am closing this JIRA because {{kafka-storage.sh random-uuid}} can not, in 
fact, generate uuids starting with '-'


was (Author: cmccabe):
I am closing this JIRA because `kafka-storage.sh` can not, in fact, generate 
uuids starting with '-'

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when the toString is called on the Uuid instance, it's 
> going to do a Base64 encoding on the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code is using an URL (safe) encoder which, taking a 
> look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which as you can see includes the "-" character.
> So despite the current Uuid.randomUuid is avoiding the generation of a UUID 
> containing a dash, the Base64 encoding operation can return a final UUID 
> starting with the dash instead.
>  
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet 
> not containing the "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781151#comment-17781151
 ] 

Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:20 PM:
-

I ran {{kafka-storage.sh random-uuid}} 10,000 times and got the following 
distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}

No IDs were generated with a first character of {{-}}, as expected. 


was (Author: cmccabe):
I ran {kafka-storage.sh random-uuid} 10,000 times and got the following 
distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}

No IDs were generated with a first character of {-}, as expected. 

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> 

[jira] [Resolved] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-15754.
--
Resolution: Invalid

kafka-storage tool can not, in fact, generate uuids starting with '-'

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such an UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when the toString is called on the Uuid instance, it's 
> going to do a Base64 encoding on the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code is using an URL (safe) encoder which, taking a 
> look at the Base64 class in Java, is using a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which as you can see includes the "-" character.
> So despite the current Uuid.randomUuid is avoiding the generation of a UUID 
> containing a dash, the Base64 encoding operation can return a final UUID 
> starting with the dash instead.
>  
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet 
> not containing the "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781153#comment-17781153
 ] 

Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:19 PM:
-

I am closing this JIRA because `kafka-storage.sh` can not, in fact, generate 
uuids starting with '-'


was (Author: cmccabe):
kafka-storage tool can not, in fact, generate uuids starting with '-'

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash ("-"), which then breaks how the argparse4j library works. 
> With such a UUID (i.e. -rmdB0m4T4--Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> That said, it seems this problem was already addressed in the 
> Uuid.randomUuid method, which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it: 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when toString is called on the Uuid instance, it 
> performs a Base64 encoding of the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code uses a URL-safe encoder which, taking a 
> look at the Base64 class in Java, is a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which, as you can see, includes the "-" character.
> So even though the current Uuid.randomUuid avoids generating a UUID 
> containing a dash, the Base64 encoding operation can still return a final UUID 
> starting with a dash.
>
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the plain RFC4648 (non URL-safe) encoder, which uses the common 
> Base64 alphabet and does not contain "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781151#comment-17781151
 ] 

Colin McCabe edited comment on KAFKA-15754 at 10/30/23 11:19 PM:
-

I ran {kafka-storage.sh random-uuid} 10,000 times and got the following 
distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}

No IDs were generated with a first character of {-}, as expected. 


was (Author: cmccabe):
I ran `kafka-storage.sh random-uuid` 10,000 times and got the following 
distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}

No IDs were generated with a first character of `-`, as expected. 

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash ("-"), which then breaks how the argparse4j library works. 
> With such a UUID (i.e. -rmdB0m4T4--Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> That said, it seems this problem was already addressed in the 
> Uuid.randomUuid method, which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> 

[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781152#comment-17781152
 ] 

Colin McCabe commented on KAFKA-15754:
--

You can run this code yourself if you are curious. Here it is. You will need 
bash 4 or better. (my version is `GNU bash, version 5.2.15(1)-release 
(aarch64-apple-darwin21.6.0)`)

{code}
#!/usr/bin/env bash

declare -A IDS_PER_INITIAL_LETTER
for ((i = 0; i < 10000 ; i++)); do
    ./kafka-storage.sh random-uuid > /tmp/out 2> /dev/null
    FIRST_LETTER=$(head -c 1 /tmp/out)
    IDS_PER_INITIAL_LETTER[$FIRST_LETTER]=$((IDS_PER_INITIAL_LETTER[$FIRST_LETTER]+1))
done

for k in "${!IDS_PER_INITIAL_LETTER[@]}"; do
    echo "IDs starting with $k : ${IDS_PER_INITIAL_LETTER[$k]}"
done
{code}

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash ("-"), which then breaks how the argparse4j library works. 
> With such a UUID (i.e. -rmdB0m4T4--Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> That said, it seems this problem was already addressed in the 
> Uuid.randomUuid method, which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it: 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when toString is called on the Uuid instance, it 
> performs a Base64 encoding of the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code uses a URL-safe encoder which, taking a 
> look at the Base64 class in Java, is a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which, as you can see, includes the "-" character.
> So even though the current Uuid.randomUuid avoids generating a UUID 
> containing a dash, the Base64 encoding operation can still return a final UUID 
> starting with a dash.
>
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the plain RFC4648 (non URL-safe) encoder, which uses the common 
> Base64 alphabet and does not contain "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-30 Thread via GitHub


CalvinConfluent commented on PR #14603:
URL: https://github.com/apache/kafka/pull/14603#issuecomment-1786194305

   https://issues.apache.org/jira/browse/KAFKA-15759 DescribeClusterRequestTest
   https://issues.apache.org/jira/browse/KAFKA-15760 CoordinatorTest
   https://issues.apache.org/jira/browse/KAFKA-15761 
ConnectorRestartApiIntegrationTest
   https://issues.apache.org/jira/browse/KAFKA-15762 ClusterConnectionStatesTest
   Added a few more common failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15762) ClusterConnectionStatesTest.testSingleIP is flaky

2023-10-30 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15762:
--

 Summary: ClusterConnectionStatesTest.testSingleIP is flaky
 Key: KAFKA-15762
 URL: https://issues.apache.org/jira/browse/KAFKA-15762
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Calvin Liu


Build / JDK 11 and Scala 2.13 / testSingleIP() – 
org.apache.kafka.clients.ClusterConnectionStatesTest
{code:java}
org.opentest4j.AssertionFailedError: expected: <1> but was: <2> at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)  at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)  at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)  at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:527)  at 
app//org.apache.kafka.clients.ClusterConnectionStatesTest.testSingleIP(ClusterConnectionStatesTest.java:267)
 at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
 Method) at 
java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   at 
java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) 
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781151#comment-17781151
 ] 

Colin McCabe commented on KAFKA-15754:
--

I ran `kafka-storage.sh random-uuid` 10,000 times and got the following 
distribution of first characters:
{code}
IDs starting with 0 : 166
IDs starting with 1 : 174
IDs starting with 2 : 135
IDs starting with 3 : 172
IDs starting with 4 : 155
IDs starting with 5 : 154
IDs starting with 6 : 152
IDs starting with 7 : 172
IDs starting with 8 : 170
IDs starting with 9 : 166
IDs starting with A : 147
IDs starting with B : 161
IDs starting with C : 172
IDs starting with D : 158
IDs starting with E : 164
IDs starting with F : 164
IDs starting with G : 146
IDs starting with H : 156
IDs starting with I : 166
IDs starting with J : 172
IDs starting with K : 177
IDs starting with L : 143
IDs starting with M : 171
IDs starting with N : 144
IDs starting with O : 157
IDs starting with P : 162
IDs starting with Q : 144
IDs starting with R : 157
IDs starting with S : 161
IDs starting with T : 158
IDs starting with U : 174
IDs starting with V : 166
IDs starting with W : 166
IDs starting with X : 159
IDs starting with Y : 165
IDs starting with Z : 161
IDs starting with _ : 159
IDs starting with a : 145
IDs starting with b : 169
IDs starting with c : 166
IDs starting with d : 171
IDs starting with e : 162
IDs starting with f : 154
IDs starting with g : 132
IDs starting with h : 152
IDs starting with i : 136
IDs starting with j : 166
IDs starting with k : 159
IDs starting with l : 156
IDs starting with m : 154
IDs starting with n : 155
IDs starting with o : 154
IDs starting with p : 158
IDs starting with q : 141
IDs starting with r : 165
IDs starting with s : 154
IDs starting with t : 162
IDs starting with u : 146
IDs starting with v : 161
IDs starting with w : 164
IDs starting with x : 154
IDs starting with y : 164
IDs starting with z : 154
{code}

No IDs were generated with a first character of `-`, as expected. 

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash ("-"), which then breaks how the argparse4j library works. 
> With such a UUID (i.e. -rmdB0m4T4--Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> That said, it seems this problem was already addressed in the 
> Uuid.randomUuid method, which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it: 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when toString is called on the Uuid instance, it 
> performs a Base64 encoding of the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code uses a URL-safe encoder which, taking a 
> look at the Base64 class in Java, is a RFC4648_URLSAFE encoder using 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which, as you can see, includes the "-" character.
> So even though the current Uuid.randomUuid avoids generating a UUID 
> containing a dash, the Base64 encoding operation can still return a final UUID 
> starting with a dash.
>
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the plain RFC4648 (non URL-safe) encoder, which uses the common 
> Base64 alphabet and does not contain "-".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15761) ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky

2023-10-30 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15761:
--

 Summary: 
ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector is flaky
 Key: KAFKA-15761
 URL: https://issues.apache.org/jira/browse/KAFKA-15761
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Calvin Liu


Build / JDK 21 and Scala 2.13 / testMultiWorkerRestartOnlyConnector – 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest
{code:java}
java.lang.AssertionError: Failed to stop connector and tasks within 120000ms
at org.junit.Assert.fail(Assert.java:89)at 
org.junit.Assert.assertTrue(Assert.java:42)  at 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.runningConnectorAndTasksRestart(ConnectorRestartApiIntegrationTest.java:273)
 at 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector(ConnectorRestartApiIntegrationTest.java:231)
 at 
java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)   at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
  at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
   at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)   
 at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)  at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)  at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)  at 
org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
   at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15760) org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated is flaky

2023-10-30 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15760:
--

 Summary: 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated
 is flaky
 Key: KAFKA-15760
 URL: https://issues.apache.org/jira/browse/KAFKA-15760
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Calvin Liu


Build / JDK 17 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – 
org.apache.kafka.trogdor.coordinator.CoordinatorTest
{code:java}
java.util.concurrent.TimeoutException: 
testTaskRequestWithOldStartMsGetsUpdated() timed out after 120000 milliseconds  
 at 
org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
   at 
org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
  at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
   at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
  at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
   at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
   at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15759) DescribeClusterRequestTest is flaky

2023-10-30 Thread Calvin Liu (Jira)
Calvin Liu created KAFKA-15759:
--

 Summary: DescribeClusterRequestTest is flaky
 Key: KAFKA-15759
 URL: https://issues.apache.org/jira/browse/KAFKA-15759
 Project: Kafka
  Issue Type: Bug
  Components: unit tests
Reporter: Calvin Liu


testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
 – kafka.server.DescribeClusterRequestTest
{code:java}
org.opentest4j.AssertionFailedError: expected: 
 but was:  at 
org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)   at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)   at 
org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)   at 
org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)  at 
kafka.server.DescribeClusterRequestTest.$anonfun$testDescribeClusterRequest$4(DescribeClusterRequestTest.scala:99)
   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158) at 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequest(DescribeClusterRequestTest.scala:86)
  at 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(DescribeClusterRequestTest.scala:53)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-30 Thread via GitHub


apoorvmittal10 commented on PR #14621:
URL: https://github.com/apache/kafka/pull/14621#issuecomment-1786178001

   > > @apoorvmittal10 : Yes, we are gradually moving the codebase to java. 
Ideally, all new classes should be written in java.
   > 
   > Thanks @junrao, I'll update the PR by tomorrow with new classes in Java.
   
   @junrao @hachikuji I have moved the new classes to Java. Can I please get a 
review on the PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-30 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1376884298


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val clientSoftwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val clientSoftwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), clientSoftwareName, 
clientSoftwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress) // TODO: Fix Port
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, clientSoftwareName: 
String,
+clientSoftwareVersion: String, clientSourceAddress: String, 
clientSourcePort: String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, clientSoftwareName, 
clientSoftwareVersion, clientSourceAddress, clientSourcePort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case _: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()

Review Comment:
   Moved to java implementation
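
   Since the comment above says the class was moved to a Java implementation, here is a rough, self-contained Java sketch of the pattern-parsing logic for orientation (the class name, exception type, and the omission of the check that the key is one of the supported match parameters are assumptions, not the final code):

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.regex.Pattern;
   import java.util.regex.PatternSyntaxException;

   public final class ClientMatchPatterns {

       // Splits entries such as "client_software_version=11.1.*" on the first '='
       // and keeps (name, regex) pairs; malformed entries are rejected.
       public static Map<String, String> parse(List<String> patterns) {
           Map<String, String> result = new HashMap<>();
           if (patterns == null) {
               return result;
           }
           for (String pattern : patterns) {
               String[] nameValue = pattern.split("=", 2);
               if (nameValue.length != 2 || !isValidRegex(nameValue[1].trim())) {
                   throw new IllegalArgumentException("Illegal client matching pattern: " + pattern);
               }
               result.put(nameValue[0].trim(), nameValue[1].trim());
           }
           return result;
       }

       private static boolean isValidRegex(String input) {
           try {
               Pattern.compile(input);
               return true;
           } catch (PatternSyntaxException e) {
               return false;
           }
       }
   }
   ```

   For example, parse(List.of("client_software_name = Java", "client_software_version = 11.1.*")) would yield a two-entry map keyed by the parameter names.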



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-30 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1376884030


##
core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * 
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * 
+ * {
+ * 
+ *  subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *  subscribedMetrics: List of metric prefixes
+ *  pushIntervalMs: A positive integer value >=0 that tells the client 
how often it can push the metrics
+ *  matchingPatternsList: List of client matching patterns, that are used 
by broker to match the client instance
+ * with the subscription.
+ * 
+ * }
+ * 
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * 
+ *  "metrics" value should be comma separated metrics list. A prefix match 
on the requested metrics
+ *  is performed in clients to determine subscribed metrics. An empty list 
means no metrics subscribed.
+ *  A list containing just an empty string means all metrics subscribed.
+ *  Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *
+ *  "interval.ms" should be between 100 and 360 (1 hour). This is the 
interval at which the client
+ *  should push the metrics to the broker.
+ *
+ *  "match" is a comma separated list of client match patterns, in case if 
there is no matching
+ *  pattern specified then broker considers that as all match which means 
the associated metrics
+ *  applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ *  which means all Java clients with any sub versions of 11.1 will be 
matched i.e. 11.1.1, 11.1.2 etc.
+ *
+ * 
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,

Review Comment:
   Moved to java implementation
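
   As a concrete illustration of the three CLI-facing properties described in the doc comment above, a subscription could be expressed roughly as follows (the property names follow the doc comment; how the properties are submitted to the broker is out of scope here, and the values are only examples):

   ```java
   import java.util.Properties;

   public class ClientMetricsSubscriptionExample {
       public static void main(String[] args) {
           Properties subscription = new Properties();
           // Prefix-matched metric names; an empty list would mean no metrics subscribed.
           subscription.put("metrics",
               "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency");
           // Push interval in milliseconds (here: 5 minutes), within the allowed range.
           subscription.put("interval.ms", "300000");
           // Client match patterns: all Java clients on any 11.1.x version.
           subscription.put("match", "client_software_name = Java, client_software_version = 11.1.*");
           subscription.forEach((k, v) -> System.out.println(k + " = " + v));
       }
   }
   ```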



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense

2023-10-30 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781148#comment-17781148
 ] 

Justine Olshan commented on KAFKA-14402:


Sorry the wording is unclear. I will fix this. For older clients, the server 
will verify the partitions are in a given transaction. Older clients require no 
changes. If we changed them, they would be considered new (version) clients.

The idea of KIP-890 part 1 would be that there were no client changes required. 
The flow is the same from the client side. Just on the server side we verify 
the transaction is ongoing. Hopefully this makes sense.

> Transactions Server Side Defense
> 
>
> Key: KAFKA-14402
> URL: https://issues.apache.org/jira/browse/KAFKA-14402
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> We have seen hanging transactions in Kafka where the last stable offset (LSO) 
> does not update, we can’t clean the log (if the topic is compacted), and 
> read_committed consumers get stuck.
> This can happen when a message gets stuck or delayed due to networking issues 
> or a network partition, the transaction aborts, and then the delayed message 
> finally comes in. The delayed message case can also violate EOS if the 
> delayed message comes in after the next addPartitionsToTxn request comes in. 
> Effectively we may see a message from a previous (aborted) transaction become 
> part of the next transaction.
> Another way hanging transactions can occur is that a client is buggy and may 
> somehow try to write to a partition before it adds the partition to the 
> transaction. In both of these cases, we want the server to have some control 
> to prevent these incorrect records from being written and either causing 
> hanging transactions or violating Exactly once semantics (EOS) by including 
> records in the wrong transaction.
> The best way to avoid this issue is to:
>  # *Uniquely identify transactions by bumping the producer epoch after every 
> commit/abort marker. That way, each transaction can be identified by 
> (producer id, epoch).* 
>  # {*}Remove the addPartitionsToTxn call and implicitly just add partitions 
> to the transaction on the first produce request during a transaction{*}.
> We avoid the late arrival case because the transaction is uniquely identified 
> and fenced AND we avoid the buggy client case because we remove the need for 
> the client to explicitly add partitions to begin the transaction.
> Of course, 1 and 2 require client-side changes, so for older clients, those 
> approaches won’t apply.
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
>  
> See KIP-890 for more information: ** 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-30 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1376883781


##
core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics

Review Comment:
   Moved to java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-30 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1376883099


##
core/src/main/scala/kafka/server/ClientMetricsManager.scala:
##
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.clientmetrics.ClientMetricsConfig
+
+import java.util.Properties
+
+object ClientMetricsManager {
+  private val Instance = new ClientMetricsManager
+
+  def getInstance: ClientMetricsManager = Instance

Review Comment:
   Moved to java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-30 Thread via GitHub


apoorvmittal10 commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1376882728


##
core/src/main/scala/kafka/server/ClientMetricsManager.scala:
##
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.clientmetrics.ClientMetricsConfig
+
+import java.util.Properties
+
+object ClientMetricsManager {

Review Comment:
   Moved to java.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense

2023-10-30 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781144#comment-17781144
 ] 

Travis Bischel commented on KAFKA-14402:


Sorry, I may be a bit confused then: when it says "for old clients to verify 
the partitions are in a given transaction" -- this can only happen in v4. How 
can a client / an admin verify that a partition is a part of the transaction if 
v4 is meant to be broker<=>broker only?

> Transactions Server Side Defense
> 
>
> Key: KAFKA-14402
> URL: https://issues.apache.org/jira/browse/KAFKA-14402
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> We have seen hanging transactions in Kafka where the last stable offset (LSO) 
> does not update, we can’t clean the log (if the topic is compacted), and 
> read_committed consumers get stuck.
> This can happen when a message gets stuck or delayed due to networking issues 
> or a network partition, the transaction aborts, and then the delayed message 
> finally comes in. The delayed message case can also violate EOS if the 
> delayed message comes in after the next addPartitionsToTxn request comes in. 
> Effectively we may see a message from a previous (aborted) transaction become 
> part of the next transaction.
> Another way hanging transactions can occur is that a client is buggy and may 
> somehow try to write to a partition before it adds the partition to the 
> transaction. In both of these cases, we want the server to have some control 
> to prevent these incorrect records from being written and either causing 
> hanging transactions or violating Exactly once semantics (EOS) by including 
> records in the wrong transaction.
> The best way to avoid this issue is to:
>  # *Uniquely identify transactions by bumping the producer epoch after every 
> commit/abort marker. That way, each transaction can be identified by 
> (producer id, epoch).* 
>  # {*}Remove the addPartitionsToTxn call and implicitly just add partitions 
> to the transaction on the first produce request during a transaction{*}.
> We avoid the late arrival case because the transaction is uniquely identified 
> and fenced AND we avoid the buggy client case because we remove the need for 
> the client to explicitly add partitions to begin the transaction.
> Of course, 1 and 2 require client-side changes, so for older clients, those 
> approaches won’t apply.
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
>  
> See KIP-890 for more information: ** 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15758) Always schedule wrapped callbacks

2023-10-30 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-15758:
---
Summary: Always schedule wrapped callbacks   (was: Always schedule wrapped 
callbacks)

> Always schedule wrapped callbacks 
> --
>
> Key: KAFKA-15758
> URL: https://issues.apache.org/jira/browse/KAFKA-15758
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> As part of 
> [https://github.com/apache/kafka/commit/08aa33127a4254497456aa7a0c1646c7c38adf81]
>  the finding of the coordinator was moved to the AddPartitionsToTxnManager. 
> In the case of an error, we return the error on the wrapped callback. 
> This seemed to cause issues in the tests and we realized that executing the 
> callback directly and not rescheduling it on the request channel seemed to 
> resolve some issues. 
> One theory was that scheduling the callback before the request returned 
> caused issues.
> Ideally we wouldn't have this special handling. This ticket is to remove it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866891


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {
+  // If the callback is actually executed on the same request thread, 
we can directly execute

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-15758



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376866596


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   I understand the issue. I have a commit below that clears the request local.
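
   For readers following the thread, this self-contained Java sketch shows the general shape of the pattern under discussion; it is a simplified analogue, not the Kafka code, and RequestState merely stands in for RequestLocal: the wrapped callback runs inline only when invoked on the thread that created it, and is otherwise re-queued so that the executing thread's own state is used.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.BiConsumer;
   import java.util.function.Consumer;

   // Illustrative stand-in for per-request-thread state such as RequestLocal.
   final class RequestState {
       static final ThreadLocal<RequestState> CURRENT = ThreadLocal.withInitial(RequestState::new);
   }

   final class CallbackWrapping {
       // Stand-in for the request handler thread pool.
       private static final ExecutorService REQUEST_THREADS = Executors.newFixedThreadPool(8);

       static <T> Consumer<T> wrap(BiConsumer<RequestState, T> fun, RequestState callerState) {
           final Thread callerThread = Thread.currentThread();
           return arg -> {
               if (Thread.currentThread() == callerThread) {
                   // Still on the creating request thread: safe to reuse its state inline.
                   fun.accept(callerState, arg);
               } else {
                   // Invoked from elsewhere: re-queue and use the executing thread's own state.
                   REQUEST_THREADS.execute(() -> fun.accept(RequestState.CURRENT.get(), arg));
               }
           };
       }
   }
   ```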



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781139#comment-17781139
 ] 

Philip Nee commented on KAFKA-4852:
---

cc [~luke.kirby] 

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
>  Labels: needs-kip
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();
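
To make the quoted report concrete, here is a small, self-contained Java sketch (illustrative only, not the Kafka serializer) of what goes wrong: rewind() jumps back to index 0 of the backing array, discarding the offset of a wrapped slice, whereas reading from the buffer's current position keeps the intended window. Whether flip() or position-preserving reads are the right general fix is part of the discussion.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindVsOffsetDemo {
    public static void main(String[] args) {
        byte[] network = "xxxKEY4567rest".getBytes(StandardCharsets.UTF_8);

        // The reporter's scenario: wrap a slice of a larger array to avoid copying.
        ByteBuffer key = ByteBuffer.wrap(network, 3, 7); // window is network[3..9] = "KEY4567"

        // What a rewind()-based serializer effectively does:
        ByteBuffer rewound = key.duplicate();
        rewound.rewind(); // position jumps to 0, so the window now starts at index 0
        byte[] wrong = new byte[rewound.remaining()];
        rewound.get(wrong);
        System.out.println(new String(wrong, StandardCharsets.UTF_8)); // "xxxKEY4567" - offset lost

        // Reading from the buffer's current position preserves the wrapped slice:
        ByteBuffer asIs = key.duplicate();
        byte[] right = new byte[asIs.remaining()];
        asIs.get(right);
        System.out.println(new String(right, StandardCharsets.UTF_8)); // "KEY4567"
    }
}
{code}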



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-4852:
--
Labels: needs-kip serializers  (was: needs-kip)

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
>  Labels: needs-kip, serializers
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15758) Always schedule wrapped callbacks

2023-10-30 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15758:
--

 Summary: Always schedule wrapped callbacks
 Key: KAFKA-15758
 URL: https://issues.apache.org/jira/browse/KAFKA-15758
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


As part of 
[https://github.com/apache/kafka/commit/08aa33127a4254497456aa7a0c1646c7c38adf81]
 the finding of the coordinator was moved to the AddPartitionsToTxnManager. In 
the case of an error, we return the error on the wrapped callback. 

This seemed to cause issues in the tests and we realized that executing the 
callback directly and not rescheduling it on the request channel seemed to 
resolve some issues. 

One theory was that scheduling the callback before the request returned caused 
issues.

Ideally we wouldn't have this special handling. This ticket is to remove it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15757) Do not advertise v4 AddPartitionsToTxn to clients

2023-10-30 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-15757:
--

 Summary: Do not advertise v4 AddPartitionsToTxn to clients
 Key: KAFKA-15757
 URL: https://issues.apache.org/jira/browse/KAFKA-15757
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan


v4+ is intended to be a broker side API. Thus, we should not return it as a 
valid version to clients.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense

2023-10-30 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781132#comment-17781132
 ] 

Justine Olshan commented on KAFKA-14402:


Hi [~twmb] 
The KIP current states:

> We will bump the versioning for AddPartitionsToTxn to add support for 
> "verifyOnly" mode used for old clients to verify the partitions are in a 
> given transaction. We will also add support to batch multiple waiting 
> requests by transactional ID. This newer version of the request will require 
> CLUSTER authorization so clients will not be able to use this version.

I'm not sure I understand your statement. Admins are not sending this request. 
The broker is sending the request. The goal is to remove the API from the 
client-side altogether.

> Transactions Server Side Defense
> 
>
> Key: KAFKA-14402
> URL: https://issues.apache.org/jira/browse/KAFKA-14402
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> We have seen hanging transactions in Kafka where the last stable offset (LSO) 
> does not update, we can’t clean the log (if the topic is compacted), and 
> read_committed consumers get stuck.
> This can happen when a message gets stuck or delayed due to networking issues 
> or a network partition, the transaction aborts, and then the delayed message 
> finally comes in. The delayed message case can also violate EOS if the 
> delayed message comes in after the next addPartitionsToTxn request comes in. 
> Effectively we may see a message from a previous (aborted) transaction become 
> part of the next transaction.
> Another way hanging transactions can occur is that a client is buggy and may 
> somehow try to write to a partition before it adds the partition to the 
> transaction. In both of these cases, we want the server to have some control 
> to prevent these incorrect records from being written and either causing 
> hanging transactions or violating Exactly once semantics (EOS) by including 
> records in the wrong transaction.
> The best way to avoid this issue is to:
>  # *Uniquely identify transactions by bumping the producer epoch after every 
> commit/abort marker. That way, each transaction can be identified by 
> (producer id, epoch).* 
>  # {*}Remove the addPartitionsToTxn call and implicitly just add partitions 
> to the transaction on the first produce request during a transaction{*}.
> We avoid the late arrival case because the transaction is uniquely identified 
> and fenced AND we avoid the buggy client case because we remove the need for 
> the client to explicitly add partitions to begin the transaction.
> Of course, 1 and 2 require client-side changes, so for older clients, those 
> approaches won’t apply.
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
>  
> See KIP-890 for more information: ** 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15756) Migrate existing integration tests to run old protocol in new coordinator

2023-10-30 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-15756:
---

 Summary: Migrate existing integration tests to run old protocol in 
new coordinator
 Key: KAFKA-15756
 URL: https://issues.apache.org/jira/browse/KAFKA-15756
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15756) Migrate existing integration tests to run old protocol in new coordinator

2023-10-30 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-15756:
---

Assignee: Dongnuo Lyu

> Migrate existing integration tests to run old protocol in new coordinator
> -
>
> Key: KAFKA-15756
> URL: https://issues.apache.org/jira/browse/KAFKA-15756
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376821250


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   There is a verification state for the doCommitTxnOffset path, which is mostly 
the same path (except for using storeOffsets instead of storeGroup)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


artemlivshits commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376817191


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   It seems that storing the group coordinator state is not transactional, so 
the verification code path shouldn't be used.  But I agree with the question of the 
locking model in the group coordinator and whether it'd work correctly with a 
more asynchronous approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-30 Thread via GitHub


cmccabe commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376808429


##
metadata/src/main/resources/common/metadata/PartitionRecord.json:
##
@@ -47,6 +47,8 @@
   "about": "The eligible leader replicas of this partition." },
 { "name": "LastKnownELR", "type": "[]int32", "default": "null", 
"entityType": "brokerId",
   "versions": "1+", "nullableVersions": "1+", "taggedVersions": "1+", 
"tag": 2,
-  "about": "The last known eligible leader replicas of this partition." }
+  "about": "The last known eligible leader replicas of this partition." },
+{ "name": "Directories", "type": "[]uuid", "versions": "2+",
+  "about": "The log directory hosting each replica, sorted in the same 
exact order as the Replicas field."}

Review Comment:
   I would prefer to make it mandatory to make it clear that it's always 
present in version 2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-30 Thread via GitHub


cmccabe commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376807529


##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -16,55 +16,166 @@
  */
 package org.apache.kafka.common;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-public class DirectoryId {
+public class DirectoryId extends Uuid {

Review Comment:
   I don't see why we should extend Uuid rather than just having a utility 
class that handles Uuids in the way that we want. Inheritance is almost always 
a mistake (unless it's of an interface)
   
   This seems to add a lot of boilerplate to / from code compared with just 
using Uuid directly.
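   (A hedged illustration of the utility-class alternative, with assumed names rather 
than the PR's actual code: the constants and checks can live in a non-instantiable 
holder while replicas keep using plain Uuid values.)

{code:java}
// Sketch only: a static holder over plain Uuid values, no subclassing.
import org.apache.kafka.common.Uuid;

public final class DirectoryIdUtil {
    private DirectoryIdUtil() { }

    public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
    public static final Uuid LOST = new Uuid(0L, 1L);
    public static final Uuid MIGRATING = new Uuid(0L, 2L);

    // Mirrors only the "first 100 ids are reserved" rule quoted in the diff above.
    public static boolean isReserved(Uuid id) {
        return id.getMostSignificantBits() == 0L
            && id.getLeastSignificantBits() >= 0L
            && id.getLeastSignificantBits() < 100L;
    }
}
{code}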



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-30 Thread via GitHub


cmccabe commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376805476


##
clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java:
##
@@ -19,7 +19,7 @@
 /**
  * An exception that may indicate the client's metadata is out of date
  */
-public abstract class InvalidMetadataException extends RetriableException {
+public class InvalidMetadataException extends RetriableException {

Review Comment:
   Hmm. It seems odd to make this non-abstract. Clearly the intention was to 
have a more specific class describe why the metadata was invalid. Why would we 
not follow this pattern here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14402) Transactions Server Side Defense

2023-10-30 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781122#comment-17781122
 ] 

Travis Bischel commented on KAFKA-14402:


Another addendum, after your update: v4 of the AddPartitionsToTxn API requires 
CLUSTER_ACTION to semi-enforce broker-to-broker requests. Can the KIP be 
updated to document this? Alternatively, can requiring CLUSTER_ACTION be 
dropped? The KIP itself indicates that admins will send AddPartitionsToTxn to 
see whether a partition is in the transaction, and admins do not need 
CLUSTER_ACTION. As well, as a client library -- if I send the request with one 
transaction (i.e. use v4 the same as I use v3), there's no reason I should be 
limited to v3.

> Transactions Server Side Defense
> 
>
> Key: KAFKA-14402
> URL: https://issues.apache.org/jira/browse/KAFKA-14402
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.5.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> We have seen hanging transactions in Kafka where the last stable offset (LSO) 
> does not update, we can’t clean the log (if the topic is compacted), and 
> read_committed consumers get stuck.
> This can happen when a message gets stuck or delayed due to networking issues 
> or a network partition, the transaction aborts, and then the delayed message 
> finally comes in. The delayed message case can also violate EOS if the 
> delayed message comes in after the next addPartitionsToTxn request comes in. 
> Effectively we may see a message from a previous (aborted) transaction become 
> part of the next transaction.
> Another way hanging transactions can occur is that a client is buggy and may 
> somehow try to write to a partition before it adds the partition to the 
> transaction. In both of these cases, we want the server to have some control 
> to prevent these incorrect records from being written and either causing 
> hanging transactions or violating Exactly once semantics (EOS) by including 
> records in the wrong transaction.
> The best way to avoid this issue is to:
>  # *Uniquely identify transactions by bumping the producer epoch after every 
> commit/abort marker. That way, each transaction can be identified by 
> (producer id, epoch).* 
>  # {*}Remove the addPartitionsToTxn call and implicitly just add partitions 
> to the transaction on the first produce request during a transaction{*}.
> We avoid the late arrival case because the transaction is uniquely identified 
> and fenced AND we avoid the buggy client case because we remove the need for 
> the client to explicitly add partitions to begin the transaction.
> Of course, 1 and 2 require client-side changes, so for older clients, those 
> approaches won’t apply.
> 3. *To cover older clients, we will ensure a transaction is ongoing before we 
> write to a transaction. We can do this by querying the transaction 
> coordinator and caching the result.*
>  
> See KIP-890 for more information: ** 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-30 Thread via GitHub


soarez commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376787884


##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -310,11 +337,22 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int 
partitionId, short versio
 setLeaderRecoveryState(leaderRecoveryState.value()).
 setLeaderEpoch(leaderEpoch).
 setPartitionEpoch(partitionEpoch);
-if (version > 0) {
+if (options.metadataVersion().isElrSupported()) {
 record.setEligibleLeaderReplicas(Replicas.toList(elr)).
 setLastKnownELR(Replicas.toList(lastKnownElr));
 }
-return new ApiMessageAndVersion(record, version);
+if (options.metadataVersion().isDirectoryAssignmentSupported()) {
+record.setDirectories(DirectoryId.toList(directories));
+} else {
+for (int i = 0; i < directories.length; i++) {

Review Comment:
   You're right. I've updated the constructor so that `directories` can never 
be null.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-30 Thread via GitHub


soarez commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376787569


##
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java:
##
@@ -49,12 +51,15 @@
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.when;
 
 
 @Timeout(value = 40)
 public class PartitionChangeBuilderTest {
 private static Stream<Arguments> partitionChangeRecordVersions() {
-return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, 
PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1).mapToObj(version -> 
Arguments.of((short) version));
+return IntStream.range(PartitionChangeRecord.LOWEST_SUPPORTED_VERSION, 
PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION + 1)
+.filter(v -> v != 
PartitionChangeRecord.HIGHEST_SUPPORTED_VERSION) // TODO test latest record 
version in KAFKA-15514

Review Comment:
   Good catch, I forgot this. Removed and fixed failing tests.



##
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##
@@ -122,8 +132,13 @@ public PartitionRegistration build() {
 throw new IllegalStateException("You must set last known 
elr.");
 }
 
+if (directories == null) {
+directories = DirectoryId.unassignedArray(replicas.length);
+}

Review Comment:
   Good point. I've moved this to the underlying constructor to deal with both 
cases.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15355: Message schema changes [kafka]

2023-10-30 Thread via GitHub


soarez commented on code in PR #14290:
URL: https://github.com/apache/kafka/pull/14290#discussion_r1376787303


##
clients/src/main/java/org/apache/kafka/common/errors/InvalidMetadataException.java:
##
@@ -19,7 +19,7 @@
 /**
  * An exception that may indicate the client's metadata is out of date
  */
-public abstract class InvalidMetadataException extends RetriableException {
+public class InvalidMetadataException extends RetriableException {

Review Comment:
   I had made this change to deal with a partition record with different 
lengths for `replicas` and `directories`. I've now realized that check hadn't 
been pushed.
   
   If we need to keep this abstract for some reason we can create a new 
Exception type instead. 



##
server-common/src/main/java/org/apache/kafka/common/DirectoryId.java:
##
@@ -16,55 +16,166 @@
  */
 package org.apache.kafka.common;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-public class DirectoryId {
+public class DirectoryId extends Uuid {
 
 /**
- * A UUID that is used to identify new or unknown dir assignments.
+ * A DirectoryId that is used to identify new or unknown dir assignments.
  */
-public static final Uuid UNASSIGNED = new Uuid(0L, 0L);
+public static final DirectoryId UNASSIGNED = new DirectoryId(0L, 0L);
 
 /**
- * A UUID that is used to represent unspecified offline dirs.
+ * A DirectoryId that is used to represent unspecified offline dirs.
  */
-public static final Uuid LOST = new Uuid(0L, 1L);
+public static final DirectoryId LOST = new DirectoryId(0L, 1L);
 
 /**
- * A UUID that is used to represent and unspecified log directory,
+ * A DirectoryId that is used to represent and unspecified log directory,
  * that is expected to have been previously selected to host an
  * associated replica. This contrasts with {@code UNASSIGNED_DIR},
  * which is associated with (typically new) replicas that may not
  * yet have been placed in any log directory.
  */
-public static final Uuid MIGRATING = new Uuid(0L, 2L);
+public static final DirectoryId MIGRATING = new DirectoryId(0L, 2L);
 
 /**
  * The set of reserved UUIDs that will never be returned by the random 
method.
  */
-public static final Set<Uuid> RESERVED;
+public static final Set<DirectoryId> RESERVED;
 
 static {
-HashSet<Uuid> reserved = new HashSet<>(Uuid.RESERVED);
-// The first 100 UUIDs are reserved for future use.
+HashSet<DirectoryId> reserved = new HashSet<>();
+// The first 100 DirectoryIds are reserved for future use.
 for (long i = 0L; i < 100L; i++) {
-reserved.add(new Uuid(0L, i));
+reserved.add(new DirectoryId(0L, i));
 }
 RESERVED = Collections.unmodifiableSet(reserved);
 }
 
+/**
+ * Constructs a Directory ID from the underlying 128 bits,
+ * exactly as a {@link Uuid} is constructed.
+ */
+private DirectoryId(long mostSigBits, long leastSigBits) {
+super(mostSigBits, leastSigBits);
+}
+
+/**
+ * Creates a DirectoryId based on a base64 string encoding used in the 
toString() method.
+ */
+public static DirectoryId fromString(String str) {
+return DirectoryId.fromUuid(Uuid.fromString(str));
+}
+
+/**
+ * Creates a DirectoryId based on a {@link Uuid}.
+ */
+public static DirectoryId fromUuid(Uuid uuid) {
+return new DirectoryId(uuid.getMostSignificantBits(), 
uuid.getLeastSignificantBits());
+}

Review Comment:
   I've added a test to verify the reserved values, using MSB/LSB as a somewhat 
less indirect test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-30 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1376784755


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java:
##
@@ -102,4 +140,230 @@ static class RawAndDeserializedValue {
 this.value = value;
 }
 }
+
+@SuppressWarnings("unchecked")
+@Override
+public  QueryResult query(final Query query,
+final PositionBound positionBound,
+final QueryConfig config) {
+
+final long start = time.nanoseconds();
+final QueryResult result;
+
+final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+if (handler == null) {
+result = wrapped().query(query, positionBound, config);
+if (config.isCollectExecutionInfo()) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " in " + 
(time.nanoseconds() - start) + "ns");
+}
+} else {
+result = (QueryResult) handler.apply(
+query,
+positionBound,
+config,
+this
+);
+if (config.isCollectExecutionInfo()) {
+result.addExecutionInfo(
+"Handled in " + getClass() + " with serdes "
++ serdes + " in " + (time.nanoseconds() - 
start) + "ns");
+}
+}
+return result;
+}
+
+
+
+@SuppressWarnings("unchecked")
+protected  QueryResult runTimestampKeyQuery(final Query query,
+  final PositionBound 
positionBound,
+  final QueryConfig 
config) {
+final QueryResult result;
+final TimestampedKeyQuery typedKeyQuery = 
(TimestampedKeyQuery) query;
+final KeyQuery rawKeyQuery =
+KeyQuery.withKey(keyBytes(typedKeyQuery.getKey()));
+final QueryResult rawResult =
+wrapped().query(rawKeyQuery, positionBound, config);
+if (rawResult.isSuccess()) {
+final Function> deserializer = 
getDeserializeValue(serdes, wrapped());
+final ValueAndTimestamp value = 
deserializer.apply(rawResult.getResult());
+final QueryResult> typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, value);
+result = (QueryResult) typedQueryResult;
+} else {
+// the generic type doesn't matter, since failed queries have no 
result set.
+result = (QueryResult) rawResult;
+}
+return result;
+}
+
+@SuppressWarnings("unchecked")
+protected  QueryResult runTimestampRangeQuery(final Query query,
+final PositionBound 
positionBound,
+final QueryConfig 
config) {
+
+final QueryResult result;
+final TimestampedRangeQuery typedQuery = 
(TimestampedRangeQuery) query;
+
+final RangeQuery rawRangeQuery;
+if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+rawRangeQuery = RangeQuery.withRange(
+keyBytes(typedQuery.getLowerBound().get()),
+keyBytes(typedQuery.getUpperBound().get())
+);
+} else if (typedQuery.getLowerBound().isPresent()) {
+rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+} else if (typedQuery.getUpperBound().isPresent()) {
+rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+} else {
+rawRangeQuery = RangeQuery.withNoBounds();
+}
+final QueryResult> rawResult =
+wrapped().query(rawRangeQuery, positionBound, config);
+if (rawResult.isSuccess()) {
+final KeyValueIterator iterator = 
rawResult.getResult();
+final KeyValueIterator resultIterator = new 
MeteredKeyValueTimestampedIterator(
+iterator,
+getSensor,
+getDeserializeValue(serdes, wrapped()),
+false
+);
+final QueryResult> typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+rawResult,
+resultIterator
+);
+result = (QueryResult) typedQueryResult;
+} else {
+// the generic type doesn't matter, since failed queries have no 
result set.
+result = (QueryResult) rawResult;
+}
+return result;
+}
+
+

[jira] [Resolved] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15602.
-
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
 Assignee: Matthias J. Sax
   Resolution: Fixed

As discussed, reverted this in all applicable branches.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to start reading from */ 
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since position 

Re: [PR] KAFKA-15277: ConsumerDelegate support [kafka]

2023-10-30 Thread via GitHub


kirktrue closed pull request #14664: KAFKA-15277: ConsumerDelegate support
URL: https://github.com/apache/kafka/pull/14664


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Reopened] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4852:

  Assignee: (was: LinShunkang)

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
> Fix For: 3.4.0
>
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();
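(Side note, plain JDK semantics rather than Kafka code: the difference the reporter 
describes can be seen by printing the buffer state after wrap(), rewind() and flip(). 
rewind() keeps the limit and zeroes the position, while flip() first drops the limit 
to the current position, so neither by itself exposes the wrapped sub-range data[3..7].)

{code:java}
import java.nio.ByteBuffer;

public class BufferStateDemo {
    public static void main(String[] args) {
        byte[] data = "0123456789".getBytes();

        ByteBuffer wrapped = ByteBuffer.wrap(data, 3, 5);
        // position=3, limit=8: remaining() covers exactly "34567"
        System.out.println("wrap:   pos=" + wrapped.position() + " limit=" + wrapped.limit());

        ByteBuffer rewound = ByteBuffer.wrap(data, 3, 5);
        rewound.rewind();
        // position=0, limit still 8: reading now yields "01234567"
        System.out.println("rewind: pos=" + rewound.position() + " limit=" + rewound.limit());

        ByteBuffer flipped = ByteBuffer.wrap(data, 3, 5);
        flipped.flip();
        // limit=3 (the old position), position=0: reading now yields "012"
        System.out.println("flip:   pos=" + flipped.position() + " limit=" + flipped.limit());
    }
}
{code}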



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4852:
---
Labels: needs-kip  (was: )

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
>  Labels: needs-kip
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15277: ConsumerDelegate support [kafka]

2023-10-30 Thread via GitHub


kirktrue commented on PR #14664:
URL: https://github.com/apache/kafka/pull/14664#issuecomment-1785968980

   This was a simple experiment.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15277: Design & implement support for internal Consumer delegates [kafka]

2023-10-30 Thread via GitHub


kirktrue opened a new pull request, #14670:
URL: https://github.com/apache/kafka/pull/14670

   The consumer refactoring project introduced another `Consumer` 
implementation, creating two different, coexisting implementations of the 
`Consumer` interface:
   
   * `KafkaConsumer` (AKA "existing", "legacy" consumer)
   * `PrototypeAsyncConsumer` (AKA "new", "refactored" consumer)
   
   The goal of this task is to refactor the code via the delegation pattern so 
that we can keep a top-level `KafkaConsumer` but then delegate to another 
implementation under the covers. There will be two delegates at first:
   
   * `LegacyKafkaConsumer`
   * `AsyncKafkaConsumer`
   
   `LegacyKafkaConsumer` is essentially a renamed `KafkaConsumer`. That 
implementation handles the existing group protocol. `AsyncKafkaConsumer` is 
renamed from `PrototypeAsyncConsumer` and will implement the new consumer group 
protocol from KIP-848. Both of those implementations will live in the 
`internals` sub-package to discourage their use.
   
   This task is part of the work to implement support for the new KIP-848 
consumer group protocol.
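
   A minimal sketch of the delegation shape described above (the facade class name and 
the reduced method set are placeholders for illustration, not the PR's actual 
signatures): the public consumer keeps the API surface and forwards every call to 
whichever internal delegate was chosen at construction time.

{code:java}
// Sketch only: a public facade forwarding to an internal delegate chosen at construction.
import java.util.Collection;

interface ConsumerDelegate {
    void subscribe(Collection<String> topics);
    void close();
}

class LegacyKafkaConsumer implements ConsumerDelegate {
    public void subscribe(Collection<String> topics) { /* existing group protocol */ }
    public void close() { }
}

class AsyncKafkaConsumer implements ConsumerDelegate {
    public void subscribe(Collection<String> topics) { /* KIP-848 group protocol */ }
    public void close() { }
}

public class DelegatingConsumerSketch {
    private final ConsumerDelegate delegate;

    DelegatingConsumerSketch(boolean useNewProtocol) {
        this.delegate = useNewProtocol ? new AsyncKafkaConsumer() : new LegacyKafkaConsumer();
    }

    public void subscribe(Collection<String> topics) {
        delegate.subscribe(topics);   // every public API call is forwarded
    }

    public void close() {
        delegate.close();
    }
}
{code}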


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4852:
---
Fix Version/s: (was: 3.4.0)

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset? new ByteBuffer(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15277) Design & implement support for internal Consumer delegates

2023-10-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15277:
--
Description: 
The consumer refactoring project introduced another {{Consumer}} 
implementation, creating two different, coexisting implementations of the 
{{Consumer}} interface:
 * {{KafkaConsumer}} (AKA "existing", "legacy" consumer)
 * {{PrototypeAsyncConsumer}} (AKA "new", "refactored" consumer)

The goal of this task is to refactor the code via the delegation pattern so 
that we can keep a top-level {{KafkaConsumer}} but then delegate to another 
implementation under the covers. There will be two delegates at first:
 * {{LegacyKafkaConsumer}}
 * {{AsyncKafkaConsumer}}

{{LegacyKafkaConsumer}} is essentially a renamed {{{}KafkaConsumer{}}}. That 
implementation handles the existing group protocol. {{AsyncKafkaConsumer}} is 
renamed from {{PrototypeAsyncConsumer}} and will implement the new consumer 
group protocol from KIP-848. Both of those implementations will live in the 
"internals" sub-package to discourage their use.

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.

  was:
As mentioned above, there are presently two different, coexisting 
implementations of the {{Consumer}} interface: {{KafkaConsumer}} ("old") and 
{{PrototypeAsyncConsumer}} ("new"). Eventually, these will be reorganized using 
the delegation pattern. The top-level {{KafkaConsumer}} that implements the old 
protocol will be renamed as {{LegacyKafkaConsumerDelegate}} and 
{{PrototypeAsyncConsumer}} will be renamed as 
{{AsyncKafkaConsumerDelegate}}. It is assumed that neither 
{{AsyncKafkaConsumerDelegate}} nor 
{{LegacyKafkaConsumerDelegate}} will be top-level implementations 
of {{Consumer}}, but will likely implement an internal interface that is 
better suited to the needs of the top-level {{KafkaConsumer}}.

Provide the Java client support for the consumer delegates, including:
 * Create {{ConsumerDelegate}} interface
 * Clone {{{}KafkaConsumer{}}}, rename as {{LegacyKafkaConsumerDelegate}} and 
refactor to implement {{ConsumerDelegate}}
 * Rename {{PrototypeAsyncConsumer}} to {{AsyncKafkaConsumerDelegate}} and 
refactor to implement the {{ConsumerDelegate}} interface
 * Refactor the (original) {{KafkaConsumer}} to remove the core implementation, 
instead delegating to the {{{}ConsumerDelegate{}}}, which will be hard-coded to 
use {{LegacyKafkaConsumerDelegate}}
 * Once available (in KAFKA-15284), use the 
{{ConsumerGroupProtocolVersionResolver}} to determine which delegate to use

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.


> Design & implement support for internal Consumer delegates
> --
>
> Key: KAFKA-15277
> URL: https://issues.apache.org/jira/browse/KAFKA-15277
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848, kip-848-e2e, 
> kip-848-preview
>
> The consumer refactoring project introduced another {{Consumer}} 
> implementation, creating two different, coexisting implementations of the 
> {{Consumer}} interface:
>  * {{KafkaConsumer}} (AKA "existing", "legacy" consumer)
>  * {{PrototypeAsyncConsumer}} (AKA "new", "refactored" consumer)
> The goal of this task is to refactor the code via the delegation pattern so 
> that we can keep a top-level {{KafkaConsumer}} but then delegate to another 
> implementation under the covers. There will be two delegates at first:
>  * {{LegacyKafkaConsumer}}
>  * {{AsyncKafkaConsumer}}
> {{LegacyKafkaConsumer}} is essentially a renamed {{{}KafkaConsumer{}}}. That 
> implementation handles the existing group protocol. {{AsyncKafkaConsumer}} is 
> renamed from {{PrototypeAsyncConsumer}} and will implement the new consumer 
> group protocol from KIP-848. Both of those implementations will live in the 
> "internals" sub-package to discourage their use.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15602: revert KAFKA-4852 [kafka]

2023-10-30 Thread via GitHub


mjsax merged PR #14617:
URL: https://github.com/apache/kafka/pull/14617


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15643) Improve unloading logging

2023-10-30 Thread Ritika Muduganti (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ritika Muduganti resolved KAFKA-15643.
--
  Reviewer: David Jacot
Resolution: Fixed

> Improve unloading logging
> -
>
> Key: KAFKA-15643
> URL: https://issues.apache.org/jira/browse/KAFKA-15643
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Ritika Muduganti
>Priority: Major
>
> When a new leader is elected for a __consumer_offset partition, the followers 
> are notified to unload the state. However, only the former leader is aware of 
> it. The remaining follower prints out the following error:
> ERROR [GroupCoordinator id=1] Execution of 
> UnloadCoordinator(tp=__consumer_offsets-1, epoch=0) failed due to This is not 
> the correct coordinator.. 
> (org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime)
> The error is actually correct but we should improve the logging to not print 
> anything when in the remaining follower case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15284) Implement ConsumerGroupProtocolVersionResolver to determine consumer group protocol

2023-10-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15284:
-

Assignee: Kirk True

> Implement ConsumerGroupProtocolVersionResolver to determine consumer group 
> protocol
> ---
>
> Key: KAFKA-15284
> URL: https://issues.apache.org/jira/browse/KAFKA-15284
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
>
> At client initialization, we need to determine which of the 
> {{ConsumerDelegate}} implementations to use:
>  # {{LegacyKafkaConsumerDelegate}}
>  # {{AsyncKafkaConsumerDelegate}}
> There are conditions defined by KIP-848 that determine client eligibility to 
> use the new protocol. This will be modeled by the—deep 
> breath—{{{}ConsumerGroupProtocolVersionResolver{}}}.
> Known tasks:
>  * Determine at what point in the {{Consumer}} initialization the network 
> communication should happen
>  * Determine what RPCs to invoke in order to determine eligibility (API 
> versions, IBP version, etc.)
>  * Implement the network client lifecycle (startup, communication, shutdown, 
> etc.)
>  * Determine the fallback path in case the client is not eligible to use the 
> protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15277) Design & implement support for internal Consumer delegates

2023-10-30 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15277:
-

Assignee: Kirk True  (was: Philip Nee)

> Design & implement support for internal Consumer delegates
> --
>
> Key: KAFKA-15277
> URL: https://issues.apache.org/jira/browse/KAFKA-15277
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848, kip-848-e2e, 
> kip-848-preview
>
> As mentioned above, there are presently two different, coexisting 
> implementations of the {{Consumer}} interface: {{KafkaConsumer}} ("old") and 
> {{PrototypeAsyncConsumer}} ("new"). Eventually, these will be reorganized 
> using the delegation pattern. The top-level {{KafkaConsumer}} that implements 
> the old protocol will be renamed as {{LegacyKafkaConsumerDelegate}} and 
> {{PrototypeAsyncConsumer}} will be renamed as 
> {{{}AsyncKafkaConsume{}}}{{{}rDelegate{}}}. It is assumed that neither 
> {{AsyncKafkaConsumerDelegate}}. It is assumed that neither 
> {{AsyncKafkaConsumerDelegate}} nor 
> {{LegacyKafkaConsumerDelegate}} will be top-level implementations 
> of {{Consumer}}, but will likely implement an internal interface that is 
> better suited to the needs of the top-level {{KafkaConsumer}}.
>  * Create {{ConsumerDelegate}} interface
>  * Clone {{{}KafkaConsumer{}}}, rename as {{LegacyKafkaConsumerDelegate}} and 
> refactor to implement {{ConsumerDelegate}}
>  * Rename {{PrototypeAsyncConsumer}} to {{AsyncKafkaConsumerDelegate}} and 
> refactor to implement the {{ConsumerDelegate}} interface
>  * Refactor the (original) {{KafkaConsumer}} to remove the core 
> implementation, instead delegating to the {{{}ConsumerDelegate{}}}, which 
> will be hard-coded to use {{LegacyKafkaConsumerDelegate}}
>  * Once available (in KAFKA-15284), use the 
> {{ConsumerGroupProtocolVersionResolver}} to determine which delegate to use
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15679: Consumer configurations for group protocol [kafka]

2023-10-30 Thread via GitHub


dajac commented on code in PR #14642:
URL: https://github.com/apache/kafka/pull/14642#discussion_r1376703936


##
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##
@@ -105,6 +105,24 @@ public class ConsumerConfig extends AbstractConfig {
 public static final String HEARTBEAT_INTERVAL_MS_CONFIG = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG;
 private static final String HEARTBEAT_INTERVAL_MS_DOC = 
CommonClientConfigs.HEARTBEAT_INTERVAL_MS_DOC;
 
+/**
+ * group.protocol
+ */
+public static final String GROUP_PROTOCOL_CONFIG = "group.protocol";
+public static final String DEFAULT_GROUP_PROTOCOL = 
GroupProtocol.GENERIC.name().toLowerCase(Locale.ROOT);
+public static final String GROUP_PROTOCOL_DOC = "The group protocol 
consumer should use.  We currently " +

Review Comment:
   nit: Let's remove the double spaces after the dots.



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
 assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
+
+@Test
+public void testDefaultConsumerGroupConfig() {
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(null, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@Test
+public void testValidConsumerGroupConfig() {
+String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
+configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, 
"org.apache.kafka.clients.group.someAssignor");

Review Comment:
   nit: I suppose that we could reuse `remoteAssignorName` here?



##
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java:
##
@@ -158,4 +158,27 @@ public void testCaseInsensitiveSecurityProtocol() {
 final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
 assertEquals(saslSslLowerCase, 
consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
 }
+
+@Test
+public void testDefaultConsumerGroupConfig() {
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("generic", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(null, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}
+
+@Test
+public void testValidConsumerGroupConfig() {
+String remoteAssignorName = 
"org.apache.kafka.clients.group.someAssignor";
+final Map configs = new HashMap<>();
+configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
+configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
+configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");
+configs.put(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG, 
"org.apache.kafka.clients.group.someAssignor");
+final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+assertEquals("consumer", 
consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG));
+assertEquals(remoteAssignorName, 
consumerConfig.getString(ConsumerConfig.REMOTE_ASSIGNOR_CONFIG));
+}

Review Comment:
   Should we add a test to ensure that `GROUP_PROTOCOL_CONFIG` can only accept 
`consumer` and `generic`?
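   Such a test could look roughly like the following, assuming the config is declared 
with a validator that rejects other values (sketch only, reusing the setup fields of 
the surrounding test class; the test name is made up):

{code:java}
@Test
public void testInvalidGroupProtocolConfigIsRejected() {
    final Map<String, Object> configs = new HashMap<>();
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
    configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "unknown-protocol");
    // Expected to fail fast if group.protocol is constrained to "generic"/"consumer".
    assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
}
{code}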



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-30 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781095#comment-17781095
 ] 

ASF GitHub Bot commented on KAFKA-15653:


jolshan merged PR #564:
URL: https://github.com/apache/kafka-site/pull/564




> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Justine Olshan
>Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15647: Fix the different behavior in error handling between the old and new group coordinator [kafka]

2023-10-30 Thread via GitHub


dajac commented on code in PR #14589:
URL: https://github.com/apache/kafka/pull/14589#discussion_r1376679603


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1500,6 +1502,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 new OffsetFetchResponseData.OffsetFetchResponseGroup()
   .setGroupId(offsetFetchRequest.groupId)
   .setErrorCode(Errors.forException(exception).code)
+  } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) {

Review Comment:
   ditto.



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1461,6 +1461,8 @@ class KafkaApis(val requestChannel: RequestChannel,
 new OffsetFetchResponseData.OffsetFetchResponseGroup()
   .setGroupId(offsetFetchRequest.groupId)
   .setErrorCode(Errors.forException(exception).code)
+  } else if (offsetFetchResponse.errorCode() != Errors.NONE.code()) {

Review Comment:
   nit: We could remove the `()` after `code`.



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -4701,6 +4751,89 @@ class KafkaApisTest {
 assertEquals(expectedOffsetFetchResponse, response.data)
   }
 
+  @Test
+  def testHandleOffsetFetchWithUnauthorizedTopicAndTopLevelError(): Unit = {
+def makeRequest(version: Short): RequestChannel.Request = {
+  val groups = Map(
+"group-1" -> List(
+  new TopicPartition("foo", 0),
+  new TopicPartition("bar", 0)
+).asJava,
+"group-2" -> List(
+  new TopicPartition("foo", 0),
+  new TopicPartition("bar", 0)
+).asJava
+  ).asJava
+  buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+}
+
+val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+val authorizer: Authorizer = mock(classOf[Authorizer])
+
+val acls = Map(
+  "group-1" -> AuthorizationResult.ALLOWED,
+  "group-2" -> AuthorizationResult.ALLOWED,
+  "foo" -> AuthorizationResult.DENIED,
+  "bar" -> AuthorizationResult.ALLOWED
+)
+
+when(authorizer.authorize(
+  any[RequestContext],
+  any[util.List[Action]]
+)).thenAnswer { invocation =>
+  val actions = invocation.getArgument(1, classOf[util.List[Action]])
+  actions.asScala.map { action =>
+acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+  }.asJava
+}
+
+// group-1 and group-2 are allowed and bar is allowed.
+val group1Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+when(groupCoordinator.fetchOffsets(
+  requestChannelRequest.context,
+  new OffsetFetchRequestData.OffsetFetchRequestGroup()
+.setGroupId("group-1")
+.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+  .setName("bar")
+  .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+  false
+)).thenReturn(group1Future)
+
+val group2Future = new 
CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]()
+when(groupCoordinator.fetchOffsets(
+  requestChannelRequest.context,
+  new OffsetFetchRequestData.OffsetFetchRequestGroup()
+.setGroupId("group-2")
+.setTopics(List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+  .setName("bar")
+  .setPartitionIndexes(List[Integer](0).asJava)).asJava),
+  false
+)).thenReturn(group1Future)
+
+createKafkaApis(authorizer = 
Some(authorizer)).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+// group-2 mocks using the new group coordinator.
+// When the coordinator is not active, a response with error code is 
returned.

Review Comment:
   Should we add a note about `foo` here? The whole point of this test is to 
ensure that the failed topics are not present in the response when there is a 
top level error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376679750


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   I am saying that before this change the append was under the lock; with the 
verification change, it no longer is. I think we are saying the same thing here. 
We need to figure out whether the append not being under the lock is safe.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376678541


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   If we reduce then I assume the thread just stops?



##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   If we reduce then I assume the thread just stops?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-30 Thread via GitHub


junrao closed pull request #14603: KAFKA-15582: Move the clean shutdown file to 
the storage package
URL: https://github.com/apache/kafka/pull/14603


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-30 Thread via GitHub


junrao commented on PR #14603:
URL: https://github.com/apache/kafka/pull/14603#issuecomment-1785851656

   @CalvinConfluent : 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15628) Refactor ConsumerRebalanceListener invocation for reuse

2023-10-30 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-15628.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Refactor ConsumerRebalanceListener invocation for reuse
> ---
>
> Key: KAFKA-15628
> URL: https://issues.apache.org/jira/browse/KAFKA-15628
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
> Fix For: 3.7.0
>
>
> Pull out the code related to invoking {{ConsumerRebalanceListener}} methods 
> into its own class so that it can be reused by the KIP-848 implementation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-30 Thread via GitHub


dajac merged PR #14638:
URL: https://github.com/apache/kafka/pull/14638


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376674826


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   > It would just be the append itself that is not under the lock.
   
   Hmm, is that true? We have the following code.
   
   ```
   GroupCoordinator.doSyncGroup
     group.inLock {
       ...
       groupManager.storeGroup
       ...
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376671529


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   Not sure how strongly we need the `GroupMetadata` lock while appending to 
the log. But I think the intention is to make sure that all writes to the log 
for a group are serialized. Updating the in-memory state typically happens in 
the callback when the HWM advances and is also protected by the `GroupMetadata` 
lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376670972


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   It seems that we do have the lock for the in-memory state. It would just be 
the append itself that is not under the lock.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376662251


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   >  I am wondering about the locking model. We have the following path that's 
called under the GroupMetadata lock. GroupCoordinator.doSyncGroup (hold 
GroupMetadata lock) => groupManager.storeGroup => appendForGroup => 
replicaManager.appendRecords => replicaManager.appendEntries. However, if we 
register the callback, replicaManager.appendEntries will be called without 
holding the GroupMetadata lock. Is that safe?
   
   Hmm. Do we need the lock to write to the log, or just to update the in-memory 
state? I can take another look at this path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376661650


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   True in the common case. I was wondering what happens if we dynamically 
reduce the number of request handler threads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376659689


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Hmm. We assign the request channel when the thread is created. I assume that 
request channel will remain with the thread during its lifetime. Is that 
incorrect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376657322


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -53,25 +53,27 @@ object KafkaRequestHandler {
* Wrap callback to schedule it on a request thread.
* NOTE: this function must be called on a request thread.
* @param fun Callback function to execute
+   * @param requestLocal The RequestLocal for the current request handler 
thread in case we need to call
+   * the callback function without queueing the callback 
request
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
-if (threadCurrentRequest.get() != null) {
-  // If the callback is actually executed on a request thread, we can 
directly execute
+if (threadCurrentRequest.get() == currentRequest) {

Review Comment:
   Thanks, Justine and Artem. Yes, passing around the context has its own 
issues. The main thing with a thread local is to make sure that we don't 
introduce any GC issues. Currently, it seems that we never remove 
`threadRequestChannel`. Since we allow dynamically changing the number of 
request handler threads, it's probably better to remove `threadRequestChannel` 
when the thread completes?
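
   A minimal sketch of that idea (stand-alone, with hypothetical names standing
   in for the real fields in `KafkaRequestHandler`):

   ```scala
   import java.util.concurrent.atomic.AtomicBoolean

   // Sketch only (hypothetical names): a handler thread that clears its ThreadLocal
   // state on exit, so threads removed by a dynamic resize do not keep stale
   // references alive and cause GC issues.
   object ThreadLocalCleanupSketch {
     private val threadRequestChannel = new ThreadLocal[AnyRef]()
     private val threadCurrentRequest = new ThreadLocal[AnyRef]()

     def main(args: Array[String]): Unit = {
       val stopped = new AtomicBoolean(false)
       val handler = new Thread(() => {
         threadRequestChannel.set("request-channel") // stands in for the real RequestChannel
         try {
           while (!stopped.get()) {
             threadCurrentRequest.set("current-request") // per-request bookkeeping
             threadCurrentRequest.remove()
           }
         } finally {
           // Clear everything when the thread completes, e.g. after a dynamic shrink.
           threadRequestChannel.remove()
           threadCurrentRequest.remove()
         }
       })
       handler.start()
       stopped.set(true)
       handler.join()
     }
   }
   ```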



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


divijvaidya commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376654240


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   >  actionable practical recommendation
   
   I can't think of an idea other than passing a deep copy of the arguments. I have 
also considered suggesting wrapping them in Collections.unmodifiable or 
Collections.synchronizedCollection, but none of these ideas are good enough. 
   
   We need to re-think the callback invocation pattern in Kafka and see if we can 
avoid passing references to in-memory state across threads. But I am happy to keep 
that out of this PR and discuss it in the scope of the JIRA that Ismael created. 
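
   For illustration only, the defensive-copy idea could look roughly like the
   sketch below (hypothetical helper; as noted above, none of these options is
   considered good enough):

   ```scala
   import org.apache.kafka.common.TopicPartition

   // Sketch only: snapshot the per-partition entries into an immutable map before a
   // callback can run on another request handler thread. The values themselves
   // (e.g. MemoryRecords) would still be shared references.
   def snapshotEntries[V](entries: collection.Map[TopicPartition, V]): Map[TopicPartition, V] =
     Map.empty[TopicPartition, V] ++ entries
   ```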



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-30 Thread via GitHub


junrao commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1376651533


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -864,6 +778,111 @@ class ReplicaManager(val config: KafkaConfig,
 }
   }
 
+  /*
+   * Note: This method can be used as a callback in a different request 
thread. Ensure that correct RequestLocal
+   * is passed when executing this method. Accessing non-thread-safe data 
structures should be avoided if possible.
+   */
+  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
+internalTopicsAllowed: Boolean,
+origin: AppendOrigin,
+requiredAcks: Short,
+verificationGuards: Map[TopicPartition, 
VerificationGuard],

Review Comment:
   1. On the server side, some of the callbacks (Purgatory, the one introduced 
in KAFKA-14561, etc) are called in a different thread than the caller. I am not 
sure if it's possible/desirable to change all of those callbacks to be run on 
the same thread. Given that model, I agree that the wide exposure of 
ThreadLocal seems potentially dangerous. I pinged 
https://github.com/apache/kafka/pull/9220 to see if we could consider an 
alternative approach. It might be easier to fix the `ThreadLocal` thing first 
before fixing this issue.
   2. In general, callbacks on different threads are tricky to get right. For 
example, we spent a lot of time fixing the deadlock issues related to Purgatory 
(https://issues.apache.org/jira/browse/KAFKA-8334). For this newly introduced 
callback, I am wondering about the locking model. We have the following path 
that's called under the GroupMetadata lock. `GroupCoordinator.doSyncGroup (hold 
GroupMetadata lock) => groupManager.storeGroup => appendForGroup => 
replicaManager.appendRecords => replicaManager.appendEntries`. However, if we 
register the callback, `replicaManager.appendEntries` will be called without 
holding the `GroupMetadata` lock. Is that safe?
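
   As a toy illustration of the concern (not Kafka code, hypothetical names): an
   append invoked inline runs while the caller still holds the lock, whereas the
   same append registered as a callback may run later on another thread without it.

   ```scala
   import java.util.concurrent.{CompletableFuture, Executors}
   import java.util.concurrent.locks.ReentrantLock
   import java.util.function.Consumer

   object CallbackLockSketch extends App {
     val groupLock = new ReentrantLock()
     val callbackPool = Executors.newSingleThreadExecutor()

     def appendEntries(tag: String): Unit =
       println(s"$tag holdsGroupLock=${groupLock.isHeldByCurrentThread}")

     // Inline path: the append happens while the group lock is held.
     groupLock.lock()
     try appendEntries("inline:  ") finally groupLock.unlock()

     // Callback path: verification completes asynchronously and the append runs on
     // another thread, where the group lock is no longer held.
     val verification = new CompletableFuture[String]()
     val onVerified = new Consumer[String] {
       override def accept(s: String): Unit = appendEntries("callback:")
     }
     verification.thenAcceptAsync(onVerified, callbackPool)
     groupLock.lock()
     try verification.complete("verified") finally groupLock.unlock()

     callbackPool.shutdown()
   }
   ```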



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2023-10-30 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781075#comment-17781075
 ] 

Philip Nee commented on KAFKA-15631:


Hi [~lianetm] Thanks for filing this issue.  I believe the scenario is already 
covered by the current tests: "testNetworkTimeout" and "testHeartbeatOnStartup" 
should already exercise the in-flight heartbeat request.  I'm closing this issue 
for now.  If this is indeed an issue, we can reopen it.

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Client consumer should not send a new heartbeat request while there is a 
> previous one in-flight. If a HB is in-flight, we should wait for a response or 
> a timeout before sending the next one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15578: Migrating other system tests to use the group coordinator [kafka]

2023-10-30 Thread via GitHub


rreddy-22 commented on PR #14582:
URL: https://github.com/apache/kafka/pull/14582#issuecomment-1785758202

   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5915/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15631) Do not send new heartbeat request while another one in-flight

2023-10-30 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15631.

Resolution: Not A Problem

> Do not send new heartbeat request while another one in-flight
> -
>
> Key: KAFKA-15631
> URL: https://issues.apache.org/jira/browse/KAFKA-15631
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> Client consumer should not send a new heartbeat request while there is a 
> previous one in-flight. If a HB is in-flight, we should wait for a response or 
> a timeout before sending the next one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15643: Fix error logged when unload is called on a broker that was never a coordinator. [kafka]

2023-10-30 Thread via GitHub


rreddy-22 commented on code in PR #14657:
URL: https://github.com/apache/kafka/pull/14657#discussion_r1376601434


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##
@@ -1359,7 +1359,8 @@ public void scheduleUnloadOperation(
 log.info("Scheduling unloading of metadata for {} with epoch {}", tp, 
partitionEpoch);

Review Comment:
   What should we rephrase it as? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15754) The kafka-storage tool can generate UUID starting with "-"

2023-10-30 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781069#comment-17781069
 ] 

Ismael Juma commented on KAFKA-15754:
-

[~jolshan] I think this issue can happen if some code does _not_ use 
`Uuid.toString()` and instead uses Java's `UUID.toString()` somehow.

> The kafka-storage tool can generate UUID starting with "-"
> --
>
> Key: KAFKA-15754
> URL: https://issues.apache.org/jira/browse/KAFKA-15754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0
>Reporter: Paolo Patierno
>Assignee: Paolo Patierno
>Priority: Major
>
> Using the kafka-storage.sh tool, it seems that it can still generate a UUID 
> starting with a dash "-", which then breaks how the argparse4j library works. 
> With such a UUID (i.e. -rmdB0m4T4–Y4thlNXk4Q in my case) the tool exits with 
> the following error:
> kafka-storage: error: argument --cluster-id/-t: expected one argument
> Said that, it seems that this problem was already addressed in the 
> Uuid.randomUuid method which keeps generating a new UUID until it doesn't 
> start with "-". This is the commit addressing it 
> [https://github.com/apache/kafka/commit/5c1dd493d6f608b566fdad5ab3a896cb13622bce]
> The problem is that when the toString is called on the Uuid instance, it's 
> going to do a Base64 encoding on the generated UUID this way:
> {code:java}
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid()); 
> {code}
> Not sure why, but the code is using a URL-safe encoder which, taking a 
> look at the Base64 class in Java, is an RFC4648_URLSAFE encoder with 
> the following alphabet:
>  
> {code:java}
> private static final char[] toBase64URL = new char[]{'A', 'B', 'C', 'D', 'E', 
> 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 
> 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 
> 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 
> 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '-', '_'}; {code}
> which as you can see includes the "-" character.
> So despite the current Uuid.randomUuid is avoiding the generation of a UUID 
> containing a dash, the Base64 encoding operation can return a final UUID 
> starting with the dash instead.
>  
> I was wondering if there is any good reason for using a Base64 URL encoder 
> and not just the RFC4648 (not URL safe) which uses the common Base64 alphabet 
> not containing the "-".
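
To make the encoding point concrete, a small stand-alone check (not Kafka code)
shows that the URL-safe alphabet can indeed put "-" in the first position of the
encoded ID:

{code:scala}
import java.util.Base64

// Stand-alone illustration (not Kafka code): the first output character encodes the
// top 6 bits of the first byte, and index 62 of the URL-safe alphabet is '-'.
object DashPrefixDemo extends App {
  val uuidBytes = new Array[Byte](16)
  uuidBytes(0) = 0xF8.toByte // top 6 bits = 0b111110 = 62 -> '-'
  val encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes)
  println(encoded) // 22 characters, starting with '-'
}
{code}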



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

