Re: [PR] KAFKA-15608: Assign latest leader epoch and offset checkpoint to future log when replacing current log [kafka]

2023-10-24 Thread via GitHub


drawxy closed pull request #14553: KAFKA-15608: Assign latest leader epoch and 
offset checkpoint to future log when replacing current log
URL: https://github.com/apache/kafka/pull/14553


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15608: Assign latest leader epoch and offset checkpoint to future log when replacing current log [kafka]

2023-10-24 Thread via GitHub


drawxy commented on PR #14553:
URL: https://github.com/apache/kafka/pull/14553#issuecomment-1778559531

   > I tested this process on the latest trunk branch and could not reproduce it. 
My process: create a topic with 1 partition, write some traffic, then stop 
writing. For example, the current ISR is [2,0,1]; I submit a reassignment where 
the ISR stays [2,0,1] but replica 2's log dir changes, then execute the 
reassignment. Please confirm whether my testing process differs from yours. 
Thanks @drawxy
   
   





[PR] Kafka 15680 [kafka]

2023-10-24 Thread via GitHub


kumarpritam863 opened a new pull request, #14630:
URL: https://github.com/apache/kafka/pull/14630

   * In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, 
say Worker3, joins, the leader (say Worker1) computes a new global assignment 
that results in the revocation of one task from each existing worker, i.e. 
Worker1 and Worker2.
   * Once the new member has joined, the 
ConsumerCoordinator.onJoinComplete() method is called; it computes the newly 
assigned partitions and the revoked partitions and updates the subscription 
object.
   * If there was a revocation (which we detect via the 
"partitionsRevoked" list), we call "invokePartitionsRevoked()", which 
internally calls "updatePartitionCount()"; that fetches partitions from the 
assignment object, which has not yet been updated with the new assignment.
   * Only just before "invokePartitionsAssigned()" is called do we update the 
assignment, by invoking 
subscriptions.assignFromSubscribed(assignedPartitions); a sketch of this 
ordering follows below.
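   A minimal, hypothetical Java sketch of the ordering described above (class, 
method, and partition names are illustrative only, not the actual 
ConsumerCoordinator code): the revocation path still reads the old assignment, 
and the subscription state is only replaced afterwards.
   ```java
   import java.util.HashSet;
   import java.util.Set;

   class RebalanceOrderingSketch {
       // Stale assignment that the revocation path still reads from.
       private Set<String> assignment = new HashSet<>(Set.of("t-0", "t-1"));

       void onJoinComplete(Set<String> assignedPartitions) {
           // 1. Revoked partitions are computed against the *old* assignment.
           Set<String> revoked = new HashSet<>(assignment);
           revoked.removeAll(assignedPartitions);

           // 2. The revocation step runs first; any partition count derived here
           //    still comes from the not-yet-updated 'assignment'.
           if (!revoked.isEmpty()) {
               System.out.println("partition count during revocation: " + assignment.size());
           }

           // 3. Only now, just before the "partitions assigned" step, is the
           //    subscription state replaced with the new assignment.
           assignment = new HashSet<>(assignedPartitions);
           System.out.println("partition count after update: " + assignment.size());
       }

       public static void main(String[] args) {
           new RebalanceOrderingSketch().onJoinComplete(Set.of("t-0")); // t-1 gets revoked
       }
   }
   ```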
   





[jira] [Created] (KAFKA-15680) Partition-Count is not getting updated Correctly in the Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing

2023-10-24 Thread Pritam Kumar (Jira)
Pritam Kumar created KAFKA-15680:


 Summary: Partition-Count is not getting updated Correctly in the 
Incremental Co-operative Rebalancing(ICR) Mode of Rebalancing
 Key: KAFKA-15680
 URL: https://issues.apache.org/jira/browse/KAFKA-15680
 Project: Kafka
  Issue Type: Bug
  Components: connect
Affects Versions: 3.0.1
Reporter: Pritam Kumar
Assignee: Pritam Kumar


* In ICR (Incremental Cooperative Rebalancing) mode, whenever a new worker, say 
Worker3, joins, the leader (say Worker1) computes a new global assignment that 
results in the revocation of some tasks from each existing worker, i.e. Worker1 
and Worker2.
 * Once the new member has joined, the *ConsumerCoordinator.onJoinComplete()* 
method is called; it computes the newly assigned partitions and the revoked 
partitions and updates the subscription object.
 * If there was a revocation (which we detect via the "partitionsRevoked" 
list), we call *invokePartitionsRevoked()*, which internally calls 
"updatePartitionCount()"; that fetches partitions from the *assignment* object, 
which has not yet been updated with the new assignment.
 * Only just before the *invokePartitionsAssigned()* method is called do we 
update the *assignment*, by invoking 
*subscriptions.assignFromSubscribed(assignedPartitions);*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14593:
URL: https://github.com/apache/kafka/pull/14593#discussion_r1371143824


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -361,11 +410,12 @@ public Optional build() {
 
 maybeUpdateRecordElr(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
+if (record.isr() == null && (!targetIsr.isEmpty() || 
eligibleLeaderReplicasEnabled) && 
!targetIsr.equals(Replicas.toList(partition.isr))) {

Review Comment:
   Updated the comments.






Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14593:
URL: https://github.com/apache/kafka/pull/14593#discussion_r1371143671


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -93,6 +93,9 @@ public enum Election {
 private boolean eligibleLeaderReplicasEnabled;
 private int minISR;
 
+// Whether allow electing last known leader when ELR is enabled. Note, the 
last known leader will be stored in the
+// lastKnownElr field.
+private boolean lastKnownLeaderEnabled = true;

Review Comment:
   Updated.






Re: [PR] KAFKA-15608: Assign latest leader epoch and offset checkpoint to future log when replacing current log [kafka]

2023-10-24 Thread via GitHub


hudeqi commented on PR #14553:
URL: https://github.com/apache/kafka/pull/14553#issuecomment-1778486603

   I tested this process on the latest trunk branch and could not reproduce it. 
My process: create a topic with 1 partition, write some traffic, then stop 
writing. For example, the current ISR is [2,0,1]; I submit a reassignment where 
the ISR stays [2,0,1] but replica 2's log dir changes, then execute the 
reassignment. Please confirm whether my testing process differs from yours. 
Thanks @drawxy 





Re: [PR] KAFKA-15608: Assign latest leader epoch and offset checkpoint to future log when replacing current log [kafka]

2023-10-24 Thread via GitHub


hudeqi commented on code in PR #14553:
URL: https://github.com/apache/kafka/pull/14553#discussion_r1371126756


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1156,6 +1156,9 @@ class LogManager(logDirs: Seq[File],
   // we will add metrics back after sourceLog remove the metrics
   destLog.removeLogMetrics()
   destLog.updateHighWatermark(sourceLog.highWatermark)
+  sourceLog.leaderEpochCache.foreach(cache =>
+cache.latestEntry.map(entry =>
+  destLog.maybeAssignEpochStartOffset(entry.epoch, entry.startOffset)))

Review Comment:
   I have a doubt: when the replica becomes the new leader, 
`maybeAssignEpochStartOffset` has already been executed in `makeLeader`, and the 
latest epoch and the new leader's LEO have already been assigned. Why do we need 
to copy them again here?






Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1371125365


##
core/src/main/scala/kafka/metrics/ClientMetricsConfig.scala:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * 
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * 
+ * {
+ * 
+ *  subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *  subscribedMetrics: List of metric prefixes
+ *  pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *  matchingPatternsList: List of client matching patterns, that are used 
by broker to match the client instance
+ * with the subscription.
+ * 
+ * }
+ * 
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * 
+ *  "metrics" value should be comma separated metrics list. An empty list 
means no metrics subscribed.
+ * A list containing just an empty string means all metrics subscribed.
+ * Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *  "interval.ms" should be between 100 and 360 (1 hour). This is the 
interval at which the client
+ * should push the metrics to the broker.
+ *  "match" is a comma separated list of client match patterns, in case if 
there is no matching
+ * pattern specified then broker considers that as all match which means the 
associated metrics
+ * applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"

Review Comment:
   The example suggests * is supported.  In this case, all 11.1 subversion will 
be returned right? It would be useful to mention that in the doc.
   
   Would it work for metrics? Can I do "org.apache.kafka.client.producer.*" ?
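   
   For reference, a small standalone check (my own sketch, assuming the broker 
keeps treating the pattern as a java.util.regex expression, as the quoted 
`parseMatchingPatterns` does via `Pattern.compile`) of how a pattern like 
"11.1.*" behaves:
   ```java
   import java.util.regex.Pattern;

   public class MatchPatternDemo {
       public static void main(String[] args) {
           // "11.1.*" is compiled as a regex, so '.' matches any character.
           Pattern version = Pattern.compile("11.1.*");
           System.out.println(version.matcher("11.1.3").matches());  // true
           System.out.println(version.matcher("11.10.2").matches()); // true ('.' is a wildcard)
           System.out.println(version.matcher("12.1.0").matches());  // false
       }
   }
   ```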






Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1371110225


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.Kafka.info
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case e: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()
+
+  private def init(clientInstanceId: String,
+   clientId: String,
+   softwareName: String,
+   softwareVersion: String,
+   clientHostAddress: String,
+   clientPort: String): Unit = {
+attributesMap(CLIENT_INSTANCE_ID) = clientInstanceId
+attributesMap(CLIENT_ID) = clientId
+attributesMap(CLIENT_SOFTWARE_NAME) = softwareName
+attributesMap(CLIENT_SOFTWARE_VERSION) = softwareVersion
+attributesMap(CLIENT_SOURCE_ADDRESS) = clientHostAddress
+attributesMap(CLIENT_SOURCE_PORT) = clientPort
+  }
+  def getClientId: Option[String] = attributesMap.get(CLIENT_ID)
+
+  def isMatched(patterns: Map[String, String]) : Boolean = {
+// Empty pattern or missing pattern still considered as a match
+if (patterns == null || patterns.isEmpty) {
+  true
+} else {
+  matchPatterns(patterns)
+}
+  }
+
+  private def matchPatterns(matchingPatterns: Map[String, String]) : Boolean = 
{
+try {
+  matchingPatterns.foreach {
+  case (k, 

Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1371108662


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.Kafka.info
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case e: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()
+
+  private def init(clientInstanceId: String,
+   clientId: String,
+   softwareName: String,
+   softwareVersion: String,
+   clientHostAddress: String,
+   clientPort: String): Unit = {
+attributesMap(CLIENT_INSTANCE_ID) = clientInstanceId
+attributesMap(CLIENT_ID) = clientId
+attributesMap(CLIENT_SOFTWARE_NAME) = softwareName
+attributesMap(CLIENT_SOFTWARE_VERSION) = softwareVersion
+attributesMap(CLIENT_SOURCE_ADDRESS) = clientHostAddress
+attributesMap(CLIENT_SOURCE_PORT) = clientPort
+  }
+  def getClientId: Option[String] = attributesMap.get(CLIENT_ID)
+
+  def isMatched(patterns: Map[String, String]) : Boolean = {
+// Empty pattern or missing pattern still considered as a match
+if (patterns == null || patterns.isEmpty) {
+  true
+} else {
+  matchPatterns(patterns)
+}
+  }
+
+  private def matchPatterns(matchingPatterns: Map[String, String]) : Boolean = 
{
+try {
+  matchingPatterns.foreach {
+  case (k, 

Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371113716


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -85,17 +83,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);

Review Comment:
   Done.






Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371113599


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -85,17 +83,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);
-
-brokerEpoch = 
Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1L"));
+Content content = new ObjectMapper().readValue(text, 
Content.class);

Review Comment:
   Thanks for the tips!



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1421,15 +1422,15 @@ class LogManager(logDirs: Seq[File],
 for (dir <- liveLogDirs) {
   val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
   val currentBrokerEpoch = cleanShutdownFileHandler.read
-  if (currentBrokerEpoch == -1L) {
+  if (!currentBrokerEpoch.isPresent) {
 info(s"Unable to read the broker epoch in ${dir.toString}.")
 return -1L
   }
-  if (brokerEpoch != -1 && currentBrokerEpoch != brokerEpoch) {
+  if (brokerEpoch != -1 && currentBrokerEpoch.getAsLong != brokerEpoch) {
 info(s"Found different broker epochs in ${dir.toString}. 
Other=$brokerEpoch vs current=$currentBrokerEpoch.")
 return -1L
   }
-  brokerEpoch = currentBrokerEpoch
+  brokerEpoch = currentBrokerEpoch.getAsLong

Review Comment:
   Done.






Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1371112996


##
core/src/test/scala/kafka/metrics/ClientMetricsTestUtils.scala:
##
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import ClientMetricsConfig.ClientMatchingParams.{CLIENT_SOFTWARE_NAME, 
CLIENT_SOFTWARE_VERSION}
+
+import java.util.Properties
+
+object ClientMetricsTestUtils {
+  val defaultPushInterval = 30 * 1000 // 30 seconds

Review Comment:
   most of the time variables here are suffixed with Ms; I wonder if we want to 
do that here as well...






Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1371106132


##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.Kafka.info
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case e: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()
+
+  private def init(clientInstanceId: String,
+   clientId: String,
+   softwareName: String,
+   softwareVersion: String,
+   clientHostAddress: String,
+   clientPort: String): Unit = {
+attributesMap(CLIENT_INSTANCE_ID) = clientInstanceId
+attributesMap(CLIENT_ID) = clientId
+attributesMap(CLIENT_SOFTWARE_NAME) = softwareName
+attributesMap(CLIENT_SOFTWARE_VERSION) = softwareVersion
+attributesMap(CLIENT_SOURCE_ADDRESS) = clientHostAddress
+attributesMap(CLIENT_SOURCE_PORT) = clientPort
+  }
+  def getClientId: Option[String] = attributesMap.get(CLIENT_ID)
+
+  def isMatched(patterns: Map[String, String]) : Boolean = {

Review Comment:
   An alternative way to avoid null and the if/else:
   ```
   def isMatched(patterns: Option[Map[String, String]]): Boolean = {
     patterns match {
       case Some(p) if p.nonEmpty => matchPatterns(p)
       case _ => true
     }
   }
   ```




Re: [PR] KAFKA-14585: Moving StorageTool from core to tools module [kafka]

2023-10-24 Thread via GitHub


github-actions[bot] commented on PR #13417:
URL: https://github.com/apache/kafka/pull/13417#issuecomment-1778451254

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


ijuma commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371089424


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -85,17 +83,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);

Review Comment:
   ObjectMapper creation is expensive; it should typically be a static final field.
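   
   For illustration, a minimal sketch of that suggestion (class and field names 
below are mine, not the PR's): reuse one shared mapper instead of constructing a 
new one on every read().
   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;
   import java.util.Map;

   public class CleanShutdownReadSketch {
       // Jackson's ObjectMapper is thread-safe once configured, so a single
       // shared instance can serve every read() call.
       private static final ObjectMapper MAPPER = new ObjectMapper();

       @SuppressWarnings("unchecked")
       static Map<String, String> parse(String json) throws Exception {
           return (Map<String, String>) MAPPER.readValue(json, Map.class);
       }

       public static void main(String[] args) throws Exception {
           System.out.println(parse("{\"version\":\"0\",\"brokerEpoch\":\"7\"}"));
       }
   }
   ```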






Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


ijuma commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371088934


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -85,17 +83,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);
-
-brokerEpoch = 
Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1L"));
+Content content = new ObjectMapper().readValue(text, 
Content.class);

Review Comment:
   There are both @JsonIgnoreProperties(ignoreUnknown) and @JsonIgnore.
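   
   A quick standalone illustration of the first annotation (the Content shape 
below is a simplified stand-in, not the PR's exact class): with ignoreUnknown = 
true, a file written by a newer version with extra fields still parses.
   ```java
   import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
   import com.fasterxml.jackson.databind.ObjectMapper;

   public class IgnoreUnknownDemo {
       @JsonIgnoreProperties(ignoreUnknown = true)
       public static class Content {
           public int version;
           public Long brokerEpoch;
       }

       public static void main(String[] args) throws Exception {
           // "newField" would normally fail deserialization with an unrecognized
           // property error; ignoreUnknown = true makes Jackson skip it instead.
           String newer = "{\"version\":0,\"brokerEpoch\":25,\"newField\":\"x\"}";
           Content c = new ObjectMapper().readValue(newer, Content.class);
           System.out.println(c.brokerEpoch); // 25
       }
   }
   ```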






Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371084764


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -85,17 +83,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);
-
-brokerEpoch = 
Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1L"));
+Content content = new ObjectMapper().readValue(text, 
Content.class);

Review Comment:
   You are right, downgrade is not flexible here; new fields will break the 
parser. I will revert the change.






Re: [PR] MINOR: Rewrite the meta.properties handling code in Java and fix some issues [kafka]

2023-10-24 Thread via GitHub


pprovenzano commented on code in PR #14628:
URL: https://github.com/apache/kafka/pull/14628#discussion_r1371083468


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -227,8 +231,12 @@ public KafkaClusterTestKit build() throws Exception {
 setupNodeDirectories(baseDirectory, 
node.metadataDirectory(), Collections.emptyList());
 BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
 fromVersion(nodes.bootstrapMetadataVersion(), 
"testkit");
+MetaPropertiesEnsemble metaPropertiesEnsemble = new 
MetaPropertiesEnsemble.Loader().
+setLoadMissingBehavior(LoadMissingBehavior.EXCEPTION).
+addMetadataLogDir(node.metadataDirectory()).

Review Comment:
   LoadMissingBehavior.EXCEPTION is not defined anywhere.






Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371084404


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -49,13 +47,15 @@ public class CleanShutdownFileHandler {
 private static final int CURRENT_VERSION = 0;
 private final Logger logger;
 
-private enum Fields {
-VERSION,
-BROKER_EPOCH;
+private static class Content {
+public int version;
+public Long brokerEpoch;
 
-@Override
-public String toString() {
-return name().toLowerCase(Locale.ROOT);
+public Content() {};

Review Comment:
   This is the constructor used by ObjectMapper().readValue






Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1371084249


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.log;
+package org.apache.kafka.storage.internals.log;

Review Comment:
   Will do.






Re: [PR] MINOR: Fix flaky testFollowerCompleteDelayedFetchesOnReplication [kafka]

2023-10-24 Thread via GitHub


dengziming merged PR #14616:
URL: https://github.com/apache/kafka/pull/14616





Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-24 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1370533091


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   This is what I had before but this was giving a ERROR: `NoneType' object has 
no attribute 'get` on using get when context args is None






Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-24 Thread via GitHub


rreddy-22 commented on PR #14524:
URL: https://github.com/apache/kafka/pull/14524#issuecomment-1778311241

   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5907/ - 
Latest test build





[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-24 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779278#comment-17779278
 ] 

Philip Nee commented on KAFKA-15602:


Hi [~luke.kirby] - Here's the guide to create a KIP: 
[https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals]

 

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to start reading from */ 
> producer.send(bb); {code}
> Technically, you wouldn't even need to use flip() there, since position is 
> ignored; it would be sufficient to just call {{{}bb.limit(bb.position()){}}}. 
> Notably, a buffer constructed using any variant of 

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-24 Thread Luke Kirby (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779276#comment-17779276
 ] 

Luke Kirby commented on KAFKA-15602:


>Well, in example one, the position is at 4, and if we say we want to respect 
>position the result would be empty – the user would need to rewind to zero 
>before calling the serializer to make it work. Does this make sense?

Ah, nah, ByteBuffer.wrap("test".getBytes()) yields a ByteBuffer with position=0 
limit=4; i.e., it's already ready to be read.
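
For anyone following along, a minimal, self-contained sketch of the buffer
states being discussed (this only exercises the standard java.nio API, not the
serializer itself):

{code:java}
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

public class BufferStateDemo {
    public static void main(String[] args) {
        // wrap() leaves the buffer ready to read: position=0, limit=4
        ByteBuffer wrapped = ByteBuffer.wrap("test".getBytes(UTF_8));
        System.out.println(wrapped.position() + "/" + wrapped.limit()); // 0/4

        // allocate().put() leaves the position at the write cursor: position=4, limit=8
        ByteBuffer written = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
        System.out.println(written.position() + "/" + written.limit()); // 4/8

        // flip() switches it back to reading mode: position=0, limit=4
        written.flip();
        System.out.println(written.position() + "/" + written.limit()); // 0/4
    }
}
{code}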

I would hope to be able to get some time to start on the KIP sometime this 
week; happy for you two to tell me where I should do it!

 

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the 

[jira] [Updated] (KAFKA-15455) Add support for OffsetCommit version 9 in consumer

2023-10-24 Thread Yi Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yi Ding updated KAFKA-15455:

Labels: kip-848 kip-848-client-support kip-848-e2e kip-848-preview  (was: 
kip-848 kip-848-client-support kip-848-preview)

> Add support for OffsetCommit version 9 in consumer
> --
>
> Key: KAFKA-15455
> URL: https://issues.apache.org/jira/browse/KAFKA-15455
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Philip Nee
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>
> We need to handle the new error codes as specified here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitResponse.json#L46|https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json#L35]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-24 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1370909940


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
 if (threadCurrentRequest.get() != null) {
   // If the callback is actually executed on a request thread, we can 
directly execute
   // it without re-scheduling it.
-  fun(T)
+  fun(requestLocal, T)

Review Comment:
   This change is added. 
(https://github.com/apache/kafka/pull/14629/commits/ddcf506aff798c1293965050b488ccade0d75db3)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-24 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1370909940


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
 if (threadCurrentRequest.get() != null) {
   // If the callback is actually executed on a request thread, we can 
directly execute
   // it without re-scheduling it.
-  fun(T)
+  fun(requestLocal, T)

Review Comment:
   This change is added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-24 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1370901998


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
 if (threadCurrentRequest.get() != null) {
   // If the callback is actually executed on a request thread, we can 
directly execute
   // it without re-scheduling it.
-  fun(T)
+  fun(requestLocal, T)

Review Comment:
   This change was made for when we want to execute the callback early on the 
same thread as the one receiving the request. (Ie, before we do verification) 
In this case, the current request is actually the same. I think we should 
adjust the conditional to reflect that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15614: Define interfaces and classes for client telemetry [kafka]

2023-10-24 Thread via GitHub


junrao commented on code in PR #14575:
URL: https://github.com/apache/kafka/pull/14575#discussion_r1370888347


##
clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetry.java:
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.telemetry;

Review Comment:
   Since this is a server side interface, we probably want to use a package 
name similar to other server side plugins (e.g., 
`org.apache.kafka.server.quota`, `org.apache.kafka.server.authorizer`, etc). 
Ditto for other public facing classes.
   
   Also, all public interfaces need to be included in the following section in 
`build.gradle`.
   
   ```
 javadoc {
   include "**/org/apache/kafka/clients/admin/*"
   include "**/org/apache/kafka/clients/consumer/*"
   include "**/org/apache/kafka/clients/producer/*"
   include "**/org/apache/kafka/common/*"
   include "**/org/apache/kafka/common/acl/*"
   include "**/org/apache/kafka/common/annotation/*"
   include "**/org/apache/kafka/common/errors/*"
   include "**/org/apache/kafka/common/header/*"
   include "**/org/apache/kafka/common/metrics/*"
   include "**/org/apache/kafka/common/metrics/stats/*"
   include "**/org/apache/kafka/common/quota/*"
   include "**/org/apache/kafka/common/resource/*"
   include "**/org/apache/kafka/common/serialization/*"
   include "**/org/apache/kafka/common/config/*"
   include "**/org/apache/kafka/common/config/provider/*"
   include "**/org/apache/kafka/common/security/auth/*"
   include "**/org/apache/kafka/common/security/plain/*"
   include "**/org/apache/kafka/common/security/scram/*"
   include "**/org/apache/kafka/common/security/token/delegation/*"
   include "**/org/apache/kafka/common/security/oauthbearer/*"
   include "**/org/apache/kafka/common/security/oauthbearer/secured/*"
   include "**/org/apache/kafka/server/authorizer/*"
   include "**/org/apache/kafka/server/policy/*"
   include "**/org/apache/kafka/server/quota/*"
 }
   
   ```
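   
   For illustration only (assuming the new interfaces end up under a server-side package such as `org.apache.kafka.server.telemetry`; the exact name is still to be settled), the corresponding change would be one more line in that list, e.g. `include "**/org/apache/kafka/server/telemetry/*"`.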



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-24 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1370892267


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
 if (threadCurrentRequest.get() != null) {
   // If the callback is actually executed on a request thread, we can 
directly execute
   // it without re-scheduling it.
-  fun(T)
+  fun(requestLocal, T)

Review Comment:
   This is actually still potentially wrong. I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-24 Thread via GitHub


jolshan commented on code in PR #14629:
URL: https://github.com/apache/kafka/pull/14629#discussion_r1370892267


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -55,23 +55,23 @@ object KafkaRequestHandler {
* @param fun Callback function to execute
* @return Wrapped callback that would execute `fun` on a request thread
*/
-  def wrap[T](fun: T => Unit): T => Unit = {
+  def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T 
=> Unit = {
 val requestChannel = threadRequestChannel.get()
 val currentRequest = threadCurrentRequest.get()
 if (requestChannel == null || currentRequest == null) {
   if (!bypassThreadCheck)
 throw new IllegalStateException("Attempted to reschedule to request 
handler thread from non-request handler thread.")
-  T => fun(T)
+  T => fun(requestLocal, T)
 } else {
   T => {
 if (threadCurrentRequest.get() != null) {
   // If the callback is actually executed on a request thread, we can 
directly execute
   // it without re-scheduling it.
-  fun(T)
+  fun(requestLocal, T)

Review Comment:
   This is actually still wrong. I will fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]

2023-10-24 Thread via GitHub


hanyuzheng7 commented on code in PR #14570:
URL: https://github.com/apache/kafka/pull/14570#discussion_r1370877692


##
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##
@@ -776,64 +779,55 @@ public void verifyStore() {
 globalShouldRejectAllQueries();
 } else {
 shouldRejectUnknownQuery();
-shouldCollectExecutionInfo();
+//shouldCollectExecutionInfo();
 shouldCollectExecutionInfoUnderFailure();
 final String kind = this.kind;
 if (storeToTest.keyValue()) {
 if (storeToTest.timestamped()) {
-final Function, Integer> 
valueExtractor =
-ValueAndTimestamp::value;
-if (kind.equals("DSL")) {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangeDSLQueries(valueExtractor);
-} else {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangePAPIQueries(valueExtractor);
-}
-} else {
-final Function valueExtractor = 
Function.identity();
-if (kind.equals("DSL")) {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangeDSLQueries(valueExtractor);
-} else {
-shouldHandleKeyQuery(2, valueExtractor, 5);
-shouldHandleRangePAPIQueries(valueExtractor);
-}
-}
-}
+shouldHandleKeyQuery(2,  5);
+shouldHandleTimestampedKeyQuery(2, 5);
+shouldHandleRangeQueries();
+shouldHandleTimestampRangeQueries();
+  }
+else {
+shouldHandleKeyQuery(2, 5);

Review Comment:
   The ROCK_KV DSL store would execute the code in this else statement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread [kafka]

2023-10-24 Thread via GitHub


jolshan opened a new pull request, #14629:
URL: https://github.com/apache/kafka/pull/14629

   With the new callback mechanism we were accidentally passing context with 
the wrong request local. Now include a RequestLocal as an explicit argument to 
the callback. 
   
   Also make the arguments passed through the callback clearer by separating 
the method out. 
   
   Added a test to ensure we use the request handler's request local and not 
the one passed in when the callback is executed via the request handler.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Rewrite the meta.properties code in Java [kafka]

2023-10-24 Thread via GitHub


cmccabe opened a new pull request, #14628:
URL: https://github.com/apache/kafka/pull/14628

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


junrao commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1370850704


##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.log;
+package org.apache.kafka.storage.internals.log;

Review Comment:
   CleanShutdownFileHandler is a kind of checkpoint file. Would it be better to 
have this in the checkpoint package?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1421,15 +1422,15 @@ class LogManager(logDirs: Seq[File],
 for (dir <- liveLogDirs) {
   val cleanShutdownFileHandler = new CleanShutdownFileHandler(dir.getPath)
   val currentBrokerEpoch = cleanShutdownFileHandler.read
-  if (currentBrokerEpoch == -1L) {
+  if (!currentBrokerEpoch.isPresent) {
 info(s"Unable to read the broker epoch in ${dir.toString}.")
 return -1L
   }
-  if (brokerEpoch != -1 && currentBrokerEpoch != brokerEpoch) {
+  if (brokerEpoch != -1 && currentBrokerEpoch.getAsLong != brokerEpoch) {
 info(s"Found different broker epochs in ${dir.toString}. 
Other=$brokerEpoch vs current=$currentBrokerEpoch.")
 return -1L
   }
-  brokerEpoch = currentBrokerEpoch
+  brokerEpoch = currentBrokerEpoch.getAsLong

Review Comment:
   Should we propagate Option[Long] all the way until we convert it to a 
request?



##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -85,17 +83,16 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
-public long read() {
-long brokerEpoch = -1L;
+@SuppressWarnings("unchecked")
+public OptionalLong read() {
 try {
 String text = 
Utils.readFileAsString(cleanShutdownFile.toPath().toString());
-Map content = new ObjectMapper().readValue(text, 
HashMap.class);
-
-brokerEpoch = 
Long.parseLong(content.getOrDefault(Fields.BROKER_EPOCH.toString(), "-1L"));
+Content content = new ObjectMapper().readValue(text, 
Content.class);

Review Comment:
   Hmm, by doing this, it seems that we can't upgrade from the current clean 
shutdown file nor could we downgrade if we add a new field in the future?
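   
   One way to keep the forward-compatibility direction would be to tolerate unknown fields when deserializing, so that a file written by a newer version (with extra fields) can still be read here. A minimal sketch, assuming the Jackson ObjectMapper already used above and an illustrative stand-in for the Content class:
   
   ```java
   import com.fasterxml.jackson.databind.DeserializationFeature;
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   class Content {
       public int version;
       public Long brokerEpoch;
   }
   
   public class TolerantCleanShutdownRead {
       // Ignoring unknown properties lets an older broker read a clean shutdown
       // file written by a newer version that has added extra fields.
       static Content parse(String text) throws Exception {
           ObjectMapper mapper = new ObjectMapper()
               .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
           return mapper.readValue(text, Content.class);
       }
   }
   ```
   
   This does not cover reading the pre-existing file format, which may still need its own handling as noted above.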



##
storage/src/main/java/org/apache/kafka/storage/internals/log/CleanShutdownFileHandler.java:
##
@@ -49,13 +47,15 @@ public class CleanShutdownFileHandler {
 private static final int CURRENT_VERSION = 0;
 private final Logger logger;
 
-private enum Fields {
-VERSION,
-BROKER_EPOCH;
+private static class Content {
+public int version;
+public Long brokerEpoch;
 
-@Override
-public String toString() {
-return name().toLowerCase(Locale.ROOT);
+public Content() {};

Review Comment:
   Is this constructor used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15584: Leader election with ELR [kafka]

2023-10-24 Thread via GitHub


artemlivshits commented on code in PR #14593:
URL: https://github.com/apache/kafka/pull/14593#discussion_r1370847603


##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -361,11 +410,12 @@ public Optional build() {
 
 maybeUpdateRecordElr(record);
 
-if (record.isr() == null && !targetIsr.isEmpty() && 
!targetIsr.equals(Replicas.toList(partition.isr))) {
+if (record.isr() == null && (!targetIsr.isEmpty() || 
eligibleLeaderReplicasEnabled) && 
!targetIsr.equals(Replicas.toList(partition.isr))) {

Review Comment:
   Should we add a comment here that we let ISR shrink to empty if we're using 
eligible leader replicas?



##
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##
@@ -93,6 +93,9 @@ public enum Election {
 private boolean eligibleLeaderReplicasEnabled;
 private int minISR;
 
+// Whether allow electing last known leader when ELR is enabled. Note, the 
last known leader will be stored in the
+// lastKnownElr field.
+private boolean lastKnownLeaderEnabled = true;

Review Comment:
   Should we name it something like `useLastKnownLeaderInBalancedRecovery`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15673: Adding client metrics resource types (KIP-714) [kafka]

2023-10-24 Thread via GitHub


AndrewJSchofield commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1370842597


##
core/src/main/scala/kafka/server/ZkConfigManager.scala:
##
@@ -38,7 +38,8 @@ object ConfigType {
   val User = "users"
   val Broker = "brokers"
   val Ip = "ips"
-  val all = Seq(Topic, Client, User, Broker, Ip)
+  val ClientMetrics = "client-metrics"

Review Comment:
   Given that we don't support client metrics on ZK, I am surprised to see code 
changes in ZkConfigManager.



##
core/src/main/scala/kafka/metrics/ClientMetricsMetadata.scala:
##
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics
+
+import kafka.Kafka.info
+import kafka.metrics.ClientMetricsConfig.ClientMatchingParams._
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+val ctx = request.context
+val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+val instance = new ClientMetricsMetadata
+instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   * occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return map of client matching pattern entries
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+val patternsMap = mutable.Map[String, String]()
+if (patterns != null) {
+  patterns.foreach(x => {
+val nameValuePair = x.split("=", 2).map(x => x.trim)
+if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+  patternsMap += (nameValuePair(0) -> nameValuePair(1))
+} else {
+  throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+}
+  })
+}
+patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+try {
+  Pattern.compile(inputPattern)
+  true
+} catch {
+  case e: PatternSyntaxException =>
+false
+}
+  }
+
+}
+
+class ClientMetricsMetadata {
+  var attributesMap: mutable.Map[String, String] = 
scala.collection.mutable.Map[String, String]()
+
+  private def init(clientInstanceId: String,
+   clientId: String,
+   softwareName: String,
+   softwareVersion: String,
+   clientHostAddress: String,

Review Comment:
   I suggest to make this `clientSourceAddress` and `clientSourcePort` so the 
names all agree with the match selector names in the KIP.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2619,6 +2619,9 @@ private ConfigEntry.ConfigSource 
configSource(DescribeConfigsResponse.ConfigSour
 case DYNAMIC_BROKER_LOGGER_CONFIG:
 configSource = 

[jira] [Resolved] (KAFKA-15304) CompletableApplicationEvents aren't being completed when the consumer is closing

2023-10-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True resolved KAFKA-15304.
---
Fix Version/s: 3.7.0
 Reviewer: Jun Rao
   Resolution: Fixed

Fixed as part of PR [#14406|https://github.com/apache/kafka/pull/14406].

> CompletableApplicationEvents aren't being completed when the consumer is 
> closing
> 
>
> Key: KAFKA-15304
> URL: https://issues.apache.org/jira/browse/KAFKA-15304
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.7.0
>
>
> If the background thread is closed before ingesting all ApplicationEvents, we 
> should drain the background queue and try to cancel these events before 
> closing. We can try to process these events before closing down the consumer; 
> however, we assume that when the user issues a close command, the consumer 
> should be shut down promptly.
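
As an illustration of the drain-and-cancel idea described above (the queue and
event types here are illustrative stand-ins, not the actual consumer internals):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

final class DrainOnClose {
    // Drain whatever is still queued and fail the associated futures so that
    // callers blocked on them do not hang while the consumer shuts down.
    static <T> void drainAndCancel(BlockingQueue<CompletableFuture<T>> queue) {
        List<CompletableFuture<T>> pending = new ArrayList<>();
        queue.drainTo(pending);
        for (CompletableFuture<T> future : pending) {
            future.completeExceptionally(
                new CancellationException("The consumer is closing"));
        }
    }
}
{code}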



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KIP-951: protocol changes [kafka]

2023-10-24 Thread via GitHub


chb2ab opened a new pull request, #14627:
URL: https://github.com/apache/kafka/pull/14627

   Separating out the protocol changes from 
https://github.com/apache/kafka/pull/1 in an effort to more quickly unblock 
the client side PR.
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-951%3A+Leader+discovery+optimisations+for+the+client
   
   https://issues.apache.org/jira/browse/KAFKA-15661
   
   ### Testing
   
   `./gradlew core:test --tests kafka.server.KafkaApisTest`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15188) Implement more of the remaining PrototypeAsyncConsumer APIs

2023-10-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15188:
--
Description: 
There are several {{Consumer}} APIs that only touch the {{ConsumerMetadata}} 
and/or {{SubscriptionState}} classes; they do not perform network I/O or 
otherwise block. These can be implemented without needing {{RequestManager}} 
updates and include the following APIs:
 - {{committed}}
 - {{currentLag}}
 - {{metrics}}
 - {{pause}}
 - {{paused}}
 - {{position}}
 - {{resume}}
 - {{seek}}
 - {{seekToBeginning}}
 - {{seekToEnd}}
 - {{subscribe}}

  was:
There are several {{Consumer}} APIs that only touch the {{ConsumerMetadata}} 
and/or {{SubscriptionState}} classes; they do not perform network I/O or 
otherwise block. These can be implemented without needing {{RequestManager}} 
updates and include the following APIs:
 - {{committed}}
 - {{currentLag}}
 - {{listTopics}}
 - {{metrics}}
 - {{partitionsFor}}
 - {{pause}}
 - {{paused}}
 - {{position}}
 - {{resume}}
 - {{seek}}
 - {{seekToBeginning}}
 - {{seekToEnd}}
 - {{subscribe}}


> Implement more of the remaining PrototypeAsyncConsumer APIs
> ---
>
> Key: KAFKA-15188
> URL: https://issues.apache.org/jira/browse/KAFKA-15188
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> There are several {{Consumer}} APIs that only touch the {{ConsumerMetadata}} 
> and/or {{SubscriptionState}} classes; they do not perform network I/O or 
> otherwise block. These can be implemented without needing {{RequestManager}} 
> updates and include the following APIs:
>  - {{committed}}
>  - {{currentLag}}
>  - {{metrics}}
>  - {{pause}}
>  - {{paused}}
>  - {{position}}
>  - {{resume}}
>  - {{seek}}
>  - {{seekToBeginning}}
>  - {{seekToEnd}}
>  - {{subscribe}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15184) New consumer internals refactoring and clean up

2023-10-24 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779252#comment-17779252
 ] 

Kirk True edited comment on KAFKA-15184 at 10/24/23 9:17 PM:
-

See pull request [#14406|https://github.com/apache/kafka/pull/14406].


was (Author: kirktrue):
See pull request #14406.

> New consumer internals refactoring and clean up
> ---
>
> Key: KAFKA-15184
> URL: https://issues.apache.org/jira/browse/KAFKA-15184
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Minor refactoring of the new consumer internals including introduction of the 
> {{RequestManagers}} class to hold references to the {{RequestManager}} 
> instances.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15347: add support for 'single key multi timestamp' IQs with versioned state stores (KIP-968) [kafka]

2023-10-24 Thread via GitHub


aliehsaeedii opened a new pull request, #14626:
URL: https://github.com/apache/kafka/pull/14626

   This is a POC PR for KIP-968.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15456) Add support for OffsetFetch version 9 in consumer

2023-10-24 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15456:
--
Labels: kip-848 kip-848-client-support kip-848-e2e kip-848-preview  (was: 
kip-848 kip-848-client-support kip-848-preview)

> Add support for OffsetFetch version 9 in consumer
> -
>
> Key: KAFKA-15456
> URL: https://issues.apache.org/jira/browse/KAFKA-15456
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support, kip-848-e2e, 
> kip-848-preview
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-24 Thread via GitHub


junrao merged PR #14406:
URL: https://github.com/apache/kafka/pull/14406


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779216#comment-17779216
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

{quote}I'm not entirely sure I understand Matthias's point regarding example 1,
{quote}
Well, in example one, the position is at 4, and if we say we want to respect 
position the result would be empty – the user would need to rewind to zero 
before calling the serializer to make it work. Does this make sense?

We can discuss details on a KIP, but instead of introducing a new class, I was 
thinking whether we should use a config `enable.auto.rewind=true` (by default) 
that users can set to `false` to get the new behavior. For this case, we don't 
break compatibility, and it gives users the ability to add an explicit rewind 
call before calling serialize, before they change the config to `false`.
{quote}Can you create a Jira to document
{quote}
I think we can re-open K4852 for this purpose?

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-24 Thread via GitHub


kirktrue commented on PR #14406:
URL: https://github.com/apache/kafka/pull/14406#issuecomment-1777892441

   @junrao—yes. Here's a brief history for JDK 21, starting with the most 
recent build (build 74):
   
   * [build 
74](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14406/74/pipeline/):
 couldn't clone repo
   * [build 
73](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14406/73/pipeline/):
 tests ran, but Jenkins timed out
   * [build 
72](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14406/72/pipeline/):
 tests ran
   * [build 
71](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14406/71/pipeline/):
 tests ran
   * [build 
70](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14406/70/pipeline/):
 tests ran, but Jenkins shut down in the middle
   
   The same intra-Jenkins communication issues, `git` cloning failures, unexpected 
threads, and flaky tests affect all of the JDKs at random times.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-24 Thread via GitHub


junrao commented on PR #14406:
URL: https://github.com/apache/kafka/pull/14406#issuecomment-1777855712

   @kirktrue : The build for JDK 21 and Scala 2.13 failed this time. Did it 
succeed before?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Description: 
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return with 
the fetch response has an asymptotic complexity proportional to the number of 
segments in the log. This is not a problem with local storage since the 
constant factor to traverse the producer snapshot files is small enough, but 
that is not the case with a remote storage which exhibits higher read latency.

An aggravating factor was the lock contention in the remote index cache which 
was mitigated by KAFKA-15084 since then. But unfortunately, despite the 
improvements observed without the said contention, the algorithmic complexity 
of the current method used to compute uncommitted records can always defeat any 
optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.

  was:
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
prohibitive and block the reads if it consistently exceeds the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return with 
the fetch response has an asymptotic complexity proportional to the number of 
segments in the log. This is not a problem with local storage since the 
constant factor to traverse the files is small enough, but that is not the case 
with a remote storage which exhibits higher read latency.

An aggravating factor was the lock contention in the remote index cache which 
was mitigated by KAFKA-15084 since then. But unfortunately, despite the 
improvements observed without the said contention, the algorithmic complexity 
of the current method used to compute uncommitted records can always defeat any 
optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.


> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>  Labels: KIP-405
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> with the fetch response has an asymptotic complexity proportional to the 
> number of segments in the log. This is not a problem with local storage since 
> the constant factor to traverse the producer snapshot files is small enough, 
> but that is not the case with a remote storage which exhibits higher read 
> latency.
> An aggravating factor was the lock contention in the remote index cache which 
> was mitigated by KAFKA-15084 since then. But unfortunately, despite the 
> improvements observed without the said contention, the algorithmic complexity 
> of the current method used to compute uncommitted records can always defeat 
> any optimisation made on the remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of 

[PR] MINOR: avoid blocking for randomness in DefaultRecordBatchTest [kafka]

2023-10-24 Thread via GitHub


gaurav-narula opened a new pull request, #14625:
URL: https://github.com/apache/kafka/pull/14625

   PR #13135 introduced tests in DefaultRecordBatchTest which require filling 
buffers with random data. Using
   `SecureRandom.getInstanceStrong()` results in using `/dev/random` which is 
known to block in Linux when the OS runs low on entropy. This was noticeable 
when running tests in containerised CI environments.
   
   This PR avoids using a CSPRNG altogether since the tests do not need 
cryptographically secure random numbers.
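   
   For reference, a minimal sketch of the non-blocking alternative (the class and 
method names here are illustrative, not the actual test code):
   
   ```java
   import java.util.concurrent.ThreadLocalRandom;
   
   final class RandomPayloads {
       // Fill a byte array with pseudo-random data without touching /dev/random;
       // test payloads do not need cryptographic strength.
       static byte[] randomBytes(int size) {
           byte[] payload = new byte[size];
           ThreadLocalRandom.current().nextBytes(payload);
           return payload;
       }
   }
   ```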
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: close iterator to avoid resource leak [kafka]

2023-10-24 Thread via GitHub


bbejeck commented on PR #14624:
URL: https://github.com/apache/kafka/pull/14624#issuecomment-1777825636

   @mjsax it looks like the build is failing with a checkstyle error though


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: close iterator to avoid resource leak [kafka]

2023-10-24 Thread via GitHub


lihaosky commented on code in PR #14624:
URL: https://github.com/apache/kafka/pull/14624#discussion_r1370675126


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##
@@ -264,23 +264,25 @@ private void fetchAndEmit(final Record record,
   final long emitRangeUpperBound) {
 final long startMs = time.milliseconds();
 
+int emittedCount = 0;
+
 // Only time ordered (indexed) session store should have 
implemented
 // this function, otherwise a not-supported exception would throw
-final KeyValueIterator, VAgg> windowToEmit = store
-.findSessions(emitRangeLowerBound, emitRangeUpperBound);
-
-int emittedCount = 0;
-while (windowToEmit.hasNext()) {
-emittedCount++;
-final KeyValue, VAgg> kv = windowToEmit.next();
-
-tupleForwarder.maybeForward(
-record.withKey(kv.key)
-.withValue(new Change<>(kv.value, null))
-// set the timestamp as the window end timestamp
-.withTimestamp(kv.key.window().end())
-.withHeaders(record.headers()));
-}
+try (final KeyValueIterator, VAgg> windowToEmit = 
store
+.findSessions(emitRangeLowerBound, emitRangeUpperBound)) {
+
+while (windowToEmit.hasNext()) {
+emittedCount++;
+final KeyValue, VAgg> kv = 
windowToEmit.next();
+
+tupleForwarder.maybeForward(
+record.withKey(kv.key)
+.withValue(new Change<>(kv.value, null))
+// set the timestamp as the window end timestamp
+.withTimestamp(kv.key.window().end())
+.withHeaders(record.headers()));
+}
+  }

Review Comment:
   Is this indentation off?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1370658110


##
storage/src/main/java/org/apache/kafka/storage/internals/util/CleanShutdownFileHandler.java:
##
@@ -85,6 +85,7 @@ void write(long brokerEpoch, int version) throws Exception {
 }
 }
 
+@SuppressWarnings("unchecked")
 public long read() {
 long brokerEpoch = -1L;

Review Comment:
   Thanks for the advice, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15582: Move the clean shutdown file to the storage package [kafka]

2023-10-24 Thread via GitHub


CalvinConfluent commented on code in PR #14603:
URL: https://github.com/apache/kafka/pull/14603#discussion_r1370657796


##
storage/src/main/java/org/apache/kafka/storage/internals/util/CleanShutdownFileHandler.java:
##
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package kafka.log;
+package org.apache.kafka.storage.internals.util;

Review Comment:
   Done. Moved to log



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when group coordinator bounces

2023-10-24 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15676:
--
Description: 
When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks (for more 
context, see KAFKA-9184). However, this change in state is not relayed to the 
worker's instance of the [IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].

If the group coordinator for a Connect cluster is unavailable for long enough, 
all of the workers in the cluster will revoke their assigned connectors and 
tasks and, upon rejoining the group, report that they have been assigned no 
connectors and tasks.

If a worker's member ID is reset before rejoining the group (which can happen 
if, for example, the [maximum poll 
interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
 for the worker is exceeded), the leader of the cluster will not act as if the 
worker had rejoined the group; instead, it will act as if the worker had left 
the group and a new, unrelated worker had joined during the same rebalance. 
This will cause the scheduled rebalance delay to be triggered, and for the 
connectors and tasks previously-assigned to that worker to remain unassigned 
until the delay expires.

  was:
When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks (for more 
context, see KAFKA-9184). However, this change in state is not relayed to the 
worker's instance of the [IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].

If the group coordinator for a Connect cluster is unavailable for long enough, 
all of the workers in cluster will revoke their assigned connectors and tasks 
and, upon rejoining the group, report that they have been assigned no 
connectors and tasks.

If a worker's member ID is reset before rejoining the group (which can happen 
if, for example, the [maximum poll 
interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
 for the worker is exceeded), the leader of the cluster will not act as if the 
worker had rejoined the group; instead, it will act as if the worker had left 
the group and a new, unrelated worker had joined during the same rebalance. 
This will cause the scheduled rebalance delay to be triggered, and for the 
connectors and tasks previously-assigned to that worker to remain unassigned 
until the delay expires.


> Scheduled rebalance delay for Connect is unnecessarily triggered when group 
> coordinator bounces
> ---
>
> Key: KAFKA-15676
> URL: https://issues.apache.org/jira/browse/KAFKA-15676
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a Connect worker loses contact with the group coordinator, it 
> voluntarily gives up (i.e., stops) its assignment of connectors and tasks 
> (for more context, see KAFKA-9184). However, this change in state is not 
> relayed to the worker's instance of the [IncrementalCooperativeAssignor 
> class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
> If the group coordinator for a Connect cluster is unavailable for long 
> enough, all of the workers in the cluster will revoke their assigned 
> connectors and tasks and, upon rejoining the group, report that they have 
> been assigned no connectors and tasks.
> If a worker's member ID is reset before rejoining the group (which can happen 
> if, for example, the [maximum poll 
> interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
>  for the worker is exceeded), the leader of the cluster will not act as if 
> the worker had rejoined the group; instead, it will act as if the worker had 
> left the group and a new, unrelated worker had joined during the same 
> rebalance. This will cause the scheduled rebalance delay to be triggered, and 
> for the connectors and tasks previously-assigned to that worker to remain 
> unassigned until the delay expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
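
To make the failure mode above concrete, here is a deliberately simplified,
hypothetical Java sketch of the leader-side bookkeeping; the class and method names
are invented for illustration and are not the actual IncrementalCooperativeAssignor
API. Under a scheme like this, a worker that rejoins with a reset member ID is
indistinguishable from a departed worker, so its previous connectors and tasks are
treated as lost and sit behind the scheduled rebalance delay.

    // Hypothetical sketch only; names are illustrative, not the real assignor API.
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class LostAssignmentSketch {
        // connectors/tasks keyed by the member id that owned them in the previous assignment
        private final Map<String, Set<String>> previousAssignments;

        LostAssignmentSketch(Map<String, Set<String>> previousAssignments) {
            this.previousAssignments = previousAssignments;
        }

        // Anything owned by a member id absent from the current generation is treated as
        // "lost" and held back until the scheduled rebalance delay expires.
        Set<String> lostAssignments(Set<String> currentMemberIds) {
            Set<String> lost = new HashSet<>();
            previousAssignments.forEach((memberId, owned) -> {
                if (!currentMemberIds.contains(memberId)) {
                    lost.addAll(owned);
                }
            });
            return lost;
        }
    }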


[PR] HOTFIX: close iterator to avoid resource leak [kafka]

2023-10-24 Thread via GitHub


mjsax opened a new pull request, #14624:
URL: https://github.com/apache/kafka/pull/14624

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
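
The PR above has no description, so as context for its title: iterators returned by
Kafka Streams state stores hold underlying resources (e.g. RocksDB iterators) and must
be closed. A minimal sketch of the try-with-resources pattern, with an assumed store
name ("counts") and assumed key/value types:

    // Illustrative only: the store name "counts" and the String/Long types are assumptions.
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    class IteratorCloseExample {
        static long countEntries(KafkaStreams streams) {
            ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.<String, Long>keyValueStore()));
            long n = 0;
            // try-with-resources guarantees the iterator's resources are released even on error
            try (KeyValueIterator<String, Long> iter = store.all()) {
                while (iter.hasNext()) {
                    iter.next();
                    n++;
                }
            }
            return n;
        }
    }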



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-24 Thread via GitHub


kirktrue commented on code in PR #14565:
URL: https://github.com/apache/kafka/pull/14565#discussion_r1370618922


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -976,15 +1053,14 @@ public void subscribe(Collection topics) {
  * Group rebalances only take place during an active call to {@link 
#poll(Duration)}.
  *
  * @param pattern Pattern to subscribe to
- * @param listener Non-null listener instance to get notifications on 
partition assignment/revocation for the
- * subscribed topics
+ * @param listener {@link Optional} listener instance to get notifications 
on partition assignment/revocation
+ * for the subscribed topics
  * @throws IllegalArgumentException If pattern or listener is null
  * @throws IllegalStateException If {@code subscribe()} is called 
previously with topics, or assign is called
  *   previously (without a subsequent call to 
{@link #unsubscribe()}), or if not
  *   configured at-least one partition 
assignment strategy
  */
-@Override
-public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+private void subscribe(Pattern pattern, 
Optional listener) {
 maybeThrowInvalidGroupIdException();
 if (pattern == null || pattern.toString().equals(""))

Review Comment:
   Agreed. I'm loath to change it at this point in case there's some weird side 
effect of which I'm unaware.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
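
For readers following the discussion of the pattern-based subscribe overloads above,
here is a small usage sketch of the public API with an inline ConsumerRebalanceListener;
the bootstrap servers, group id and topic pattern are placeholders:

    // Usage sketch; connection details and the topic pattern are placeholders.
    import java.time.Duration;
    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    class PatternSubscribeExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "pattern-group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile("orders-.*"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // commit offsets / flush state for partitions being given up
                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // (re)initialise state for newly assigned partitions
                    }
                });
                consumer.poll(Duration.ofMillis(100));
            }
        }
    }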



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-24 Thread via GitHub


kirktrue commented on code in PR #14565:
URL: https://github.com/apache/kafka/pull/14565#discussion_r1370615968


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in 
the {@link ConsumerRebalanceListener}
+ * interface. When consumer group partition assignment changes, these methods 
are invoked. This class wraps those
+ * callback calls with some logging, optional {@link Sensor} updates, etc.
+ */
+class ConsumerRebalanceListenerInvoker {
+
+private final Logger log;
+private final SubscriptionState subscriptions;
+private final Time time;
+private final ConsumerCoordinatorMetrics coordinatorMetrics;
+
+ConsumerRebalanceListenerInvoker(LogContext logContext,
+ SubscriptionState subscriptions,
+ Time time,
+ ConsumerCoordinatorMetrics 
coordinatorMetrics) {
+this.log = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.time = time;
+this.coordinatorMetrics = coordinatorMetrics;
+}
+
+Exception invokePartitionsAssigned(final SortedSet 
assignedPartitions) {
+log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions, ", "));
+
+Optional listener = 
subscriptions.rebalanceListener();
+
+if (listener.isPresent()) {
+try {
+final long startMs = time.milliseconds();
+listener.get().onPartitionsAssigned(assignedPartitions);
+
coordinatorMetrics.assignCallbackSensor.record(time.milliseconds() - startMs);
+} catch (WakeupException | InterruptException e) {
+throw e;
+} catch (Exception e) {
+log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
+listener.getClass().getName(), assignedPartitions, e);
+return e;

Review Comment:
   The caller does that. The `Exception` type is what is used by the existing 
code, so 🤷‍♂️ 



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import 

[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Description: 
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return with 
the fetch response has an asymptotic complexity proportional to the number of 
segments in the log. This is not a problem with local storage since the 
constant factor to traverse the files is small enough, but that is not the case 
with a remote storage which exhibits higher read latency.

An aggravating factor was the lock contention in the remote index cache which 
was mitigated by KAFKA-15084 since then. But unfortunately, despite the 
improvements observed without the said contention, the algorithmic complexity 
of the current method used to compute uncommitted records can always defeat any 
optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.

  was:
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return with 
the fetch response has an asymptotic complexity proportional to the number of 
segments in the log. This is not a problem with local storage since the 
constant factor to traverse the files is small enough, but that is not the case 
with a remote storage which exhibits higher read latency. An aggravating factor 
was the lock contention in the remote index cache which was then mitigated by 
KAFKA-15084. But unfortunately, despite the improvements observed without the 
said contention, the algorithmic complexity of the current method used to 
compute uncommitted records can always defeat any optimisation made on the 
remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.


> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>  Labels: KIP-405
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> with the fetch response has an asymptotic complexity proportional to the 
> number of segments in the log. This is not a problem with local storage since 
> the constant factor to traverse the files is small enough, but that is not 
> the case with a remote storage which exhibits higher read latency.
> An aggravating factor was the lock contention in the remote index cache which 
> was mitigated by KAFKA-15084 since then. But unfortunately, despite the 
> improvements observed without the said contention, the algorithmic complexity 
> of the current method used to compute uncommitted records can always defeat 
> any optimisation made on the remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of segments and irrespective of the spans of 
> transactions.

[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Description: 
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return with 
the fetch response has an asymptotic complexity proportional to the number of 
segments in the log. This is not a problem with local storage since the 
constant factor to traverse the files is small enough, but that is not the case 
with a remote storage which exhibits higher read latency. An aggravating factor 
was the lock contention in the remote index cache which was then mitigated by 
KAFKA-15084. But unfortunately, despite the improvements observed without the 
said contention, the algorithmic complexity of the current method used to 
compute uncommitted records can always defeat any optimisation made on the 
remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.

  was:
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without the said contention, 
the algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.


> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>  Labels: KIP-405
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> with the fetch response has an asymptotic complexity proportional to the 
> number of segments in the log. This is not a problem with local storage since 
> the constant factor to traverse the files is small enough, but that is not 
> the case with a remote storage which exhibits higher read latency. An 
> aggravating factor was the lock contention in the remote index cache which 
> was then mitigated by KAFKA-15084. But unfortunately, despite the 
> improvements observed without the said contention, the algorithmic complexity 
> of the current method used to compute uncommitted records can always defeat 
> any optimisation made on the remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of segments and irrespective of the spans of 
> transactions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Updated] (KAFKA-15679) Client support for new consumer configs

2023-10-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15679:
---
Summary: Client support for new consumer configs  (was: Client support fore 
new consumer configs)

> Client support for new consumer configs
> ---
>
> Key: KAFKA-15679
> URL: https://issues.apache.org/jira/browse/KAFKA-15679
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>
> New consumer should support the new configs introduced by KIP-848
> |group.protocol|enum|generic|A flag which indicates if the new protocol 
> should be used or not. It could be: generic or consumer|
> |group.remote.assignor|string|null|The server side assignor to use. It cannot 
> be used in conjunction with group.local.assignor. {{null}} means that the 
> choice of the assignor is left to the group coordinator.|
> The protocol introduces a 3rd property for client side (local) assignors, but 
> that will be introduced later on. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
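
As a rough illustration of what the ticket asks the client to support, the sketch below
wires the two quoted configs into consumer properties. The values shown ("consumer",
"uniform") are examples only, and whether they are accepted depends on running against
a broker that supports KIP-848:

    // Illustrative only; config names are quoted from the ticket, values are examples.
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    class Kip848ConfigExample {
        static KafkaConsumer<String, String> newConsumer() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "kip848-group");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            // Opt into the new rebalance protocol and (optionally) pin a server-side
            // assignor; leaving group.remote.assignor unset lets the coordinator choose.
            props.put("group.protocol", "consumer");
            props.put("group.remote.assignor", "uniform");
            return new KafkaConsumer<>(props);
        }
    }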


[jira] [Updated] (KAFKA-15301) [Tiered Storage] Historically compacted topics send request to remote for active segment during consume

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15301:
--
Description: 
I have a use case where tiered storage plugin received requests for active 
segments. The topics for which it happened were historically compacted topics 
for which compaction was disabled and tiering was enabled.

Create topic with compact cleanup policy -> Produce data with few repeat keys 
and create multiple segments -> let compaction happen -> change cleanup policy 
to delete -> produce some more data for segment rollover -> enable tiering on 
topic -> wait for segments to be uploaded to remote storage and cleanup from 
local (active segment would remain), consume from beginning -> Observe logs.

  was:
In AWS MSK (Kafka 2.8) tiered storage a case surfaced where tiered storage 
plugin received requests for active segments. The topics for which it happened 
were historically compacted topics for which compaction was disabled and 
tiering was enabled.

Create topic with compact cleanup policy -> Produce data with few repeat keys 
and create multiple segments -> let compaction happen -> change cleanup policy 
to delete -> produce some more data for segment rollover -> enable tiering on 
topic -> wait for segments to be uploaded to remote storage and cleanup from 
local (active segment would remain), consume from beginning -> Observe logs.


> [Tiered Storage] Historically compacted topics send request to remote for 
> active segment during consume
> ---
>
> Key: KAFKA-15301
> URL: https://issues.apache.org/jira/browse/KAFKA-15301
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.6.0
>Reporter: Mital Awachat
>Assignee: Jimmy Wang
>Priority: Major
> Fix For: 3.7.0
>
>
> I have a use case where tiered storage plugin received requests for active 
> segments. The topics for which it happened were historically compacted topics 
> for which compaction was disabled and tiering was enabled.
> Create topic with compact cleanup policy -> Produce data with few repeat keys 
> and create multiple segments -> let compaction happen -> change cleanup 
> policy to delete -> produce some more data for segment rollover -> enable 
> tiering on topic -> wait for segments to be uploaded to remote storage and 
> cleanup from local (active segment would remain), consume from beginning -> 
> Observe logs.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
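
The repro sequence in the description can be expressed with the Admin client roughly as
follows; the topic name is made up, the produce/consume steps are elided, and
"remote.storage.enable" assumes a cluster that already has tiered storage configured:

    // Sketch of the config changes from the repro steps above; not a complete test.
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.ConfigResource;

    class CompactedThenTieredRepro {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                String topic = "compacted-then-tiered";
                // 1. Create the topic with a compact cleanup policy.
                NewTopic newTopic = new NewTopic(topic, 1, (short) 1)
                    .configs(Collections.singletonMap("cleanup.policy", "compact"));
                admin.createTopics(Collections.singleton(newTopic)).all().get();
                // ... produce data with repeated keys and let compaction run ...
                ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
                // 2. Switch the cleanup policy to delete.
                admin.incrementalAlterConfigs(Collections.singletonMap(resource, Collections.singleton(
                    new AlterConfigOp(new ConfigEntry("cleanup.policy", "delete"), AlterConfigOp.OpType.SET)
                ))).all().get();
                // ... produce more data so segments roll over ...
                // 3. Enable tiering, wait for upload and local cleanup, then consume from
                //    the beginning and observe the logs.
                admin.incrementalAlterConfigs(Collections.singletonMap(resource, Collections.singleton(
                    new AlterConfigOp(new ConfigEntry("remote.storage.enable", "true"), AlterConfigOp.OpType.SET)
                ))).all().get();
            }
        }
    }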


Re: [PR] KAFKA-15627: KIP-951's Leader discovery optimisations on the client [kafka]

2023-10-24 Thread via GitHub


msn-tldr commented on code in PR #14564:
URL: https://github.com/apache/kafka/pull/14564#discussion_r1370368421


##
clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -150,7 +150,7 @@ MetadataCache mergeWith(String newClusterId,
 // We want the most recent topic ID. We start with the previous ID 
stored for retained topics and then
 // update with newest information from the MetadataResponse. We always 
take the latest state, removing existing
 // topic IDs if the latest state contains the topic name but not a 
topic ID.
-Map newTopicIds = topicIds.entrySet().stream()
+Map newTopicIds = this.topicIds.entrySet().stream()

Review Comment:
   this was introduced here
   
https://github.com/apache/kafka/pull/11004/files#diff-5e4d9713b6e5a386e5cac7e81215160948859243a71b439c3a990876e56b3ec5
   
   Net effect of the bug was that, in the merged cache, the IDs of retained 
topics (from pre-existing metadata) would be lost in the newly built cache (via 
merging).
   As the comment explains, the intention of the code is to build the merged list of 
topic IDs by initialising it with the IDs of the retained topics (`this.topicIds`) and 
then updating it with the newest information from the MetadataResponse (`topicIds`). 
But the code used `topicIds` even to get the retained topic IDs. 
   
   > We start with the previous ID stored for retained topics and then update 
with newest information from the MetadataResponse.
   
   



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 log.debug("Updated cluster metadata updateVersion {} to {}", 
this.updateVersion, this.cache);
 }
 
+/**
+ * Update the metadata by merging existing metadata with the input leader 
information and nodes. This is called whenever
+ * partial updates to metadata are returned in a response from broker(ex - 
ProduceResponse & FetchResponse).
+ * Note that the updates via Metadata RPC are handled separately in 
({@link #update}).
+ * Both partitionLeader and leaderNodes override the existing metadata. 
Non-overlapping metadata is kept as it is.
+ * @param partitionLeaders map of new leadership information for 
partitions.
+ * @param leaderNodes a list of nodes for leaders in the above map.
+ * @return a set of partitions, for which leaders were updated.
+ */
+public synchronized Set 
updatePartially(Map partitionLeaders, 
List leaderNodes) {
+Map newNodes = 
leaderNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
+// Insert non-overlapping nodes from existing-nodes into new-nodes.
+this.cache.cluster().nodes().stream().forEach(node -> 
newNodes.putIfAbsent(node.id(), node));
+
+// Create partition-metadata for all updated partitions. Exclude 
updates for partitions -
+// 1. for which the corresponding partition has newer leader in 
existing metadata.
+// 2. for which corresponding leader's node is missing in the 
new-nodes.
+// 3. for which the existing metadata doesn't know about the partition.
+List updatePartitionMetadata = new ArrayList<>();
+Set updatedPartitions = new HashSet<>();
+for (Entry partitionLeader: partitionLeaders.entrySet()) {
+TopicPartition partition = (TopicPartition) 
partitionLeader.getKey();
+Metadata.LeaderAndEpoch currentLeader = currentLeader(partition);
+Metadata.LeaderIdAndEpoch newLeader = (LeaderIdAndEpoch) 
partitionLeader.getValue();
+if (!newLeader.epoch.isPresent() || 
!newLeader.leaderId.isPresent()) {
+log.trace("For {}, incoming leader information is incomplete 
{}", partition, newLeader);

Review Comment:
   You are right, i see everywhere else is at `DEBUG` level so i have changed 
the logging level.



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -350,6 +353,92 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 log.debug("Updated cluster metadata updateVersion {} to {}", 
this.updateVersion, this.cache);
 }
 
+/**
+ * Update the metadata by merging existing metadata with the input leader 
information and nodes. This is called whenever
+ * partial updates to metadata are returned in a response from broker(ex - 
ProduceResponse & FetchResponse).
+ * Note that the updates via Metadata RPC are handled separately in 
({@link #update}).
+ * Both partitionLeader and leaderNodes override the existing metadata. 
Non-overlapping metadata is kept as it is.
+ * @param partitionLeaders map of new leadership information for 
partitions.
+ * @param leaderNodes a list of nodes for leaders in the above map.
+ * @return a set of partitions, for which leaders were updated.
+ */
+public 
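
The quoted hunk above is cut off by the mail archive. As a rough illustration of the
merge rule discussed in these review comments (start from the retained entries, then
let the newest response override and extend them), here is a minimal generic sketch;
it is not the actual MetadataCache or Metadata code:

    // Minimal sketch of "retained first, newest wins"; not the real metadata classes.
    import java.util.HashMap;
    import java.util.Map;

    class MergeSketch {
        static <K, V> Map<K, V> merge(Map<K, V> retained, Map<K, V> newest) {
            Map<K, V> merged = new HashMap<>(retained); // keep non-overlapping retained entries
            merged.putAll(newest);                      // newest information overrides
            return merged;
        }
    }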

[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Description: 
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without the said contention, 
the algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the transaction 
spans.

  was:
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without said contention, the 
algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the transaction 
spans.


> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>  Labels: KIP-405
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> have an asymptotic complexity proportional to the number of segments in the 
> log. That is not a problem with local storage since the constant factor to 
> traverse the files is small enough, but that is not the case with a remote 
> storage which exhibits higher read latency. An aggravating factor was the 
> lock contention in the remote index cache which was then mitigated by 
> KAFKA-15084. But unfortunately, despite the improvements observed without the 
> said contention, the algorithmic complexity of the current method used to 
> compute uncommitted records can always defeat any optimisation made on the 
> remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of segments and irrespective of the transaction 
> spans.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Description: 
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without the said contention, 
the algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the spans of 
transactions.

  was:
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without the said contention, 
the algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the transaction 
spans.


> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>  Labels: KIP-405
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> have an asymptotic complexity proportional to the number of segments in the 
> log. That is not a problem with local storage since the constant factor to 
> traverse the files is small enough, but that is not the case with a remote 
> storage which exhibits higher read latency. An aggravating factor was the 
> lock contention in the remote index cache which was then mitigated by 
> KAFKA-15084. But unfortunately, despite the improvements observed without the 
> said contention, the algorithmic complexity of the current method used to 
> compute uncommitted records can always defeat any optimisation made on the 
> remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of segments and irrespective of the spans of 
> transactions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15679) Client support fore new consumer configs

2023-10-24 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-15679:
---
Summary: Client support fore new consumer configs  (was: Client support new 
consumer configs)

> Client support fore new consumer configs
> 
>
> Key: KAFKA-15679
> URL: https://issues.apache.org/jira/browse/KAFKA-15679
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>
> New consumer should support the new configs introduced by KIP-848
> |group.protocol|enum|generic|A flag which indicates if the new protocol 
> should be used or not. It could be: generic or consumer|
> |group.remote.assignor|string|null|The server side assignor to use. It cannot 
> be used in conjunction with group.local.assignor. {{null}} means that the 
> choice of the assignor is left to the group coordinator.|
> The protocol introduces a 3rd property for client side (local) assignors, but 
> that will be introduced later on. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Description: 
I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without said contention, the 
algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the transaction 
spans.

  was:
Hi team,

I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without said contention, the 
algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the transaction 
spans.


> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> have an asymptotic complexity proportional to the number of segments in the 
> log. That is not a problem with local storage since the constant factor to 
> traverse the files is small enough, but that is not the case with a remote 
> storage which exhibits higher read latency. An aggravating factor was the 
> lock contention in the remote index cache which was then mitigated by 
> KAFKA-15084. But unfortunately, despite the improvements observed without 
> said contention, the algorithmic complexity of the current method used to 
> compute uncommitted records can always defeat any optimisation made on the 
> remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of segments and irrespective of the transaction 
> spans.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez updated KAFKA-15678:
--
Labels: KIP-405  (was: )

> [Tiered Storage] Stall remote reads with long-spanning transactions
> ---
>
> Key: KAFKA-15678
> URL: https://issues.apache.org/jira/browse/KAFKA-15678
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Alexandre Dupriez
>Priority: Major
>  Labels: KIP-405
>
> I am facing an issue on the remote data path for uncommitted reads.
> As mentioned in [the original 
> PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
> transaction spans over a long sequence of segments, the time taken to 
> retrieve the producer snapshots from the remote storage can, in the worst 
> case, become redhibitory and block the reads if it consistently exceed the 
> deadline of fetch requests ({{{}fetch.max.wait.ms{}}}).
> Essentially, the method used to compute the uncommitted records to return 
> have an asymptotic complexity proportional to the number of segments in the 
> log. That is not a problem with local storage since the constant factor to 
> traverse the files is small enough, but that is not the case with a remote 
> storage which exhibits higher read latency. An aggravating factor was the 
> lock contention in the remote index cache which was then mitigated by 
> KAFKA-15084. But unfortunately, despite the improvements observed without 
> said contention, the algorithmic complexity of the current method used to 
> compute uncommitted records can always defeat any optimisation made on the 
> remote read path.
> Maybe we could start thinking (if not already) about a different construct 
> which would reduce that complexity to O(1) - i.e. to make the computation 
> independent from the number of segments and irrespective of the transaction 
> spans.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-10-24 Thread Grigorii Piskunov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779181#comment-17779181
 ] 

Grigorii Piskunov commented on KAFKA-15562:
---

Hi [~pnee] yes, sure. Please share when available. I would be happy to help and 
participate in the project.

 

 

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Both fetchOffsetRequest and commitOffsetRequest handlers don't have 
> sufficient logic to handle timeout exceptions.
>  
> The CommitOffsetRequest handler also doesn't handle various server errors such 
> as coordinator not found. We need to handle:
> If Exception is non null:
>  - handle RetriableError that respects requestTimeoutMs
>  - handle NonRetriableError
>  
> If the response contains error, ensure to:
>  - mark coordinator unknown if needed
>  - retry if needed
>  - fail the request



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
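
A hedged sketch of the handling shape the ticket describes; markCoordinatorUnknown,
retry and fail are placeholders here, not methods of the real CommitRequestManager:

    // Hypothetical sketch only; the helper methods are placeholders.
    import org.apache.kafka.common.errors.RetriableException;
    import org.apache.kafka.common.protocol.Errors;

    class OffsetCommitErrorSketch {
        void handleResponseError(Errors error, long elapsedMs, long requestTimeoutMs) {
            if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                markCoordinatorUnknown();     // rediscover the coordinator before retrying
                retry();
            } else if (error.exception() instanceof RetriableException) {
                if (elapsedMs >= requestTimeoutMs) {
                    fail(error.exception());  // retriable, but the request timeout is exhausted
                } else {
                    retry();
                }
            } else {
                fail(error.exception());      // non-retriable: surface the error to the caller
            }
        }

        private void markCoordinatorUnknown() { }
        private void retry() { }
        private void fail(Exception e) { }
    }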


[jira] [Created] (KAFKA-15679) Client support new consumer configs

2023-10-24 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-15679:
--

 Summary: Client support new consumer configs
 Key: KAFKA-15679
 URL: https://issues.apache.org/jira/browse/KAFKA-15679
 Project: Kafka
  Issue Type: Sub-task
  Components: clients, consumer
Reporter: Lianet Magrans


New consumer should support the new configs introduced by KIP-848
|group.protocol|enum|generic|A flag which indicates if the new protocol should 
be used or not. It could be: generic or consumer|
|group.remote.assignor|string|null|The server side assignor to use. It cannot 
be used in conjunction with group.local.assignor. {{null}} means that the 
choice of the assignor is left to the group coordinator.|
The protocol introduces a 3rd property for client side (local) assignors, but 
that will be introduced later on. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15678) [Tiered Storage] Stall remote reads with long-spanning transactions

2023-10-24 Thread Alexandre Dupriez (Jira)
Alexandre Dupriez created KAFKA-15678:
-

 Summary: [Tiered Storage] Stall remote reads with long-spanning 
transactions
 Key: KAFKA-15678
 URL: https://issues.apache.org/jira/browse/KAFKA-15678
 Project: Kafka
  Issue Type: Bug
  Components: Tiered-Storage
Affects Versions: 3.6.0
Reporter: Alexandre Dupriez


Hi team,

I am facing an issue on the remote data path for uncommitted reads.

As mentioned in [the original 
PR|https://github.com/apache/kafka/pull/13535#discussion_r1166887367], if a 
transaction spans over a long sequence of segments, the time taken to retrieve 
the producer snapshots from the remote storage can, in the worst case, become 
redhibitory and block the reads if it consistently exceed the deadline of fetch 
requests ({{{}fetch.max.wait.ms{}}}).

Essentially, the method used to compute the uncommitted records to return have 
an asymptotic complexity proportional to the number of segments in the log. 
That is not a problem with local storage since the constant factor to traverse 
the files is small enough, but that is not the case with a remote storage which 
exhibits higher read latency. An aggravating factor was the lock contention in 
the remote index cache which was then mitigated by KAFKA-15084. But 
unfortunately, despite the improvements observed without said contention, the 
algorithmic complexity of the current method used to compute uncommitted 
records can always defeat any optimisation made on the remote read path.

Maybe we could start thinking (if not already) about a different construct 
which would reduce that complexity to O(1) - i.e. to make the computation 
independent from the number of segments and irrespective of the transaction 
spans.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
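
Purely as an illustration of the complexity argument (this is not Kafka code), the
difference between the per-segment traversal and a hypothetical O(1) construct could
look like the sketch below; RemoteSegment, fetchProducerSnapshot and AbortedTxnIndex
are invented names:

    // Invented types and methods; illustrates the asymptotics only.
    import java.util.ArrayList;
    import java.util.List;

    class UncommittedReadSketch {
        // Current shape: one remote read per segment the open transaction spans, so the
        // cost grows with the number of segments and each read pays remote latency.
        List<Long> abortedProducersLinear(List<RemoteSegment> segmentsSpannedByTxn) {
            List<Long> aborted = new ArrayList<>();
            for (RemoteSegment segment : segmentsSpannedByTxn) {
                aborted.addAll(fetchProducerSnapshot(segment)); // remote call, high latency
            }
            return aborted;
        }

        // Hypothetical alternative: a per-partition index maintained as segments are
        // uploaded, consulted once per fetch regardless of how many segments a
        // transaction spans.
        List<Long> abortedProducersConstant(AbortedTxnIndex index, long fetchOffset, long lastStableOffset) {
            return index.abortedProducersBetween(fetchOffset, lastStableOffset);
        }

        interface RemoteSegment { }

        interface AbortedTxnIndex {
            List<Long> abortedProducersBetween(long startOffset, long endOffset);
        }

        private List<Long> fetchProducerSnapshot(RemoteSegment segment) {
            return new ArrayList<>(); // stand-in for a remote storage read
        }
    }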


Re: [PR] MINOR: added compatibility for MinGW [kafka]

2023-10-24 Thread via GitHub


maniekes commented on PR #13321:
URL: https://github.com/apache/kafka/pull/13321#issuecomment-1777659808

   @divijvaidya looks like it's a stale PR :) shall we merge it or close it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException

2023-10-24 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15600:
-
Summary: KIP-990: Capability to PAUSE Tasks on DeserializationException  
(was: KIP-990: Capability to SUSPEND Tasks on DeserializationException)

> KIP-990: Capability to PAUSE Tasks on DeserializationException
> --
>
> Key: KAFKA-15600
> URL: https://issues.apache.org/jira/browse/KAFKA-15600
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Priority: Minor
>  Labels: kip
>
> Presently, Kafka Streams provides users with two options for handling a 
> {{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
> interface:
>  # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
> will either cause the whole application instance to exit, or the stream 
> thread will be replaced and restarted. Either way, the failed {{Task}} will 
> end up being resumed, either by the current instance or after being 
> rebalanced to another, causing a cascading failure until a user intervenes to 
> address the problem.
>  # {{CONTINUE}} - discard the record and continue processing with the next 
> record. This can cause data loss if the record triggering the 
> {{DeserializationException}} should be considered a valid record. This can 
> happen if an upstream producer changes the record schema in a way that is 
> incompatible with the streams application, or if there is a bug in the 
> {{Deserializer}}  (for example, failing to handle a valid edge-case).
> The user can currently choose between data loss, or a cascading failure that 
> usually causes all processing to slowly grind to a halt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
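
For context on the two existing options, here is a sketch of a custom handler against
the current Streams API; the decision predicate is a placeholder, and the PAUSE option
proposed by KIP-990 does not exist yet, so it is not shown:

    // Sketch only; the "tolerable" predicate is a placeholder for real validation logic.
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    class LogAndDecideExceptionHandler implements DeserializationExceptionHandler {
        @Override
        public DeserializationHandlerResponse handle(ProcessorContext context,
                                                     ConsumerRecord<byte[], byte[]> record,
                                                     Exception exception) {
            // CONTINUE drops the record (possible data loss); FAIL stops the stream thread
            // (possible cascading failure). Today these are the only two outcomes.
            boolean tolerable = exception.getMessage() != null; // placeholder predicate
            return tolerable
                ? DeserializationHandlerResponse.CONTINUE
                : DeserializationHandlerResponse.FAIL;
        }

        @Override
        public void configure(Map<String, ?> configs) { }
    }

    // Registered via:
    //   props.put("default.deserialization.exception.handler",
    //             LogAndDecideExceptionHandler.class.getName());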


[jira] [Assigned] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException

2023-10-24 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford reassigned KAFKA-15600:


Assignee: Nicholas Telford

> KIP-990: Capability to PAUSE Tasks on DeserializationException
> --
>
> Key: KAFKA-15600
> URL: https://issues.apache.org/jira/browse/KAFKA-15600
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: kip
>
> Presently, Kafka Streams provides users with two options for handling a 
> {{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
> interface:
>  # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
> will either cause the whole application instance to exit, or the stream 
> thread will be replaced and restarted. Either way, the failed {{Task}} will 
> end up being resumed, either by the current instance or after being 
> rebalanced to another, causing a cascading failure until a user intervenes to 
> address the problem.
>  # {{CONTINUE}} - discard the record and continue processing with the next 
> record. This can cause data loss if the record triggering the 
> {{DeserializationException}} should be considered a valid record. This can 
> happen if an upstream producer changes the record schema in a way that is 
> incompatible with the streams application, or if there is a bug in the 
> {{Deserializer}}  (for example, failing to handle a valid edge-case).
> The user can currently choose between data loss, or a cascading failure that 
> usually causes all processing to slowly grind to a halt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15508) Method always return the same value ApplicationEventProcessor.java

2023-10-24 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15508:
---
Priority: Minor  (was: Blocker)

> Method always return the same value ApplicationEventProcessor.java
> --
>
> Key: KAFKA-15508
> URL: https://issues.apache.org/jira/browse/KAFKA-15508
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.5.1
>Reporter: Svyatoslav
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> I'm not sure but i think that this is a bug, because method 'process' in 
> ApplicationEventProcessor.java always return true:
>     private boolean process(final PollApplicationEvent event) {
>         Optional commitRequestManger = registry.get(RequestManager.Type.COMMIT);
>         if (!commitRequestManger.isPresent()) {
>             return true;
>         }
>         CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
>         manager.updateAutoCommitTimer(event.pollTimeMs);
>         return true;
>     }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15508) Method always return the same value ApplicationEventProcessor.java

2023-10-24 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779175#comment-17779175
 ] 

Philip Nee commented on KAFKA-15508:


This is not a blocker right?

> Method always return the same value ApplicationEventProcessor.java
> --
>
> Key: KAFKA-15508
> URL: https://issues.apache.org/jira/browse/KAFKA-15508
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.5.1
>Reporter: Svyatoslav
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> I'm not sure, but I think this is a bug, because the method 'process' in 
> ApplicationEventProcessor.java always returns true:
>     private boolean process(final PollApplicationEvent event) {
>         Optional commitRequestManger = registry.get(RequestManager.Type.COMMIT);
>         if (!commitRequestManger.isPresent()) {
>             return true;
>         }
>         CommitRequestManager manager = (CommitRequestManager) commitRequestManger.get();
>         manager.updateAutoCommitTimer(event.pollTimeMs);
>         return true;
>     }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on PR #14432:
URL: https://github.com/apache/kafka/pull/14432#issuecomment-1777654947

   Thanks @dajac, I have addressed all the comments and added a couple more 
questions. Thanks for all the pointers; the changes look cleaner than the 
previous iteration (to me at least).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370542261


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -1908,6 +1909,715 @@ public void testLeavingMemberBumpsGroupEpoch() {
 assertRecordsEquals(expectedRecords, result.records());
 }
 
+@Test
+public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+String groupId = "fooup";
+// Use a static member id as it makes the test easier.
+String memberId1 = Uuid.randomUuid().toString();
+String memberId2 = Uuid.randomUuid().toString();
+String memberId3 = Uuid.randomUuid().toString();
+
+Uuid fooTopicId = Uuid.randomUuid();
+String fooTopicName = "foo";
+Uuid barTopicId = Uuid.randomUuid();
+String barTopicName = "bar";
+
+MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+// Consumer group with two static members.
+GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+.withAssignors(Collections.singletonList(assignor))
+.withMetadataImage(new MetadataImageBuilder()
+.addTopic(fooTopicId, fooTopicName, 6)
+.addTopic(barTopicId, barTopicName, 3)
+.addRacks()
+.build())
+.withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+.withMember(new ConsumerGroupMember.Builder(memberId1)
+.setInstanceId(memberId1)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2),
+mkTopicAssignment(barTopicId, 0, 1)))
+.build())
+.withMember(new ConsumerGroupMember.Builder(memberId2)
+.setInstanceId(memberId2)
+.setMemberEpoch(10)
+.setPreviousMemberEpoch(9)
+.setTargetMemberEpoch(10)
+.setClientId("client")
+.setClientHost("localhost/127.0.0.1")
+// Use zar only here to ensure that metadata needs to be 
recomputed.
+.setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+.setServerAssignorName("range")
+.setAssignedPartitions(mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4, 5),
+mkTopicAssignment(barTopicId, 2)))
+.build())
+.withAssignment(memberId1, mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1, 2),
+mkTopicAssignment(barTopicId, 0, 1)))
+.withAssignment(memberId2, mkAssignment(
+mkTopicAssignment(fooTopicId, 3, 4, 5),
+mkTopicAssignment(barTopicId, 2)))
+.withAssignmentEpoch(10))
+.build();
+
+assignor.prepareGroupAssignment(new GroupAssignment(
+new HashMap() {
+{
+put(memberId1, new MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 0, 1),
+mkTopicAssignment(barTopicId, 0)
+)));
+put(memberId2, new MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 2, 3),
+mkTopicAssignment(barTopicId, 1)
+)));
+put(memberId3, new MemberAssignment(mkAssignment(
+mkTopicAssignment(fooTopicId, 4, 5),
+mkTopicAssignment(barTopicId, 2)
+)));
+}
+}
+));
+
+// Member 3 joins the consumer group.
+CoordinatorResult result = 
context.consumerGroupHeartbeat(
+new ConsumerGroupHeartbeatRequestData()
+.setGroupId(groupId)
+.setMemberId(memberId3)
+.setInstanceId(memberId3)
+.setMemberEpoch(0)
+.setRebalanceTimeoutMs(5000)
+.setServerAssignor("range")
+.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+.setTopicPartitions(Collections.emptyList()));
+
+assertResponseEquals(
+new ConsumerGroupHeartbeatResponseData()
+.setMemberId(memberId3)
+.setMemberEpoch(11)
+

Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370539621


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -285,6 +302,11 @@ public ConsumerGroupMember getOrMaybeCreateMember(
 return member;
 }
 
+public ConsumerGroupMember getStaticMember(String instanceId) {

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -259,6 +265,17 @@ public void setTargetAssignmentEpoch(int 
targetAssignmentEpoch) {
 maybeUpdateGroupState();
 }
 
+/**
+ * Get member id of a static member that matches the given group
+ * instance id.
+ *
+ * @param groupInstanceId the group instance id.
+ * @return the static member if it exists.

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370538866


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same 
order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), 
existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), 
existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = 
computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult 
computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   Also, regarding 
   
   > Then we only need to recompute the assignment if there is a group epoch 
bump.
   
   Given that we will now have a group epoch bump whenever a static member 
re-joins with a different subscription, should this be mentioned in the KIP? As 
we noticed, this is a deviation from how a static member rejoining with a 
different subscription is handled today. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370533114


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -299,6 +321,9 @@ public void updateMember(ConsumerGroupMember newMember) {
 maybeUpdateServerAssignors(oldMember, newMember);
 maybeUpdatePartitionEpoch(oldMember, newMember);
 maybeUpdateGroupState();
+if (newMember.instanceId() != null) {
+staticMembers.put(newMember.instanceId(), newMember.memberId());
+}

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -312,6 +337,9 @@ public void removeMember(String memberId) {
 maybeUpdateServerAssignors(oldMember, null);
 maybeRemovePartitionEpoch(oldMember);
 maybeUpdateGroupState();
+if (oldMember.instanceId() != null) {
+staticMembers.remove(oldMember.instanceId());
+}

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15578: System Tests for running old protocol with new coordinator [kafka]

2023-10-24 Thread via GitHub


rreddy-22 commented on code in PR #14524:
URL: https://github.com/apache/kafka/pull/14524#discussion_r1370533091


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -277,6 +279,22 @@ def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAI
 self.isolated_controller_quorum = None # will define below if necessary
 self.configured_for_zk_migration = False
 
+default_use_new_coordinator = False
+
+# If 'use_new_coordinator' is not explicitly set, determine it based 
on context.
+if use_new_coordinator is None:
+arg_name = 'use_new_coordinator'
+
+# Default to the global setting if no arguments are injected.
+if not context.injected_args:

Review Comment:
   This is what I had before, but it was failing with `'NoneType' object has no 
attribute 'get'` when using get and use_new_coordinator isn't in the context 
args.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370532536


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same 
order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), 
existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), 
existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = 
computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));
+}
+
+private TargetAssignmentBuilder.TargetAssignmentResult 
computeTargetAssignment(
+ConsumerGroup group,
+int groupEpoch,
+ConsumerGroupMember member,
+ConsumerGroupMember updatedMember) {
+String preferredServerAssignor = group.computePreferredServerAssignor(
+member,
+updatedMember
+).orElse(defaultAssignor.name());
+
+String groupId = group.groupId();
+Map subscriptionMetadata = 
group.subscriptionMetadata();
+String memberId = member.memberId();
+try {
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
+new TargetAssignmentBuilder(groupId, groupEpoch, 
assignors.get(preferredServerAssignor))
+.withMembers(group.members())

Review Comment:
   That makes sense, and thanks for the explanation. I have now changed the code 
to remove the existing static member and add the new static member. The rest of 
the state remains as is.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
  */

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370531600


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same 
order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), 
existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), 
existingStaticMember.memberId());
+// Write a record corresponding to the new member
+records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = 
computeTargetAssignment(group, group.groupEpoch(), member, updatedMember);
+records.addAll(assignmentResult.records());
+records.add(newCurrentAssignmentRecord(groupId, updatedMember));

Review Comment:
   I got around the issue by explicitly setting the rack info in the 
subscription metadata, as done 
[here](https://github.com/apache/kafka/pull/14432/files#diff-1396a400e5c9f5ccaa44cb20194066eaa919a7e02c558db02005a4a2b67a93b9R2117-R2122).
 I guess this wasn't apparent so far because all the tests expected a group 
epoch bump to happen. In this case we didn't want a group epoch bump, and hence 
I noticed the discrepancy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370523927


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same 
order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));

Review Comment:
   Done.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -983,27 +1052,125 @@ private 
CoordinatorResult consumerGr
 return new CoordinatorResult<>(records, response);
 }
 
+private void replaceStaticMemberInConsumerGroup(
+String groupId,
+List records,
+ConsumerGroup group,
+ConsumerGroupMember member,
+ConsumerGroupMember existingStaticMember,
+ConsumerGroupMember updatedMember
+) {
+// Write tombstones for the departed static member. Follow the same 
order as the one followed when fencing
+// a member
+records.add(newCurrentAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newTargetAssignmentTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+records.add(newMemberSubscriptionTombstoneRecord(group.groupId(), 
existingStaticMember.memberId()));
+// Cancel all the timers of the departed static member.
+cancelConsumerGroupSessionTimeout(group.groupId(), 
existingStaticMember.memberId());
+cancelConsumerGroupRevocationTimeout(group.groupId(), 
existingStaticMember.memberId());

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14516: [1/N] Static Member leave, join, re-join request using ConsumerGroupHeartbeats [kafka]

2023-10-24 Thread via GitHub


vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370523697


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -747,6 +798,11 @@ private void throwIfMemberEpochIsInvalid(
 List 
ownedTopicPartitions
 ) {
 if (receivedMemberEpoch > member.memberEpoch()) {
+// If a static member rejoins, it's previous epoch would be -2. In 
such a
+// case, we don't need to fence the member.
+if (member.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH && 
receivedMemberEpoch == 0) {
+return;
+}

Review Comment:
   Yeah, that makes sense as well. I placed it inside the if condition because 
this condition at hand shows up only when received epoch > current member 
epoch. But it should be ok to have it outside as you said. I have made the 
change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14565:
URL: https://github.com/apache/kafka/pull/14565#discussion_r1370505973


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerRebalanceListenerInvoker.java:
##
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+
+/**
+ * This class encapsulates the invocation of the callback methods defined in 
the {@link ConsumerRebalanceListener}
+ * interface. When consumer group partition assignment changes, these methods 
are invoked. This class wraps those
+ * callback calls with some logging, optional {@link Sensor} updates, etc.
+ */
+class ConsumerRebalanceListenerInvoker {
+
+private final Logger log;
+private final SubscriptionState subscriptions;
+private final Time time;
+private final ConsumerCoordinatorMetrics coordinatorMetrics;
+
+ConsumerRebalanceListenerInvoker(LogContext logContext,
+ SubscriptionState subscriptions,
+ Time time,
+ ConsumerCoordinatorMetrics 
coordinatorMetrics) {
+this.log = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.time = time;
+this.coordinatorMetrics = coordinatorMetrics;
+}
+
+Exception invokePartitionsAssigned(final SortedSet 
assignedPartitions) {
+log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions, ", "));
+
+Optional listener = 
subscriptions.rebalanceListener();
+
+if (listener.isPresent()) {
+try {
+final long startMs = time.milliseconds();
+listener.get().onPartitionsAssigned(assignedPartitions);
+
coordinatorMetrics.assignCallbackSensor.record(time.milliseconds() - startMs);
+} catch (WakeupException | InterruptException e) {
+throw e;
+} catch (Exception e) {
+log.error("User provided listener {} failed on invocation of 
onPartitionsAssigned for partitions {}",
+listener.getClass().getName(), assignedPartitions, e);
+return e;

Review Comment:
   Do we want to wrap it in KafkaException?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when group coordinator bounces

2023-10-24 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15676:
--
Description: 
When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks (for more 
context, see KAFKA-9184). However, this change in state is not relayed to the 
worker's instance of the [IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].

If the group coordinator for a Connect cluster is unavailable for long enough, 
all of the workers in the cluster will revoke their assigned connectors and 
tasks and, upon rejoining the group, report that they have been assigned no 
connectors and tasks.

If a worker's member ID is reset before rejoining the group (which can happen 
if, for example, the [maximum poll 
interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
 for the worker is exceeded), the leader of the cluster will not act as if the 
worker had rejoined the group; instead, it will act as if the worker had left 
the group and a new, unrelated worker had joined during the same rebalance. 
This will cause the scheduled rebalance delay to be triggered, and for the 
connectors and tasks previously-assigned to that worker to remain unassigned 
until the delay expires.

  was:
When a Connect worker loses contact with the group coordinator, it voluntarily 
gives up (i.e., stops) its assignment of connectors and tasks. For more 
context, see KAFKA-9184.

 

However, this change in state is not relayed the worker's instance of the 
[IncrementalCooperativeAssignor 
class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
 This has the consequence that, if the group coordinator for a Connect cluster 
is unavailable for long enough, all of the workers in cluster will revoke their 
assigned connectors and tasks, report that they have been assigned no 
connectors and tasks during the next rebalance, and spuriously trigger the 
scheduled rebalance delay (since the leader will assume that all workers should 
still be running the connectors and tasks that it assigned during the last 
rebalance).


> Scheduled rebalance delay for Connect is unnecessarily triggered when group 
> coordinator bounces
> ---
>
> Key: KAFKA-15676
> URL: https://issues.apache.org/jira/browse/KAFKA-15676
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a Connect worker loses contact with the group coordinator, it 
> voluntarily gives up (i.e., stops) its assignment of connectors and tasks 
> (for more context, see KAFKA-9184). However, this change in state is not 
> relayed to the worker's instance of the [IncrementalCooperativeAssignor 
> class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
> If the group coordinator for a Connect cluster is unavailable for long 
> enough, all of the workers in the cluster will revoke their assigned 
> connectors and tasks and, upon rejoining the group, report that they have 
> been assigned no connectors and tasks.
> If a worker's member ID is reset before rejoining the group (which can happen 
> if, for example, the [maximum poll 
> interval|https://kafka.apache.org/documentation.html#consumerconfigs_max.poll.interval.ms]
>  for the worker is exceeded), the leader of the cluster will not act as if 
> the worker had rejoined the group; instead, it will act as if the worker had 
> left the group and a new, unrelated worker had joined during the same 
> rebalance. This will cause the scheduled rebalance delay to be triggered, and 
> for the connectors and tasks previously-assigned to that worker to remain 
> unassigned until the delay expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15628: Refactor ConsumerRebalanceListener invocation for reuse [kafka]

2023-10-24 Thread via GitHub


philipnee commented on code in PR #14565:
URL: https://github.com/apache/kafka/pull/14565#discussion_r1370492706


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -976,15 +1053,14 @@ public void subscribe(Collection topics) {
  * Group rebalances only take place during an active call to {@link 
#poll(Duration)}.
  *
  * @param pattern Pattern to subscribe to
- * @param listener Non-null listener instance to get notifications on 
partition assignment/revocation for the
- * subscribed topics
+ * @param listener {@link Optional} listener instance to get notifications 
on partition assignment/revocation
+ * for the subscribed topics
  * @throws IllegalArgumentException If pattern or listener is null
  * @throws IllegalStateException If {@code subscribe()} is called 
previously with topics, or assign is called
  *   previously (without a subsequent call to 
{@link #unsubscribe()}), or if not
  *   configured at-least one partition 
assignment strategy
  */
-@Override
-public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+private void subscribe(Pattern pattern, 
Optional listener) {
 maybeThrowInvalidGroupIdException();
 if (pattern == null || pattern.toString().equals(""))

Review Comment:
   It is a bit weird to use toString() instead of `pattern()`, even though both 
implementations return the same field, `pattern`.
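
A quick self-contained check of that equivalence (the class name is just for 
illustration; both methods return the regex source the pattern was compiled from):

    import java.util.regex.Pattern;

    public class PatternStringExample {
        public static void main(String[] args) {
            Pattern p = Pattern.compile("topic-.*");
            // Both print the original regex source; pattern() states the intent
            // more directly than relying on toString().
            System.out.println(p.pattern());   // topic-.*
            System.out.println(p.toString());  // topic-.*
        }
    }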



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-24 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1370486899


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -401,12 +524,12 @@ private void maybeThrowInvalidGroupIdException() {
 
 @Override
 public Map metrics() {
-throw new KafkaException("method not implemented");
+return Collections.unmodifiableMap(metrics.metrics());
 }
 
 @Override
 public List partitionsFor(String topic) {
-throw new KafkaException("method not implemented");
+return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));

Review Comment:
   Good call out. I don't know that the necessary topic plumbing code was 
written at that time. Would you mind filing a bug to resolve this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15677) Add support for re2/j regex parsing for patterns

2023-10-24 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-15677:
-

 Summary: Add support for re2/j regex parsing for patterns
 Key: KAFKA-15677
 URL: https://issues.apache.org/jira/browse/KAFKA-15677
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal


KIP-714 advises using re2/j as the library for regex patterns, but Kafka 
currently has no support for it and uses the Java regex library instead.

KIP-848 also requires the re2/j library, hence this Jira tracks patching the 
KIP-714 pattern comparison code to use re2/j once its support is added to 
Kafka, i.e. when re2/j is in use in Kafka after the KIP-848 work.
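
As a rough illustration only (not how Kafka would integrate it), the sketch below 
compares the current java.util.regex matching with the com.google.re2j API for a 
simple topic-name pattern; the class name and topic names are made up, and the 
re2j dependency is assumed to be on the classpath:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class TopicPatternSketch {
        public static void main(String[] args) {
            List<String> topics = Arrays.asList("orders", "orders-dlq", "payments");

            // Today: java.util.regex (backtracking engine).
            java.util.regex.Pattern javaPattern = java.util.regex.Pattern.compile("orders.*");
            List<String> viaJava = topics.stream()
                    .filter(t -> javaPattern.matcher(t).matches())
                    .collect(Collectors.toList());

            // With re2/j: linear-time matching, same result for this simple pattern.
            com.google.re2j.Pattern re2jPattern = com.google.re2j.Pattern.compile("orders.*");
            List<String> viaRe2j = topics.stream()
                    .filter(t -> re2jPattern.matcher(t).matches())
                    .collect(Collectors.toList());

            System.out.println(viaJava);  // [orders, orders-dlq]
            System.out.println(viaRe2j);  // [orders, orders-dlq]
        }
    }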



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15562) Ensure fetch offset and commit offset handler handles both timeout and various error types

2023-10-24 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779161#comment-17779161
 ] 

Philip Nee commented on KAFKA-15562:


Hi [~grigorii] - I'm already working on this issue, but we have many 
outstanding tasks for this project. Would you be interested in participating?

 

cc the active contributors to this initiative here: [~kirktrue] [~lianetm] 

> Ensure fetch offset and commit offset handler handles both timeout and 
> various error types
> --
>
> Key: KAFKA-15562
> URL: https://issues.apache.org/jira/browse/KAFKA-15562
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848-e2e, kip-848-preview
>
> Both the fetchOffsetRequest and commitOffsetRequest handlers don't have 
> sufficient logic to handle timeout exceptions.
>  
> The commitOffsetRequest handler also doesn't handle various server errors such 
> as coordinator not found. We need to handle:
> If the exception is non-null:
>  - handle RetriableError that respects requestTimeoutMs
>  - handle NonRetriableError
>  
> If the response contains an error, ensure to:
>  - mark the coordinator unknown if needed
>  - retry if needed
>  - fail the request
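
A rough sketch of the decision logic described above, using only public exception 
types from org.apache.kafka.common.errors. The enclosing class, method, and 
Outcome enum are hypothetical and are not the actual CommitRequestManager code:

    import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
    import org.apache.kafka.common.errors.DisconnectException;
    import org.apache.kafka.common.errors.NotCoordinatorException;
    import org.apache.kafka.common.errors.RetriableException;

    public class CommitResponseHandlingSketch {

        // Hypothetical outcome type, for illustration only.
        enum Outcome { RETRY, MARK_COORDINATOR_UNKNOWN_AND_RETRY, FAIL }

        // Retriable errors are retried only while the deadline derived from
        // requestTimeoutMs has not passed; coordinator errors additionally
        // invalidate the cached coordinator; everything else fails the request.
        static Outcome classify(final Throwable error, final long deadlineMs, final long nowMs) {
            if (error instanceof CoordinatorNotAvailableException
                    || error instanceof NotCoordinatorException
                    || error instanceof DisconnectException) {
                return nowMs < deadlineMs ? Outcome.MARK_COORDINATOR_UNKNOWN_AND_RETRY : Outcome.FAIL;
            }
            if (error instanceof RetriableException) {
                // Includes TimeoutException, which extends RetriableException.
                return nowMs < deadlineMs ? Outcome.RETRY : Outcome.FAIL;
            }
            return Outcome.FAIL; // non-retriable
        }
    }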



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-24 Thread via GitHub


lianetm commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1370452938


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -401,12 +524,12 @@ private void maybeThrowInvalidGroupIdException() {
 
 @Override
 public Map metrics() {
-throw new KafkaException("method not implemented");
+return Collections.unmodifiableMap(metrics.metrics());
 }
 
 @Override
 public List partitionsFor(String topic) {
-throw new KafkaException("method not implemented");
+return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));

Review Comment:
   (I guess it will all be part of the integration with the metadata calls, 
right?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15633: Fix overwrite of meta.properties at startup to handle JBOD. [kafka]

2023-10-24 Thread via GitHub


pprovenzano commented on PR #14578:
URL: https://github.com/apache/kafka/pull/14578#issuecomment-1777540934

   > I think we should add a test for this. Perhaps we can add or extend one of 
the existing integration tests?
   
   I'd like to do that as a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when group coordinator bounces

2023-10-24 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-15676:
--
Summary: Scheduled rebalance delay for Connect is unnecessarily triggered 
when group coordinator bounces  (was: Scheduled rebalance delay for Connect is 
unnecessarily triggered when Kafka cluster bounces)

> Scheduled rebalance delay for Connect is unnecessarily triggered when group 
> coordinator bounces
> ---
>
> Key: KAFKA-15676
> URL: https://issues.apache.org/jira/browse/KAFKA-15676
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a Connect worker loses contact with the group coordinator, it 
> voluntarily gives up (i.e., stops) its assignment of connectors and tasks. 
> For more context, see KAFKA-9184.
>  
> However, this change in state is not relayed the worker's instance of the 
> [IncrementalCooperativeAssignor 
> class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
>  This has the consequence that, if the group coordinator for a Connect 
> cluster is unavailable for long enough, all of the workers in cluster will 
> revoke their assigned connectors and tasks, report that they have been 
> assigned no connectors and tasks during the next rebalance, and spuriously 
> trigger the scheduled rebalance delay (since the leader will assume that all 
> workers should still be running the connectors and tasks that it assigned 
> during the last rebalance).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15676) Scheduled rebalance delay for Connect is unnecessarily triggered when Kafka cluster bounces

2023-10-24 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779144#comment-17779144
 ] 

Chris Egerton commented on KAFKA-15676:
---

One possible fix is to simply wipe the state for the previous assignments 
stored in a worker's {{IncrementalCooperativeAssignor}} instance once the 
worker voluntarily revokes its own connectors and tasks. The assumption is that 
it is more likely that either the worker has left the group and a new leader 
has been elected (in which case this state is useless), or all workers in the 
group have voluntarily revoked their assignments (in which case this state is 
also useless), than that this worker will regain contact with the group 
coordinator before its membership (and, implicitly, leadership) is revoked.

> Scheduled rebalance delay for Connect is unnecessarily triggered when Kafka 
> cluster bounces
> ---
>
> Key: KAFKA-15676
> URL: https://issues.apache.org/jira/browse/KAFKA-15676
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> When a Connect worker loses contact with the group coordinator, it 
> voluntarily gives up (i.e., stops) its assignment of connectors and tasks. 
> For more context, see KAFKA-9184.
>  
> However, this change in state is not relayed the worker's instance of the 
> [IncrementalCooperativeAssignor 
> class|https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java].
>  This has the consequence that, if the group coordinator for a Connect 
> cluster is unavailable for long enough, all of the workers in cluster will 
> revoke their assigned connectors and tasks, report that they have been 
> assigned no connectors and tasks during the next rebalance, and spuriously 
> trigger the scheduled rebalance delay (since the leader will assume that all 
> workers should still be running the connectors and tasks that it assigned 
> during the last rebalance).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

