Re: [PR] [MINOR] Code Cleanup (Clients Module) [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #16049:
URL: https://github.com/apache/kafka/pull/16049#issuecomment-2128711390

   > I rebased the PR!
   
   I guess other PRs will encounter the same failed tests, so please rebase the code for the others too, thanks!





Re: [PR] [MINOR] Code Cleanup (Clients Module) [kafka]

2024-05-23 Thread via GitHub


sjhajharia commented on PR #16049:
URL: https://github.com/apache/kafka/pull/16049#issuecomment-2128709392

   Thanks for pointing that out, @chia7712 
   I rebased the PR!





[jira] [Resolved] (KAFKA-16826) Integrate Native Kafka Docker Image with github Actions

2024-05-23 Thread Manikumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16826?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-16826.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Integrate Native Kafka Docker Image with github Actions
> ---
>
> Key: KAFKA-16826
> URL: https://issues.apache.org/jira/browse/KAFKA-16826
> Project: Kafka
>  Issue Type: Task
>Reporter: Krishna Agarwal
>Assignee: Krishna Agarwal
>Priority: Major
>  Labels: KIP-974
> Fix For: 3.8.0
>
>
> Integrate the Native Apache Kafka Docker Image with the existing GitHub Actions:
>  # Build and test
>  # RC release
>  # Promote





Re: [PR] [MINOR] Code Cleanup (Clients Module) [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #16049:
URL: https://github.com/apache/kafka/pull/16049#issuecomment-2128703246

   @sjhajharia Could you please rebase the code to include #16044? It fixes a lot of failed tests :)





Re: [PR] KAFKA-16826: Integrate Native Docker Image with github actions [kafka]

2024-05-23 Thread via GitHub


omkreddy merged PR #16045:
URL: https://github.com/apache/kafka/pull/16045





[jira] [Created] (KAFKA-16835) Add Support for consumer to read in commit order.

2024-05-23 Thread Manjunath (Jira)
Manjunath created KAFKA-16835:
-

 Summary: Add Support for consumer to read in commit order.
 Key: KAFKA-16835
 URL: https://issues.apache.org/jira/browse/KAFKA-16835
 Project: Kafka
  Issue Type: New Feature
  Components: consumer, offset manager
Reporter: Manjunath


Currently the consumer receives messages in offset order. There are some cases 
where commit order is very important. For example, assume PostgreSQL-14 randomly 
streams multiple in-progress large transactions to some intermediate client, 
which starts a transactional producer instance for each in-progress transaction; 
using these producer instances, the client pushes the data to Kafka. The 
consumer should then be able to strictly read messages in commit order.





Re: [PR] KAFKA-16826: Integrate Native Docker Image with github actions [kafka]

2024-05-23 Thread via GitHub


kagarwal06 commented on code in PR #16045:
URL: https://github.com/apache/kafka/pull/16045#discussion_r1612932172


##
.github/workflows/docker_promote.yml:
##
@@ -19,10 +19,10 @@ on:
   workflow_dispatch:
 inputs:
   rc_docker_image:
-description: RC docker image that needs to be promoted (Example:- 
apache/kafka:3.6.0-rc0)
+description: RC docker image that needs to be promoted (Example:- 
apache/kafka:3.6.0-rc0, apache/kafka-native:3.6.0-rc0)

Review Comment:
   Updated all the example messages to use the `3.8.0` version and included `OR`.






Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-23 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1612931258


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+KafkaMetric metric = allMetrics.get(quotaMetricName);
+if (metric != null) {
+LOGGER.warn("Sensor for quota-id {} already exists. Setting 
quota to {} in MetricConfig", quotaMetricName, newQuota);
+metric.config(getQuotaMetricConfig(newQuota));
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+public boolean isQuotaExceeded() {
+Sensor sensorInstance = sensor();
+try {
+sensorInstance.checkQuotas();
+} catch (QuotaViolationException qve) {
+LOGGER.debug("Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",
+sensorInstance.name(), qve.metric().metricName(), qve.value(), 
qve.bound());
+return true;
+}
+return false;
+}
+
+public void record(double value) {
+sensor().record(value, time.milliseconds(), false);

Review Comment:
   In KIP-956, we do not utilize the throttle time provided by the quota 
manager to regulate fetches and copies. For fetch operations, we initially 
verify quota availability before initiating the retrieval of remote data. If 
the quota is unavailable, our priority is to serve partitions requiring local 
data, rather than throttling the client. Therefore, we focus on fulfilling data 
requests for other partitions in the queue, eliminating the need for throttle 
time in fetch operations.
   
   Similarly, when an RLM task attempts to copy a segment, it first checks 
whether the write quota is available. If the quota is not available, the thread 
waits until the quota becomes available. As a result, we do not require throttle 
time for copies either.
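   
   For illustration, here is a minimal sketch of the two call sites described above (hypothetical helper and variable names; only `isQuotaExceeded()` and `record(...)` come from the `RLMQuotaManager` shown in this diff):
   ```java
   // Hypothetical fetch path: if the read quota is exhausted, skip the remote
   // read so partitions needing only local data are served; do not throttle.
   boolean tryRemoteFetch(RLMQuotaManager fetchQuotaManager, long bytesToRead) {
       if (fetchQuotaManager.isQuotaExceeded()) {
           return false; // caller moves on to other partitions in the queue
       }
       fetchQuotaManager.record(bytesToRead);
       return true;
   }

   // Hypothetical copy path: the RLM task blocks until the write quota frees up.
   void copySegment(RLMQuotaManager copyQuotaManager, long segmentSizeInBytes)
           throws InterruptedException {
       while (copyQuotaManager.isQuotaExceeded()) {
           Thread.sleep(100); // hypothetical backoff interval
       }
       copyQuotaManager.record(segmentSizeInBytes);
   }
   ```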




Re: [PR] [MINOR] Code Cleanup [kafka]

2024-05-23 Thread via GitHub


sjhajharia commented on PR #16021:
URL: https://github.com/apache/kafka/pull/16021#issuecomment-2128693543

   Hey @chia7712 
   I have created 5 sub-PRs to address the cleanup module-by-module. 
   - https://github.com/apache/kafka/pull/16049: Clients Module
   - https://github.com/apache/kafka/pull/16050: Streams Module
   - https://github.com/apache/kafka/pull/16065: Metadata Module
   - https://github.com/apache/kafka/pull/16066: Connect Module
   - https://github.com/apache/kafka/pull/16067: All other modules (as the number of changes per module was small)
   
   Hope that folks can chime in and provide their feedback.
   Good day!





[PR] [MINOR]: Code Cleanup - Misc modules [kafka]

2024-05-23 Thread via GitHub


sjhajharia opened a new pull request, #16067:
URL: https://github.com/apache/kafka/pull/16067

   ## What
   Code Cleanup in Misc Modules
   
   ## Changes
   Some common changes include:
   
   - Replacing Arrays.asList() with Collections.singletonList() wherever possible (see the example below)
   - Cleaning up some if-else blocks
   - Replacing some instances of StringBuilder with plain String operations
   
   Thanks!
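   
   For illustration, the first item amounts to swaps like this (a generic example, not taken from this PR's diff):
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   // Before: Arrays.asList allocates a varargs array plus a wrapper list
   List<String> before = Arrays.asList("my-topic");

   // After: Collections.singletonList is cheaper and signals single-element intent
   List<String> after = Collections.singletonList("my-topic");
   ```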





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2128669455

   > i really want to keep the network client delegate to just interfacing with 
the network client, so i wonder if it would be a better design to handle the 
metadata error in a separated module. wdyt?
   
   I think there are pros and cons to each approach.
   
   Current Patch:
   Pros:
   - Simple
  - Enhanced code comprehension: Similar to legacy code
   
   Cons:
   - Violates SRP
  - `NetworkClientDelegate` will also have non-interfacing behaviour.
   
   Separate module (`PropagateErrorMetadataUpdater` in https://github.com/apache/kafka/pull/15961):
   Pros:
   - Adheres to SOLID principles
   
   Cons:
   - Can be more complex
   - Acts on the client network layer.
   
   I'm having difficulty determining which aspect should be prioritized.
   Like you said, it made me think about the design as a priority.
   
   Our goal is to propagate metadata errors from the background thread to the 
foreground thread at the appropriate time.
   `MetadataUpdater` seems to play a "role" in handling metadata changes, 
making it a suitable candidate for propagating metadata errors.
   `PropagateErrorMetadataUpdater` acts as a proxy for handling metadata errors.
   We've taken an approach that extends metadata error handling without 
modifying `NetworkClient`'s logic.
   
   What do you think?
   
   You can see the approach in https://github.com/apache/kafka/pull/15961, and I would appreciate your review.
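   
   For readers following the thread, a rough sketch of the proxy idea (the interface below is deliberately simplified and hypothetical; the real `MetadataUpdater` in `org.apache.kafka.clients` has a larger surface):
   ```java
   // Simplified, hypothetical stand-in for org.apache.kafka.clients.MetadataUpdater.
   interface MetadataUpdater {
       void handleSuccessfulResponse(Object response);
       void handleFailedRequest(RuntimeException error);
   }

   // Proxy that forwards to the real updater while capturing metadata errors
   // so the background thread can relay them to the foreground thread.
   final class PropagateErrorMetadataUpdater implements MetadataUpdater {
       private final MetadataUpdater delegate;
       private final java.util.Queue<RuntimeException> errorQueue;

       PropagateErrorMetadataUpdater(MetadataUpdater delegate,
                                     java.util.Queue<RuntimeException> errorQueue) {
           this.delegate = delegate;
           this.errorQueue = errorQueue;
       }

       @Override public void handleSuccessfulResponse(Object response) {
           delegate.handleSuccessfulResponse(response);
       }

       @Override public void handleFailedRequest(RuntimeException error) {
           errorQueue.add(error); // captured for propagation to the user
           delegate.handleFailedRequest(error);
       }
   }
   ```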





Re: [PR] KAFKA-16764: New consumer should throw InvalidTopicException on poll when invalid topic in metadata [kafka]

2024-05-23 Thread via GitHub


appchemist commented on PR #16043:
URL: https://github.com/apache/kafka/pull/16043#issuecomment-2128663846

   Hi @philipnee, thank you for the review!
   
   > do you think we can put the metadata error in the ConsumerNetworkThread 
and have an exception handler to relay the error back to the user via the 
background queue?
   
   Yes, I know. However, I would appreciate your advice.





Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #15863:
URL: https://github.com/apache/kafka/pull/15863#issuecomment-2128658669

   @gaurav-narula this PR adopts your solution 
(https://github.com/apache/kafka/pull/15863#discussion_r1590313031) now, so it 
would be great to have your review before merging. Thanks!





Re: [PR] KAFKA-16574: The metrics of LogCleaner disappear after reconfiguration [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #15863:
URL: https://github.com/apache/kafka/pull/15863#issuecomment-2128654632

   @chiacyu Could you please rebase the PR to run QA with the newest code?





Re: [PR] KAFKA-16771 First log directory printed twice when formatting storage [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on code in PR #16010:
URL: https://github.com/apache/kafka/pull/16010#discussion_r1612892755


##
core/src/test/scala/unit/kafka/tools/StorageToolTest.scala:
##
@@ -206,12 +206,20 @@ Found problem:
 
   @Test
   def testFormatSucceedsIfAllDirectoriesAreAvailable(): Unit = {
-val availableDir1 = TestUtils.tempDir()
-val availableDir2 = TestUtils.tempDir()
+val availableDirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), 
TestUtils.tempDir()).map(dir => dir.toString)
 val stream = new ByteArrayOutputStream()
-assertEquals(0, runFormatCommand(stream, Seq(availableDir1.toString, 
availableDir2.toString)))
-assertTrue(stream.toString().contains("Formatting 
%s".format(availableDir1)))
-assertTrue(stream.toString().contains("Formatting 
%s".format(availableDir2)))
+assertEquals(0, runFormatCommand(stream, availableDirs))

Review Comment:
   Maybe we can restructure it in the following style:
   ```scala
   val availableDirs = Seq(TestUtils.tempDir(), TestUtils.tempDir(), TestUtils.tempDir()).map(dir => dir.toString)
   val stream = new ByteArrayOutputStream()
   assertEquals(0, runFormatCommand(stream, availableDirs))
   val actual = stream.toString().split("\\r?\\n")
   val expected = availableDirs.map("Formatting %s".format(_))
   // Duplicate messages would mean some dirs get formatted repeatedly
   assertEquals(availableDirs.size, actual.size)
   actual.foreach(line => assertTrue(expected.exists(line.startsWith)))
   ```






Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


artemlivshits commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612863802


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -21,16 +21,17 @@
 
 public enum TestFeatureVersion implements FeatureVersion {
 
-TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),
+// TEST_1 was released right before MV 3.7-IV0 was released, and it has no dependencies
 TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+// TEST_2 was released right before MV 3.8-IV0 was released, and it depends on this metadata version
 TEST_2(2, MetadataVersion.IBP_3_8_IV0, 
Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_8_IV0.featureLevel()));
 
-private short featureLevel;
-private MetadataVersion metadataVersionMapping;
-private Map<String, Short> dependencies;
+private final short featureLevel;
+private final MetadataVersion metadataVersionMapping;
+private final Map<String, Short> dependencies;
 
 public static final String FEATURE_NAME = "test.feature.version";
-public static final TestFeatureVersion PRODUCTION_VERSION = TEST_1;
+public static final TestFeatureVersion LATEST_PRODUCTION = TEST_1;

Review Comment:
   Does it mean that the broker will show the test feature to the clients?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   I think it would be confusing if a user specifies the latest MV and the 
result differs from when nothing is specified (the implied assumption is that 
specifying nothing is a shortcut for the latest MV known to the tool). It would 
also be confusing if, by default (nothing specified), we don't have all features 
set to the latest versions known to the tool. We can provide guidance to new 
feature developers in the comments (and the test feature example) and add a unit 
test that enforces the equivalence.
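   
   A hypothetical sketch of such an equivalence test (`MetadataVersion.LATEST_PRODUCTION` and `Features.PRODUCTION_FEATURES` are assumed names and the usual imports are omitted; `generateFeatureRecords` and its parameters come from this diff):
   ```scala
   @Test
   def testUnspecifiedMetadataVersionMatchesLatestProduction(): Unit = {
     val implicitRecords = ArrayBuffer[ApiMessageAndVersion]()
     val explicitRecords = ArrayBuffer[ApiMessageAndVersion]()
     val latest = MetadataVersion.LATEST_PRODUCTION // assumed constant
     val allFeatures = Features.PRODUCTION_FEATURES.asScala.toList // assumed accessor
     // "Nothing specified" (--version-default) must equal an explicit latest MV.
     StorageTool.generateFeatureRecords(implicitRecords, latest, Map.empty, allFeatures, usesVersionDefault = true)
     StorageTool.generateFeatureRecords(explicitRecords, latest, Map.empty, allFeatures, usesVersionDefault = false)
     assertEquals(implicitRecords, explicitRecords)
   }
   ```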






[PR] [MINOR] Code Cleanup - Connect Module [kafka]

2024-05-23 Thread via GitHub


sjhajharia opened a new pull request, #16066:
URL: https://github.com/apache/kafka/pull/16066

   ## What
   Code Cleanup in Connect Module
   
   ## Changes
   Some common changes include:
   
   - Replacing Arrays.asList() with Collections.singletonList() wherever possible
   - Cleaning up some if-else blocks
   - Replacing some instances of StringBuilder with plain String operations
   
   Thanks!





[PR] [MINOR]: Code Cleanup - Metadata module [kafka]

2024-05-23 Thread via GitHub


sjhajharia opened a new pull request, #16065:
URL: https://github.com/apache/kafka/pull/16065

   ## What
   Code Cleanup in metadata Module
   
   ## Changes
   Some common changes include:
   
   - Replacing Arrays.asList() with Collections.singletonList() wherever possible
   - Cleaning up some if-else blocks
   - Replacing some instances of StringBuilder with plain String operations
   
   Thanks!





Re: [PR] KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on code in PR #16052:
URL: https://github.com/apache/kafka/pull/16052#discussion_r1612714758


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StickyTaskAssignor implements TaskAssignor {
+private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+private Map<String, ?> configs = new HashMap<>();
+private final boolean mustPreserveActiveTaskAssignment;
+
+public StickyTaskAssignor() {
+this(false);
+}
+
+public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
+}
+
+@Override
+public void configure(final Map<String, ?> configs) {

Review Comment:
   Already discussed, but just to keep track: remove this override (and add a 
default to the interface method if necessary)



##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import 

Re: [PR] KAFKA-16796: Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on code in PR #16064:
URL: https://github.com/apache/kafka/pull/16064#discussion_r1612751256


##
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##
@@ -651,4 +651,17 @@ object DumpLogSegments {
 def checkArgs(): Unit = CommandLineUtils.checkRequiredArgs(parser, 
options, filesOpt)
 
   }
+
+  private[tools] def newDecoder(className: String, props: 
VerifiableProperties): Decoder[_] = {
+val kclass = 
Class.forName(className).getConstructor(props.getClass).newInstance(props)

Review Comment:
   This approach requires all implementations of the new decoder to offer a 
constructor accepting properties. It seems to me that is not what we expect, as 
the props are always empty. Also, we should use the Reconfigurable interface for 
consistency.
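   
   For example, a minimal sketch of a more lenient lookup (an illustration of the concern, not a concrete proposal from this thread) that prefers a no-arg constructor so new decoders need not accept the always-empty properties:
   ```scala
   private[tools] def newDecoder(className: String, props: VerifiableProperties): Decoder[_] = {
     val clazz = Class.forName(className)
     // Prefer a no-arg constructor; fall back to the legacy
     // VerifiableProperties-based constructor for old-style decoders.
     try clazz.getConstructor().newInstance().asInstanceOf[Decoder[_]]
     catch {
       case _: NoSuchMethodException =>
         clazz.getConstructor(props.getClass).newInstance(props).asInstanceOf[Decoder[_]]
     }
   }
   ```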






[PR] KAFKA-16796: Introduce new org.apache.kafka.tools.api.Decoder to replace kafka.serializer.Decoder [kafka]

2024-05-23 Thread via GitHub


FrankYang0529 opened a new pull request, #16064:
URL: https://github.com/apache/kafka/pull/16064

   We need a replacement in order to complete 
https://issues.apache.org/jira/browse/KAFKA-14579 in Kafka 4.0.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16834) add the reason for the failure of PartitionRegistration#toRecord

2024-05-23 Thread Jianbin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jianbin Chen updated KAFKA-16834:
-
Description: 
Transform it into the following output, which makes it easier for users to 
understand and identify the cause of the problem.
{code:java}
options.handleLoss("the directory " + (directory == DirectoryId.UNASSIGNED ? 
"unassigned" : "lost")
+ " state of one or more replicas");{code}

  was:
Transform it into the following output, which makes it easier for users to 
understand and identify the cause of the problem.
{code:java}
options.handleLoss("the directory " + directory + " state of one or more 
replicas");{code}


> add the reason for the failure of PartitionRegistration#toRecord
> 
>
> Key: KAFKA-16834
> URL: https://issues.apache.org/jira/browse/KAFKA-16834
> Project: Kafka
>  Issue Type: Wish
>Affects Versions: 3.7.0
>Reporter: Jianbin Chen
>Priority: Minor
>
> Transform it into the following output, which makes it easier for users to 
> understand and identify the cause of the problem.
> {code:java}
> options.handleLoss("the directory " + (directory == DirectoryId.UNASSIGNED ? 
> "unassigned" : "lost")
> + " state of one or more replicas");{code}





[PR] KAFKA-16834: add the reason for the failure of PartitionRegistration#toRecord [kafka]

2024-05-23 Thread via GitHub


funky-eyes opened a new pull request, #16063:
URL: https://github.com/apache/kafka/pull/16063

   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[jira] [Updated] (KAFKA-16834) add the reason for the failure of PartitionRegistration#toRecord

2024-05-23 Thread Jianbin Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jianbin Chen updated KAFKA-16834:
-
Summary: add the reason for the failure of PartitionRegistration#toRecord  
(was: add PartitionRegistration#toRecord loss info)

> add the reason for the failure of PartitionRegistration#toRecord
> 
>
> Key: KAFKA-16834
> URL: https://issues.apache.org/jira/browse/KAFKA-16834
> Project: Kafka
>  Issue Type: Wish
>Affects Versions: 3.7.0
>Reporter: Jianbin Chen
>Priority: Minor
>
> Transform it into the following output, which makes it easier for users to 
> understand and identify the cause of the problem.
> {code:java}
> options.handleLoss("the directory " + directory + " state of one or more 
> replicas");{code}





[jira] [Created] (KAFKA-16834) add PartitionRegistration#toRecord loss info

2024-05-23 Thread Jianbin Chen (Jira)
Jianbin Chen created KAFKA-16834:


 Summary: add PartitionRegistration#toRecord loss info
 Key: KAFKA-16834
 URL: https://issues.apache.org/jira/browse/KAFKA-16834
 Project: Kafka
  Issue Type: Wish
Affects Versions: 3.7.0
Reporter: Jianbin Chen


Transform it into the following output, which makes it easier for users to 
understand and identify the cause of the problem.
{code:java}
options.handleLoss("the directory " + directory + " state of one or more 
replicas");{code}





[jira] [Updated] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode

2024-05-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16583:
--
Fix Version/s: 3.8.0
   3.7.1

> Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
> ---
>
> Key: KAFKA-16583
> URL: https://issues.apache.org/jira/browse/KAFKA-16583
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0
>Reporter: HanXu
>Assignee: HanXu
>Priority: Blocker
> Fix For: 3.8.0, 3.7.1
>
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> How to reproduce:
> 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode;
> 2. Create a topic with 1 partition;
> 3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition 
> from step 2 to Broker B;
> 4. Upgrade Broker B to 3.7.0;
> Broker B will keep logging the following error:
> {code:java}
> [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.LoggingFaultHandler)
> org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
> lost because the following could not be represented in metadata version 
> 3.4-IV0: the directory assignment state of one or more replicas
>   at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
>   at 
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
>   at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
>   at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
>   at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
>   at 
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
>   at 
> org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> Bug:
>  - When reassigning a partition, PartitionRegistration#merge will set the new 
> replicas with the UNASSIGNED directory;
>  - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows 
> the MIGRATING directory;
> {code:java}
> if (options.metadataVersion().isDirectoryAssignmentSupported()) {
> record.setDirectories(Uuid.toList(directories));
> } else {
> for (Uuid directory : directories) {
> if (!DirectoryId.MIGRATING.equals(directory)) {
> options.handleLoss("the directory assignment state of one 
> or more replicas");
> break;
> }
> }
> }
> {code}
> Solution:
> - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED
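
A sketch of the proposed relaxation, based on the snippet quoted above (the exact diff in the linked PR may differ):
{code:java}
if (options.metadataVersion().isDirectoryAssignmentSupported()) {
    record.setDirectories(Uuid.toList(directories));
} else {
    for (Uuid directory : directories) {
        // Tolerate both placeholder states; only a concrete directory id
        // is unrepresentable in pre-3.7 metadata versions.
        if (!DirectoryId.MIGRATING.equals(directory) && !DirectoryId.UNASSIGNED.equals(directory)) {
            options.handleLoss("the directory assignment state of one or more replicas");
            break;
        }
    }
}
{code}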





[jira] [Updated] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode

2024-05-23 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-16583:
--
Priority: Blocker  (was: Major)

> Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
> ---
>
> Key: KAFKA-16583
> URL: https://issues.apache.org/jira/browse/KAFKA-16583
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0
>Reporter: HanXu
>Assignee: HanXu
>Priority: Blocker
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> How to reproduce:
> 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode;
> 2. Create a topic with 1 partition;
> 3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition 
> from step 2 to Broker B;
> 4. Upgrade Broker B to 3.7.0;
> Broker B will keep logging the following error:
> {code:java}
> [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.LoggingFaultHandler)
> org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
> lost because the following could not be represented in metadata version 
> 3.4-IV0: the directory assignment state of one or more replicas
>   at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
>   at 
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
>   at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
>   at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
>   at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
>   at 
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
>   at 
> org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> Bug:
>  - When reassigning a partition, PartitionRegistration#merge will set the new 
> replicas with the UNASSIGNED directory;
>  - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows 
> the MIGRATING directory;
> {code:java}
> if (options.metadataVersion().isDirectoryAssignmentSupported()) {
> record.setDirectories(Uuid.toList(directories));
> } else {
> for (Uuid directory : directories) {
> if (!DirectoryId.MIGRATING.equals(directory)) {
> options.handleLoss("the directory assignment state of one 
> or more replicas");
> break;
> }
> }
> }
> {code}
> Solution:
> - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED





Re: [PR] KAFKA-16583: fix PartitionRegistration#toRecord directory check under metadata version 3_7_IV2 [kafka]

2024-05-23 Thread via GitHub


showuon commented on PR #15751:
URL: https://github.com/apache/kafka/pull/15751#issuecomment-2128463581

   Thanks for fixing this issue. I'll take a look this week or next week.





Re: [PR] ConfigRegistry [kafka]

2024-05-23 Thread via GitHub


github-actions[bot] commented on PR #15428:
URL: https://github.com/apache/kafka/pull/15428#issuecomment-2128459494

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[jira] [Commented] (KAFKA-16583) Update from 3.4.0 to 3.7.0 image write failed in Kraft mode

2024-05-23 Thread Jianbin Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849145#comment-17849145
 ] 

Jianbin Chen commented on KAFKA-16583:
--

I want to know when this PR can be merged; I am deeply affected by this bug!

> Update from 3.4.0 to 3.7.0 image write failed in Kraft mode
> ---
>
> Key: KAFKA-16583
> URL: https://issues.apache.org/jira/browse/KAFKA-16583
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0
>Reporter: HanXu
>Assignee: HanXu
>Priority: Major
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> How to reproduce:
> 1. Launch a 3.4.0 controller and a 3.4.0 broker (Broker A) in KRaft mode;
> 2. Create a topic with 1 partition;
> 3. Launch a 3.4.0 broker (Broker B) in KRaft mode and reassign the partition 
> from step 2 to Broker B;
> 4. Upgrade Broker B to 3.7.0;
> Broker B will keep logging the following error:
> {code:java}
> [2024-04-18 14:46:54,144] ERROR Encountered metadata loading fault: Unhandled 
> error initializing new publishers 
> (org.apache.kafka.server.fault.LoggingFaultHandler)
> org.apache.kafka.image.writer.UnwritableMetadataException: Metadata has been 
> lost because the following could not be represented in metadata version 
> 3.4-IV0: the directory assignment state of one or more replicas
>   at 
> org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
>   at 
> org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
>   at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
>   at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
>   at org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
>   at 
> org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
>   at 
> org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
>   at 
> org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
>   at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> Bug:
>  - When reassigning a partition, PartitionRegistration#merge will set the new 
> replicas with the UNASSIGNED directory;
>  - But in metadata version 3.4.0, PartitionRegistration#toRecord only allows 
> the MIGRATING directory;
> {code:java}
> if (options.metadataVersion().isDirectoryAssignmentSupported()) {
> record.setDirectories(Uuid.toList(directories));
> } else {
> for (Uuid directory : directories) {
> if (!DirectoryId.MIGRATING.equals(directory)) {
> options.handleLoss("the directory assignment state of one 
> or more replicas");
> break;
> }
> }
> }
> {code}
> Solution:
> - PartitionRegistration#toRecord allows both MIGRATING and UNASSIGNED





[PR] KAFKA-16833: Fixing PartitionInfo and Cluster equals and hashCode [kafka]

2024-05-23 Thread via GitHub


ahuang98 opened a new pull request, #16062:
URL: https://github.com/apache/kafka/pull/16062

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-23 Thread via GitHub


jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612459168


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
 return current.get(lastStep());
 }
 
+/**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound  function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Map<String, Object> updateValueFrom(
+Map<String, Object> originalValue,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+}
+
+@SuppressWarnings("unchecked")
+private Map<String, Object> updateValue(
+Map<String, Object> originalValue,
+int step,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+if (originalValue == null) return null;
+Map<String, Object> updatedParent = new HashMap<>(originalValue.size());
+boolean found = false;
+for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+String fieldName = entry.getKey();
+Object fieldValue = entry.getValue();
+if (steps.get(step).equals(fieldName)) {
+found = true;
+if (step < lastStepIndex()) {
+if (fieldValue instanceof Map) {
+Map<String, Object> updatedField = updateValue(
+(Map<String, Object>) fieldValue,
+step + 1,
+whenFound,
+whenNotFound,
+whenOther);
+updatedParent.put(fieldName, updatedField);
+} else {
+// add back to not found and apply others, as only 
leaf values are updated
+found = false;
+whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+}
+} else {
+whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+}
+} else {
+whenOther.apply(originalValue, updatedParent, null, fieldName);
+}
+}
+
+if (!found) {
+whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+}
+
+return updatedParent;
+}
+
+/**
+ * Access {@code Struct} fields and apply functions to update field values.
+ *
+ * @param originalSchema original struct schema
+ * @param originalValue  schema-based data value
+ * @param updatedSchema  updated struct schema
+ * @param whenFound  function to apply when path is found
+ * @param whenNotFound   function to apply when path is not found
+ * @param whenOther  function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Struct updateValueFrom(
+Schema originalSchema,
+Struct originalValue,
+Schema updatedSchema,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+return updateValue(originalSchema, originalValue, updatedSchema, 0, 
whenFound, whenNotFound, whenOther);
+}
+
+private Struct updateValue(
+Schema originalSchema,
+Struct originalValue,
+Schema updateSchema,
+int step,
+StructValueUpdater whenFound,
+StructValueUpdater whenNotFound,
+StructValueUpdater whenOther
+) {
+Struct updated = new Struct(updateSchema);
+boolean found = false;
+for (Field field : originalSchema.fields()) {
+if (step < steps.size()) {
+if (steps.get(step).equals(field.name())) {
+found = true;
+if (step == lastStepIndex()) {
+whenFound.apply(
+originalValue,
+field,
+updated,
+updateSchema.field(field.name()),
+this
+);
+} else {
+if (field.schema().type() == Schema.Type.STRUCT) {
+Struct fieldValue = updateValue(
+field.schema(),
+originalValue.getStruct(field.name()),
+updateSchema.field(field.name()).schema(),
+step + 1,
+

Re: [PR] KAFKA-16691: [connect:transform] Support nested field paths on TimestampConverter [kafka]

2024-05-23 Thread via GitHub


jeqo commented on code in PR #15893:
URL: https://github.com/apache/kafka/pull/15893#discussion_r1612458345


##
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/field/SingleFieldPath.java:
##
@@ -202,6 +204,225 @@ public Object valueFrom(Map<String, Object> map) {
 return current.get(lastStep());
 }
 
+/**
+ * Access {@code Map} fields and apply functions to update field values.
+ *
+ * @param originalValue schema-based data value
+ * @param whenFound function to apply when path is found
+ * @param whenNotFound  function to apply when path is not found
+ * @param whenOther function to apply on fields not matched by path
+ * @return updated data value
+ */
+public Map<String, Object> updateValueFrom(
+Map<String, Object> originalValue,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+return updateValue(originalValue, 0, whenFound, whenNotFound, 
whenOther);
+}
+
+@SuppressWarnings("unchecked")
+private Map<String, Object> updateValue(
+Map<String, Object> originalValue,
+int step,
+MapValueUpdater whenFound,
+MapValueUpdater whenNotFound,
+MapValueUpdater whenOther
+) {
+if (originalValue == null) return null;
+Map<String, Object> updatedParent = new HashMap<>(originalValue.size());
+boolean found = false;
+for (Map.Entry<String, Object> entry : originalValue.entrySet()) {
+String fieldName = entry.getKey();
+Object fieldValue = entry.getValue();
+if (steps.get(step).equals(fieldName)) {
+found = true;
+if (step < lastStepIndex()) {
+if (fieldValue instanceof Map) {
+Map<String, Object> updatedField = updateValue(
+(Map<String, Object>) fieldValue,
+step + 1,
+whenFound,
+whenNotFound,
+whenOther);
+updatedParent.put(fieldName, updatedField);
+} else {
+// add back to not found and apply others, as only 
leaf values are updated
+found = false;
+whenOther.apply(originalValue, updatedParent, null, 
fieldName);
+}
+} else {
+whenFound.apply(originalValue, updatedParent, this, 
fieldName);
+}
+} else {
+whenOther.apply(originalValue, updatedParent, null, fieldName);
+}
+}
+
+if (!found) {
+whenNotFound.apply(originalValue, updatedParent, this, 
steps.get(step));
+}
+
+return updatedParent;
+}

Review Comment:
   Thanks! I like the proposed approach. The only change I'm considering is to 
return the map intact if the value is not found (the proposal is to return 
null).
   This should keep the behavior consistent with the Struct and Schema updates, 
which do not make this distinction.
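   
   In code, the considered change would be roughly (a sketch against the `updateValue` body quoted above):
   ```java
   if (!found) {
       // Hand the map back unchanged rather than returning null, matching
       // the Struct/Schema update behavior.
       return originalValue;
   }
   ```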






[PR] return to commit f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a [kafka]

2024-05-23 Thread via GitHub


cmccabe opened a new pull request, #16061:
URL: https://github.com/apache/kafka/pull/16061

   commit db118aba6fbddfc607c6ef653f9965a5a73c778a (HEAD -> return-to-monkey, 
cmccabe/return-to-monkey, trunk)
   Author: Colin P. McCabe 
   Date:   Thu May 23 16:11:24 2024 -0700
   
   return to commit f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a
   
   Return to commit f9db4fa19cce975a6bbaeb09fbe9c91b81846b5a to see if it
   builds more normally than current trunk.





Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128174644

   > > Something is wrong with the build of this commit
   >
   > There still seems to be some issue with builds 🤔
   
   > [2024-05-23T21:45:16.078Z] Could not stop 
org.gradle.internal.actor.internal.DefaultActorFactory$NonBlockingActor@1d0918ec.
   
   That was probably me. Marking the PR as ready-for-review forcefully killed 
the previous build, and the next build is still waiting in the queue. I think 
that the status message should say "queued", and it just wasn't prepared for 
this strange state. If it doesn't resolve in a few hours then we might want to 
be worried :)





Re: [PR] KAFKA-16541 Fix potential leader-epoch checkpoint file corruption [kafka]

2024-05-23 Thread via GitHub


junrao commented on code in PR #15993:
URL: https://github.com/apache/kafka/pull/15993#discussion_r1612451041


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -55,16 +61,40 @@ public class LeaderEpochFileCache {
 /**
  * @param topicPartition the associated topic partition
  * @param checkpoint the checkpoint file
+ * @param scheduler  the scheduler to use for async I/O operations
  */
 @SuppressWarnings("this-escape")
-public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint) {
+public LeaderEpochFileCache(TopicPartition topicPartition, 
LeaderEpochCheckpoint checkpoint, Scheduler scheduler) {
 this.checkpoint = checkpoint;
 this.topicPartition = topicPartition;
+this.scheduler = scheduler;
 LogContext logContext = new LogContext("[LeaderEpochCache " + 
topicPartition + "] ");
 log = logContext.logger(LeaderEpochFileCache.class);
 checkpoint.read().forEach(this::assign);
 }
 
+/**
+ * Instantiate a new LeaderEpochFileCache with provided epoch entries 
instead of from the backing checkpoint file.
+ * The provided epoch entries are expected to no less fresh than the 
checkpoint file.

Review Comment:
   to no less fresh => to be no less fresh






[jira] [Updated] (KAFKA-16833) Cluster missing topicIds from equals and hashCode, PartitionInfo missing equals and hashCode

2024-05-23 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang updated KAFKA-16833:
-
Summary: Cluster missing topicIds from equals and hashCode, PartitionInfo 
missing equals and hashCode  (was: PartitionInfo missing equals and hashCode 
methods )

> Cluster missing topicIds from equals and hashCode, PartitionInfo missing 
> equals and hashCode
> 
>
> Key: KAFKA-16833
> URL: https://issues.apache.org/jira/browse/KAFKA-16833
> Project: Kafka
>  Issue Type: Bug
>Reporter: Alyssa Huang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


jolshan commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128160293

   Any ideas about 
   ```
   
   [2024-05-23T21:45:16.078Z] > Task :connect:runtime:test
   
   [2024-05-23T21:45:16.078Z] Could not stop 
org.gradle.internal.actor.internal.DefaultActorFactory$NonBlockingActor@1d0918ec.
   
   [2024-05-23T21:45:16.078Z] org.gradle.internal.dispatch.DispatchException: 
Could not dispatch message [MethodInvocation method: stop()].
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


jolshan commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128158835

   > Something is wrong with the build of this commit
   
   There still seems to be some issue with builds 🤔


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612404062


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   Ok, sorry to be a little all over the place here. I think we should have 
semantics where, for each new feature version released to production, we also 
have an MV released to production.
   
   In this case, whether we specify latest production (as it did before) or use 
an empty optional to specify the latest production feature SHOULD be 
equivalent. The only case where it is not is if someone doesn't set the 
metadataMapping correctly. There isn't a great way to enforce that (I can do so 
via a test), but I guess the question is whether we prefer the empty approach, 
which ensures the latest features are provided, or the latest-production-metadata 
approach, which is simpler but may risk not picking up features if folks 
implement the method incorrectly or don't update the MV.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on code in PR #16052:
URL: https://github.com/apache/kafka/pull/16052#discussion_r1612405084


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StickyTaskAssignor implements TaskAssignor {
+private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+private Map configs = new HashMap<>();
+private final boolean mustPreserveActiveTaskAssignment;
+
+public StickyTaskAssignor() {
+this(false);
+}
+
+public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
+}
+
+@Override
+public void configure(final Map configs) {
+// TODO: The application state already has the assignment configs 
object, why should this
+//   assignor be configurable?
+this.configs = configs;
+}
+
+@Override
+public TaskAssignment assign(final ApplicationState applicationState) {
+final int taskCount = applicationState.allTasks().size();
+if (taskCount == 0) {

Review Comment:
   (I filed a ticket for this already btw)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 11) Implemented StickyTaskAssignor [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on code in PR #16052:
URL: https://github.com/apache/kafka/pull/16052#discussion_r1612403956


##
streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java:
##
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.assignment.assignors;
+
+import static java.util.Collections.unmodifiableMap;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
+import 
org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils;
+import org.apache.kafka.streams.processor.assignment.TaskAssignor;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StickyTaskAssignor implements TaskAssignor {
+private static final Logger LOG = 
LoggerFactory.getLogger(StickyTaskAssignor.class);
+
+private Map configs = new HashMap<>();
+private final boolean mustPreserveActiveTaskAssignment;
+
+public StickyTaskAssignor() {
+this(false);
+}
+
+public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
+this.mustPreserveActiveTaskAssignment = 
mustPreserveActiveTaskAssignment;
+}
+
+@Override
+public void configure(final Map configs) {
+// TODO: The application state already has the assignment configs 
object, why should this
+//   assignor be configurable?
+this.configs = configs;
+}
+
+@Override
+public TaskAssignment assign(final ApplicationState applicationState) {
+final int taskCount = applicationState.allTasks().size();
+if (taskCount == 0) {

Review Comment:
   Technically this would be a bug that should be caught prior to ever 
invoking the assignor, so I don't think we should/need to check this.
   
   Also: I would say that if we did want to return an empty assignment, it 
should be a real TaskAssignment with a KafkaStreamsAssignment for each 
KafkaStreamsState that appears in the ApplicationState, but with those just 
being empty KafkaStreamsAssignment objects (ie no actual tasks assigned).
   
   I actually think it should be considered an error if the returned 
`TaskAssignment` does not contain a KafkaStreamsAssignment for each 
KafkaStreamsState in the input. When we get around to implementing the 
assignment validation and `#onAssignmentComputed` callback with the error 
codes, we should probably have a dedicated error code for this case (if we 
don't already, don't remember exactly which error codes we had in the original 
KIP)
   
   Doesn't matter for now since imo we should remove this check altogether, but 
good to keep in mind



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15541: Add oldest-iterator-open-since-ms metric [kafka]

2024-05-23 Thread via GitHub


mjsax commented on code in PR #16041:
URL: https://github.com/apache/kafka/pull/16041#discussion_r1612395523


##
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##
@@ -451,6 +455,23 @@ public static void addNumOpenIteratorsGauge(final String 
taskId,
 
 }
 
+public static void addOldestOpenIteratorGauge(final String taskId,
+  final String storeType,
+  final String storeName,
+  final StreamsMetricsImpl 
streamsMetrics,
+  final Gauge 
oldestOpenIteratorGauge) {
+streamsMetrics.addStoreLevelMutableMetric(
+taskId,
+storeType,
+storeName,
+OLDEST_ITERATOR_OPEN_SINCE_MS,
+OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION,
+RecordingLevel.INFO,
+oldestOpenIteratorGauge
+);
+

Review Comment:
   nit: remove blank line



##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##
@@ -169,6 +172,10 @@ private void registerMetrics() {
 iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
 StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
 (config, now) -> numOpenIterators.get());
+StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
+(config, now) -> openIterators.isEmpty() ? null :
+
openIterators.stream().mapToLong(MeteredIterator::startTimestamp).min().getAsLong()

Review Comment:
   I don't want to over-engineer (given that we can safely assume that the 
`openIterator` set should be small), but wondering if this is the best 
implementation?
   
   In the end, we only want to track the create ts, not the iterators 
themselves. And for the create ts we could just maintain a list of longs: we 
would `return list.first()` here, and always append to the end of the list when 
a new iterator is created. Only "remove" would be more expensive, but we could 
use a sorted tree for the list, and thus remove would be O(log n) not O(n).
   
   For this case, we also don't need the `MeteredIterator` helper interface.
   
   Thoughts?
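   
   For concreteness, a minimal sketch of the sorted-structure idea (the class 
and method names here are hypothetical, not proposed Kafka APIs; a sorted 
multiset is used since two iterators can be created in the same millisecond):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentSkipListMap;
   
   // Tracks only the create timestamps of open iterators, not the iterators themselves.
   final class OpenIteratorTimestamps {
       // create timestamp -> number of open iterators created at that timestamp
       private final ConcurrentSkipListMap<Long, Integer> openTimestamps = new ConcurrentSkipListMap<>();
   
       void onOpen(final long createdAtMs) {
           openTimestamps.merge(createdAtMs, 1, Integer::sum); // O(log n)
       }
   
       void onClose(final long createdAtMs) {
           // drop the entry once no iterator created at this timestamp remains open; O(log n)
           openTimestamps.compute(createdAtMs, (ts, count) -> (count == null || count <= 1) ? null : count - 1);
       }
   
       // Gauge value: the oldest open iterator's create timestamp, or null when none is open.
       Long oldestOpenSinceMs() {
           final Map.Entry<Long, Integer> first = openTimestamps.firstEntry();
           return first == null ? null : first.getKey();
       }
   }
   ```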



##
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java:
##
@@ -490,6 +490,31 @@ public void shouldTimeIteratorDuration() {
 assertThat((double) iteratorDurationMaxMetric.metricValue(), 
equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
 }
 
+@Test
+public void shouldTrackOldestOpenIteratorTimestamp() {
+when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+init();
+
+final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
+assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+
+try (final KeyValueIterator first = metered.all()) {
+final long oldestTimestamp = mockTime.milliseconds();
+assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
+mockTime.sleep(100);
+
+// open a second iterator before closing the first to test that we 
still produce the first iterator's timestamp
+try (final KeyValueIterator second = 
metered.all()) {
+assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
+mockTime.sleep(100);
+}

Review Comment:
   It would be better not to close the second iterator here, but to close the 
first one first, to see if the metric advances to the second iterator's create 
ts -- this would need some rewriting of the test; try-with-resources won't allow 
for proper nesting, but we can still use try-finally.
   
   Might actually be best to open something like 5 iterators and close them in 
some non-linear order (including closing the current-oldest one 2 or 3 times) to 
verify correct behavior.
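   
   Roughly, a sketch of that rework for the two-iterator case with try-finally 
(assuming the `metered`, `mockTime`, and `oldestIteratorTimestampMetric` 
fixtures from the quoted test, and the store's String/String generics):
   
   ```java
   final KeyValueIterator<String, String> first = metered.all();
   final long firstTs = mockTime.milliseconds();
   mockTime.sleep(100);
   final KeyValueIterator<String, String> second = metered.all();
   final long secondTs = mockTime.milliseconds();
   try {
       assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(firstTs));
       // closing the oldest iterator first: the metric should advance to the second one
       first.close();
       assertThat((Long) oldestIteratorTimestampMetric.metricValue(), equalTo(secondTs));
   } finally {
       second.close();
   }
   assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
   ```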



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16833) PartitionInfo missing equals and hashCode methods

2024-05-23 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16833:


 Summary: PartitionInfo missing equals and hashCode methods 
 Key: KAFKA-16833
 URL: https://issues.apache.org/jira/browse/KAFKA-16833
 Project: Kafka
  Issue Type: Bug
Reporter: Alyssa Huang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15045: Fix RackAwareTaskAssignorTest [kafka]

2024-05-23 Thread via GitHub


apourchet closed pull request #16056: KAFKA-15045: Fix RackAwareTaskAssignorTest
URL: https://github.com/apache/kafka/pull/16056


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on code in PR #16051:
URL: https://github.com/apache/kafka/pull/16051#discussion_r1612358604


##
Jenkinsfile:
##
@@ -36,6 +36,12 @@ def doTest(env, target = "test") {
   junit '**/build/test-results/**/TEST-*.xml'
 }
 
+def runTestOnDevBranch(env) {
+  if (!isChangeRequest(env)) {
+doTest(env)
+  }
+}

Review Comment:
   I'm hesitant to sign up future release managers for additional verification 
of the 11 and 17 builds that the CI has been doing for previous releases. I 
don't expect any failures to be introduced that affect 11 and 17 but not 8 or 
21, but I would rather find those failures out in CI after merge than during 
the RC generation stage.
   
   And for selfish reasons, I also would like to continue seeing the 11 and 17 
trunk results published so I can do flaky test analysis. With this patch as-is, 
we're keeping the same number of test runs on trunk, but if we removed these 
entirely we would be cutting those in half. I don't pay attention to the PR 
builds for statistics because contributors may have bad patches applied that 
aren't worth investigating.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128068313

   Looking at the latest CI run, the 11 build took ~15 minutes, the 17 build 
took 11 minutes, and the 8 and 21 builds are continuing as normal.
   
   In the last 28 days, Kafka has run:
   9380 builds in total
   8514 builds on PRs
   748 builds on trunk
   111 builds on 3.7
   7 builds on 3.6
   -> 90% of the builds are on PRs
   
   I think we can expect more than a 40% reduction in our total CI load after 
this change: PR builds drop two of their four JDK test runs, so roughly 
0.9 × 0.5 = 45% of the test load goes away, and I don't think we're losing any 
test coverage. I'm comfortable with this change as-is, and we can pursue further 
reductions as they become desired.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


ijuma commented on code in PR #16051:
URL: https://github.com/apache/kafka/pull/16051#discussion_r1612342902


##
Jenkinsfile:
##
@@ -36,6 +36,12 @@ def doTest(env, target = "test") {
   junit '**/build/test-results/**/TEST-*.xml'
 }
 
+def runTestOnDevBranch(env) {
+  if (!isChangeRequest(env)) {
+doTest(env)
+  }
+}

Review Comment:
   Ah, yes - we'll also have to configure the brokers, etc. to use Java 17. In 
any case, I think whatever we have to do there won't be any harder if we just 
delete the Java 11 and 17 builders for now. And it will make the 3.8 builds 
lighter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


ijuma commented on code in PR #16051:
URL: https://github.com/apache/kafka/pull/16051#discussion_r1612341398


##
Jenkinsfile:
##
@@ -36,6 +36,12 @@ def doTest(env, target = "test") {
   junit '**/build/test-results/**/TEST-*.xml'
 }
 
+def runTestOnDevBranch(env) {
+  if (!isChangeRequest(env)) {
+doTest(env)
+  }
+}

Review Comment:
   I don't see the value - once we drop support for Java 8, we will simply 
change the Java 8 builder to Java 11. Am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on code in PR #16051:
URL: https://github.com/apache/kafka/pull/16051#discussion_r1612327285


##
Jenkinsfile:
##
@@ -36,6 +36,12 @@ def doTest(env, target = "test") {
   junit '**/build/test-results/**/TEST-*.xml'
 }
 
+def runTestOnDevBranch(env) {
+  if (!isChangeRequest(env)) {
+doTest(env)
+  }
+}

Review Comment:
   What do you think about leaving the 11 and 17 branch builders as-is until 
4.0? This will need some rework then to accommodate the KIPs that are changing 
the minimum version requirements and introducing different minimums for the 
different modules. We can figure out a strategy for branch & PR builds then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: Fix RackAwareTaskAssignorTest [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #16056:
URL: https://github.com/apache/kafka/pull/16056#issuecomment-2128047643

   @apourchet Thanks for your contribution. The failed test is already fixed by 
#16044


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16828) RackAwareTaskAssignorTest failed

2024-05-23 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16828.

Fix Version/s: 3.8.0
   Resolution: Fixed

> RackAwareTaskAssignorTest failed
> 
>
> Key: KAFKA-16828
> URL: https://issues.apache.org/jira/browse/KAFKA-16828
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> Found in the latest trunk build.
> It fails many tests in `RackAwareTaskAssignorTest` suite.
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15951/7/#showFailuresLink



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16828: RackAwareTaskAssignorTest failed [kafka]

2024-05-23 Thread via GitHub


chia7712 merged PR #16044:
URL: https://github.com/apache/kafka/pull/16044


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612321070


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   If we want to change it back, I will also update the comments in the 
metadataVersionMapping method as it will not be correct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15630) Improve documentation of offset.lag.max

2024-05-23 Thread Ganesh Sadanala (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ganesh Sadanala reassigned KAFKA-15630:
---

Assignee: Ganesh Sadanala

> Improve documentation of offset.lag.max
> ---
>
> Key: KAFKA-15630
> URL: https://issues.apache.org/jira/browse/KAFKA-15630
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, mirrormaker
>Reporter: Mickael Maison
>Assignee: Ganesh Sadanala
>Priority: Major
>  Labels: newbie
>
> It would be good to expand on the role of this configuration on offset 
> translation and mention that it can be set to a smaller value, or even 0, to 
> help in scenarios when records may not flow constantly.
> The documentation string is here: 
> [https://github.com/apache/kafka/blob/06739d5aa026e7db62ff0bd7da57e079cca35f07/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java#L104]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612319940


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +111,52 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features],
+usesVersionDefault: Boolean): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.
+val metadataVersionForDefault = if (usesVersionDefault) 
Optional.of(metadataVersion) else Optional.empty[MetadataVersion]()

Review Comment:
   We do not. This was changed yesterday and the code was as you say in the 
comment. 
   
   > Do we get the same result by passing in that metadataVersion to 
feature.defaultValue()
   
   If we follow the protocol of creating a new MV for each new feature and 
making them production ready at the same time then the answer to your question 
is yes. If we want to codify this and require a new MV (used only for mapping a 
default) for every new feature to be created when we mark the feature as 
production ready, I can switch it back to how it was yesterday. 
   
   I originally changed it in the case where we want to piggyback on the next 
MV and we may mark the feature as production ready but not the MV.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128037835

   > It's rare that there is an issue with only java 11/17 and not 8 or 21. 
I've most frequently seen issues with scala 2.12 and that's on the java 8 
build. 
   
   I agree, and that was why I picked 11 and 17 to affect here.
   
   > If we still do the compile step, we still need the nodes right?
   
   We would still need the nodes, but instead of 4 nodes for 3 hours each (12 
node-hours) we would need 2 nodes for 3 hours and 2 nodes for ~15 minutes (6.5 
node-hours).
   
   I'm not very familiar with the jenkins pipeline declaration syntax, and I'm 
not sure how to express this change in a way which doesn't generate 11 & 17 
builds for PRs at all, because it seems like it can only evaluate conditions 
like isChangeRequest after the stage is already defined. Maybe someone that is 
more familiar with jenkins can suggest an alternative.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314478


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -141,6 +189,9 @@ object StorageTool extends Logging {
 formatParser.addArgument("--release-version", "-r").
   action(store()).
   help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
+formatParser.addArgument("--feature").
+  help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").

Review Comment:
   We can do so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314269


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +105,51 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features]): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.

Review Comment:
   This was a typo. I will fix.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: log coordinator event errors [kafka]

2024-05-23 Thread via GitHub


jeffkbkim opened a new pull request, #16060:
URL: https://github.com/apache/kafka/pull/16060

   We don't emit any error logs when a coordinator event fails to execute to 
completion. This patch adds some error logging.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612314113


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -156,16 +200,27 @@ object StorageTool extends Logging {
 
   def getMetadataVersion(
 namespace: Namespace,
+featureNamesAndLevelsMap: Map[String, java.lang.Short],
 defaultVersionString: Option[String]
   ): MetadataVersion = {
 val defaultValue = defaultVersionString match {
   case Some(versionString) => 
MetadataVersion.fromVersionString(versionString)
   case None => MetadataVersion.LATEST_PRODUCTION
 }
 
-Option(namespace.getString("release_version"))
-  .map(ver => MetadataVersion.fromVersionString(ver))
-  .getOrElse(defaultValue)
+val releaseVersionTag = Option(namespace.getString("release_version"))
+val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)
+
+(releaseVersionTag, featureTag) match {
+  case (Some(_), Some(_)) =>
+throw new IllegalArgumentException("Both --release_version and 
--feature were set. Only one of the two flags can be set.")

Review Comment:
   Right. I can update this check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


jolshan commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1612313415


##
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##
@@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {
-Features.supportedFeatures(
-  java.util.Collections.singletonMap(MetadataVersion.FEATURE_NAME,
+val features = new util.HashMap[String, SupportedVersionRange]()
+  features.put(MetadataVersion.FEATURE_NAME,
 new SupportedVersionRange(
   MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
   if (unstableMetadataVersionsEnabled) {
 MetadataVersion.latestTesting.featureLevel
   } else {
 MetadataVersion.latestProduction.featureLevel
-  }
-)))
+  }))
+  FeatureVersion.PRODUCTION_FEATURES.forEach { feature =>

Review Comment:
   This will be in a followup. I'm working on it in the background. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


jolshan commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2128028213

   > I think it would be worth running the compile steps for these older 
versions (i.e., "doValidation"). As far as I know, those steps only fail if 
there's actually a problem.
   
   It's rare that there is an issue with only java 11/17 and not 8 or 21. I've 
most frequently seen issues with scala 2.12 and that's on the java 8 build. If 
we still do the compile step, we still need the nodes right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16530) Fix high-watermark calculation to not assume the leader is in the voter set

2024-05-23 Thread Alyssa Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849121#comment-17849121
 ] 

Alyssa Huang commented on KAFKA-16530:
--

In the case where the leader is removed from the voter set and tries to update 
its log end offset (`updateLocalState`), because of a new removeNode record for 
instance, it will first update its own ReplicaState (`getOrCreateReplicaState`), 
which will return a _new_ Observer state if its id is no longer in the 
`voterStates` map. The endOffset will be updated, and then we'll consider 
whether the high watermark can be updated (`maybeUpdateHighWatermark`).
When updating the high watermark, we only look at the `voterStates` map, which 
means we won't count the leader's offset as part of the HW calculation. This 
_does_ mean it's possible for the HW to drop, though. Here's a scenario:


{code:java}
# Before node 1 removal, voterStates contains Nodes 1, 2, 3
Node 1: Leader, LEO 100
Node 2: Follower, LEO 90 <- HW
Node 3: Follower, LEO 85

# Leader processes removeNode record, voterStates contains Nodes 2, 3
Node 1: Leader, LEO 101
Node 2: Follower, LEO 90
Node 3: Follower, LEO 85 <- new HW{code}

We want to make sure the HW does not decrement in this scenario. Perhaps we 
could revise `maybeUpdateHighWatermark` to keep factoring the leader's offset 
into the HW calculation, regardless of whether it is in the voter set (a 
self-contained sketch follows the second scenario below).
e.g.
{code:java}
  private boolean maybeUpdateHighWatermark() {
    // Find the largest offset which is replicated to a majority of replicas (the leader counts)
-   List followersByDescendingFetchOffset = followersByDescendingFetchOffset();
+   List followersAndLeaderByDescFetchOffset = followersAndLeaderByDescFetchOffset();

-   int indexOfHw = voterStates.size() / 2;
+   int indexOfHw = followersAndLeaderByDescFetchOffset.size() / 2;
-   Optional highWatermarkUpdateOpt = followersByDescendingFetchOffset.get(indexOfHw).endOffset;
+   Optional highWatermarkUpdateOpt = followersAndLeaderByDescFetchOffset.get(indexOfHw).endOffset;{code}

However, this does not cover the case when a follower is being removed from the 
voter set.

{code:java}
# Before node 2 removal, voterStates contains Nodes 1, 2, 3
Node 1: Leader, LEO 100
Node 2: Follower, LEO 90 <- HW
Node 3: Follower, LEO 85

# Leader processes removeNode record, voterStates contains Nodes 1, 3
Node 1: Leader, LEO 101
Node 2: Follower, LEO 90
Node 3: Follower, LEO 85 <- new HW{code}
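
A minimal, self-contained sketch of the leader-inclusive majority calculation 
described above (the names are hypothetical, not the actual LeaderState code; 
with the first scenario this yields 90, so the HW no longer drops, though it 
still leaves the follower-removal case open):
{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class HighWatermarkSketch {
    // voterEndOffsets holds the fetch offsets of the current voters; the leader's
    // LEO is added explicitly when the leader is no longer a voter, so it keeps counting.
    static long majorityReplicatedOffset(final List<Long> voterEndOffsets,
                                         final long leaderEndOffset,
                                         final boolean leaderIsVoter) {
        final List<Long> offsets = new ArrayList<>(voterEndOffsets);
        if (!leaderIsVoter) {
            offsets.add(leaderEndOffset);
        }
        offsets.sort(Comparator.reverseOrder());
        // index size/2 in descending order = the largest offset replicated to a majority
        return offsets.get(offsets.size() / 2);
    }
}
{code}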

> Fix high-watermark calculation to not assume the leader is in the voter set
> ---
>
> Key: KAFKA-16530
> URL: https://issues.apache.org/jira/browse/KAFKA-16530
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: Alyssa Huang
>Priority: Major
> Fix For: 3.8.0
>
>
> When the leader is being removed from the voter set, the leader may not be in 
> the voter set. This means that kraft should not assume that the leader is 
> part of the high-watermark calculation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16308 [1/N]: Create FeatureVersion interface and add `--feature` flag and handling to StorageTool [kafka]

2024-05-23 Thread via GitHub


junrao commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1602305075


##
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements 
FeatureVersionUtils.FeatureVersionImpl {
+
+TEST_0(0, MetadataVersion.IBP_3_3_IV0, Collections.emptyMap()),
+TEST_1(1, MetadataVersion.IBP_3_7_IV0, Collections.emptyMap()),
+TEST_2(2, MetadataVersion.IBP_3_8_IV0, Collections.emptyMap());
+
+private short featureLevel;
+private MetadataVersion metadataVersionMapping;
+private Map dependencies;

Review Comment:
   Should we make those instance vals final?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -156,16 +200,27 @@ object StorageTool extends Logging {
 
   def getMetadataVersion(
 namespace: Namespace,
+featureNamesAndLevelsMap: Map[String, java.lang.Short],
 defaultVersionString: Option[String]
   ): MetadataVersion = {
 val defaultValue = defaultVersionString match {
   case Some(versionString) => 
MetadataVersion.fromVersionString(versionString)
   case None => MetadataVersion.LATEST_PRODUCTION
 }
 
-Option(namespace.getString("release_version"))
-  .map(ver => MetadataVersion.fromVersionString(ver))
-  .getOrElse(defaultValue)
+val releaseVersionTag = Option(namespace.getString("release_version"))
+val featureTag = featureNamesAndLevelsMap.get(MetadataVersion.FEATURE_NAME)
+
+(releaseVersionTag, featureTag) match {
+  case (Some(_), Some(_)) =>
+throw new IllegalArgumentException("Both --release_version and 
--feature were set. Only one of the two flags can be set.")

Review Comment:
   We should disallow the case where both --release_version and --feature are 
used, but --feature doesn't include metadata, right?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -141,6 +189,9 @@ object StorageTool extends Logging {
 formatParser.addArgument("--release-version", "-r").
   action(store()).
   help(s"A KRaft release version to use for the initial metadata.version. 
The minimum is ${MetadataVersion.IBP_3_0_IV0}, the default is 
${MetadataVersion.LATEST_PRODUCTION}")
+formatParser.addArgument("--feature").
+  help("A feature upgrade we should perform, in feature=level format. For 
example: `metadata.version=5`.").

Review Comment:
   Should we add a shorthand for --feature like other options?



##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -109,6 +105,51 @@ object StorageTool extends Logging {
 }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+if (!metadataVersion.isKRaftSupported) {
+  throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+}
+if (!metadataVersion.isProduction) {
+  if (config.get.unstableMetadataVersionsEnabled) {
+System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+  } else {
+throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+  }
+}
+  }
+
+  private[tools] def generateFeatureRecords(metadataRecords: 
ArrayBuffer[ApiMessageAndVersion],
+metadataVersion: MetadataVersion,
+specifiedFeatures: Map[String, 
java.lang.Short],
+allFeatures: List[Features]): Unit 
= {
+// If we are using --version-default, the default is based on the metadata 
version.

Review Comment:
   Hmm, does the storage tool support the --version-default option?



##
server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. S

Re: [PR] MINOR: Disable JDK 11 and 17 tests on PRs [kafka]

2024-05-23 Thread via GitHub


ijuma commented on code in PR #16051:
URL: https://github.com/apache/kafka/pull/16051#discussion_r1612302312


##
Jenkinsfile:
##
@@ -36,6 +36,12 @@ def doTest(env, target = "test") {
   junit '**/build/test-results/**/TEST-*.xml'
 }
 
+def runTestOnDevBranch(env) {
+  if (!isChangeRequest(env)) {
+doTest(env)
+  }
+}

Review Comment:
   If they don't run on PRs, I don't think anyone will validate the results for 
the branch builders. We should probably remove Java 11 and 17 altogether now 
that we have ported most tests to run with Java 16 and higher (previously many 
were excluded).
   
   I had suggested this in an internal conversation, so good to see similar 
action here.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dongnuo123 commented on code in PR #16057:
URL: https://github.com/apache/kafka/pull/16057#discussion_r1612296899


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -4424,14 +4441,113 @@ private ConsumerGroupMember 
validateConsumerGroupMember(
  * @param contextThe request context.
  * @param requestThe actual LeaveGroup request.
  *
+ * @return The LeaveGroup response and the records to append.
+ */
+public CoordinatorResult 
classicGroupLeave(
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+Group group = groups.get(request.groupId(), Long.MAX_VALUE);
+
+if (group == null) {
+throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
+}
+
+if (group.type() == CLASSIC) {
+return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
+} else {
+return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+}
+}
+
+/**
+ * Handle a classic LeaveGroupRequest to a ConsumerGroup.
+ *
+ * @param group  The ConsumerGroup.
+ * @param contextThe request context.
+ * @param requestThe actual LeaveGroup request.
+ *
+ * @return The LeaveGroup response and the records to append.
+ */
+private CoordinatorResult 
classicGroupLeaveToConsumerGroup(
+ConsumerGroup group,
+RequestContext context,
+LeaveGroupRequestData request
+) throws UnknownMemberIdException, GroupIdNotFoundException {
+List memberResponses = new ArrayList<>();
+List records = new ArrayList<>();
+boolean hasValidLeaveGroupMember = false;
+
+for (MemberIdentity memberIdentity: request.members()) {
+String memberId = memberIdentity.memberId();
+String instanceId = memberIdentity.groupInstanceId();
+String reason = memberIdentity.reason() != null ? 
memberIdentity.reason() : "not provided";
+
+ConsumerGroupMember member;
+try {
+if (instanceId == null) {
+member = group.getOrMaybeCreateMember(memberId, false);
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+log.info("[Group {}] Static Member {} has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+group.groupId(), memberId, reason);
+} else {
+member = group.staticMember(instanceId);
+throwIfStaticMemberIsUnknown(member, memberId);
+// The LeaveGroup API allows administrative removal of 
members by GroupInstanceId
+// in which case we expect the MemberId to be undefined.
+if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
+throwIfInstanceIdIsFenced(member, group.groupId(), 
memberId, instanceId);
+}
+throwIfMemberDoesNotUseClassicProtocol(member);
+
+log.info("[Group {}] Static Member {} with instance id {} 
has left group " +
+"through explicit `LeaveGroup` request; client 
reason: {}",
+group.groupId(), memberId, instanceId, reason);
+}
+
+
records.addAll(removeMemberAndMaybeUpdateSubscriptionMetadata(group, member));
+cancelTimers(group.groupId(), member.memberId());

Review Comment:
   This will not be reverted if the replay/persistence fails. Is this something 
we want to address with the snapshottable timer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16516) Fix the controller node provider for broker to control channel

2024-05-23 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe reassigned KAFKA-16516:


Assignee: Colin McCabe  (was: José Armando García Sancio)

> Fix the controller node provider for broker to control channel
> --
>
> Key: KAFKA-16516
> URL: https://issues.apache.org/jira/browse/KAFKA-16516
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: José Armando García Sancio
>Assignee: Colin McCabe
>Priority: Major
> Fix For: 3.8.0
>
>
> The broker to controller channel gets the set of voters directly from the 
> static configuration. This needs to change so that the leader node comes 
> from the KRaft client/manager.
> The code is in KafkaServer where it constructs the RaftControllerNodeProvider.
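
A minimal sketch of the direction (hypothetical names, not the actual Scala 
code in KafkaServer):

{code:java}
import java.util.Optional;
import java.util.function.Supplier;

// The channel asks a provider for the current controller instead of holding
// a static voter list parsed from controller.quorum.voters.
record Node(int id, String host, int port) {}

interface ControllerNodeProvider {
    Optional<Node> controllerNode();
}

final class RaftControllerNodeProvider implements ControllerNodeProvider {
    // Backed by the KRaft client's view of the current leader.
    private final Supplier<Optional<Node>> currentLeader;

    RaftControllerNodeProvider(Supplier<Optional<Node>> currentLeader) {
        this.currentLeader = currentLeader;
    }

    @Override
    public Optional<Node> controllerNode() {
        return currentLeader.get();
    }
}
{code}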



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16831: CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit [kafka]

2024-05-23 Thread via GitHub


jeffkbkim opened a new pull request, #16059:
URL: https://github.com/apache/kafka/pull/16059

   Otherwise, we default the write limit to the min buffer size of 16384. This 
causes the coordinator to throw RecordTooLargeException even when the batch is 
under the 1MB max batch size limit.
   
   Added unit test that fails without this change.
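
   A minimal sketch of the failure mode (hypothetical names, not the actual 
   CoordinatorRuntime code): the write limit should come from the max batch 
   size, not from the initial buffer allocation.

   ```java
   import java.nio.ByteBuffer;

   final class WriteLimitSketch {
       static final int MIN_BUFFER_SIZE = 16384;       // initial allocation
       static final int MAX_BATCH_SIZE = 1024 * 1024;  // 1MB log batch limit

       static void append(int batchSizeBytes) {
           ByteBuffer buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE);

           // Buggy: the limit is derived from the initial buffer capacity, so
           // any batch over 16KB is rejected even though the log allows 1MB.
           int writeLimit = buffer.capacity();
           // Fixed: int writeLimit = MAX_BATCH_SIZE;

           if (batchSizeBytes > writeLimit) {
               throw new IllegalStateException("RecordTooLarge: " + batchSizeBytes);
           }
       }
   }
   ```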
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup [kafka]

2024-05-23 Thread via GitHub


dongnuo123 opened a new pull request, #16057:
URL: https://github.com/apache/kafka/pull/16057

   This patch implements the LeaveGroup API for consumer groups that are in 
mixed mode.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16832) LeaveGroup API for upgrading ConsumerGroup

2024-05-23 Thread Dongnuo Lyu (Jira)
Dongnuo Lyu created KAFKA-16832:
---

 Summary: LeaveGroup API for upgrading ConsumerGroup
 Key: KAFKA-16832
 URL: https://issues.apache.org/jira/browse/KAFKA-16832
 Project: Kafka
  Issue Type: Sub-task
Reporter: Dongnuo Lyu






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16832) LeaveGroup API for upgrading ConsumerGroup

2024-05-23 Thread Dongnuo Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongnuo Lyu reassigned KAFKA-16832:
---

Assignee: Dongnuo Lyu

> LeaveGroup API for upgrading ConsumerGroup
> --
>
> Key: KAFKA-16832
> URL: https://issues.apache.org/jira/browse/KAFKA-16832
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Update dependencies [kafka]

2024-05-23 Thread via GitHub


highbolder closed pull request #16039: Update dependencies
URL: https://github.com/apache/kafka/pull/16039


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15045: Fix RackAwareTaskAssignorTest [kafka]

2024-05-23 Thread via GitHub


apourchet opened a new pull request, #16056:
URL: https://github.com/apache/kafka/pull/16056

   Some unnecessary changes to RackAwareTaskAssignor broke the tests; this PR 
reverts those changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-23 Thread via GitHub


gharris1727 merged PR #15469:
URL: https://github.com/apache/kafka/pull/15469


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Refactor Values class to fix checkstyle, add benchmark, optimize exceptions [kafka]

2024-05-23 Thread via GitHub


gharris1727 commented on PR #15469:
URL: https://github.com/apache/kafka/pull/15469#issuecomment-2127953424

   Here's the final performance changes:
   
   Benchmark | Before | Before Error | After | After Error | Speedup
   -- | -- | -- | -- | -- | --
   ValuesBenchmark.testConvertToBoolean | 123.749 | 0.842 | 72.577 | 0.412 | 1.7
   ValuesBenchmark.testConvertToByte | 117.883 | 1.397 | 62.387 | 0.148 | 1.9
   ValuesBenchmark.testConvertToDate | 3700.318 | 160.043 | 3522.214 | 36.075 | 1.1
   ValuesBenchmark.testConvertToDecimal | 1530.936 | 49.503 | 1485.654 | 11.766 | 1.0
   ValuesBenchmark.testConvertToDouble | 163.937 | 55.591 | 60.378 | 0.577 | 2.7
   ValuesBenchmark.testConvertToFloat | 204.591 | 109.567 | 57.611 | 0.49 | 3.6
   ValuesBenchmark.testConvertToInteger | 140.586 | 3.809 | 66.496 | 0.371 | 2.1
   ValuesBenchmark.testConvertToList | 1276.364 | 54.037 | 1601.568 | 28.972 | 0.8
   ValuesBenchmark.testConvertToLong | 132.029 | 3.118 | 76.744 | 1.112 | 1.7
   ValuesBenchmark.testConvertToMap | 1361.082 | 78.59 | 1244.339 | 11.019 | 1.1
   ValuesBenchmark.testConvertToShort | 121.575 | 4.6 | 63.167 | 0.311 | 1.9
   ValuesBenchmark.testConvertToString | 1667.243 | 51.186 | 1580.031 | 11.391 | 1.1
   ValuesBenchmark.testConvertToStruct | 3.819 | 0.082 | 1.395 | 0.009 | 2.7
   ValuesBenchmark.testConvertToTime | 2864.609 | 163.586 | 2701.721 | 60.677 | 1.1
   ValuesBenchmark.testConvertToTimestamp | 2789.008 | 30.371 | 2738.573 | 19.6 | 1.0
   ValuesBenchmark.testInferSchema | 123.196 | 3.292 | 99.336 | 0.867 | 1.2
   ValuesBenchmark.testParseString | 43826.599 | 922.077 | 13429.742 | 133.089 | 3.3
   
   There's a consistent performance degradation for testConvertToList that 
seems to come from the parseString implementation being slightly less efficient 
for array inputs. I'll follow up on that separately since this PR already has 
too much scope, and the degradation is only slight.
   
   One interesting final observation is that the variance/error for all of the 
tests is lower than in the previous implementation. I suspect that this is 
because many of the methods now avoid traversing the switch-case in the 
convertTo implementation, which lessens the number of branches and increases 
the branch predictor's success rate.
   
   And here's the final coverage changes:
   
   State | Class | Method | Line
   -- | -- | -- | --
   Before | 100% (4/4) | 81% (40/49) | 78% (464/589)
   After | 100% (6/6) | 93% (77/82) | 84% (565/669)
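
   To make the branch-predictor point concrete, here is a sketch of the shape 
   of the change (assumed, not the literal patch):

   ```java
   // Sketch only: dedicated fast paths avoid funneling every conversion
   // through one big switch on the target type.
   final class ConvertSketch {
       // Before: every call walks the same multi-way switch.
       static Object convertTo(String targetType, Object value) {
           switch (targetType) {
               case "boolean": return Boolean.parseBoolean(value.toString());
               case "int":     return Integer.parseInt(value.toString());
               // ... many more cases, all funneled through the same branches
               default: throw new IllegalArgumentException(targetType);
           }
       }

       // After: one predictable branch per target type.
       static boolean convertToBoolean(Object value) {
           if (value instanceof Boolean b) return b;
           return Boolean.parseBoolean(value.toString());
       }
   }
   ```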


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" [kafka]

2024-05-23 Thread via GitHub


ableegoldman closed pull request #16055: Revert "KAFKA-15045: (KIP-924 pt. 10) 
Topic partition rack annotation simplified  (#16034)"
URL: https://github.com/apache/kafka/pull/16055


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on PR #16055:
URL: https://github.com/apache/kafka/pull/16055#issuecomment-2127922285

   It's actually https://github.com/apache/kafka/pull/15972 that broke this 
test; closing this 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 5) Add rack information to ApplicationState [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on PR #15972:
URL: https://github.com/apache/kafka/pull/15972#issuecomment-2127918475

   Yep, just noticed this. Sorry about that. We're taking a look
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #15883:
URL: https://github.com/apache/kafka/pull/15883#issuecomment-2127912971

   We will use a different approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Rewrite OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance [kafka]

2024-05-23 Thread via GitHub


dajac closed pull request #15883: MINOR: Rewrite 
OptimizedUniformAssignmentBuilder#assignStickyPartitions to improve performance
URL: https://github.com/apache/kafka/pull/15883


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #14327:
URL: https://github.com/apache/kafka/pull/14327#issuecomment-2127911871

   We will start this work from scratch. Closing it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14517: Implement regex subscriptions [kafka]

2024-05-23 Thread via GitHub


dajac closed pull request #14327: KAFKA-14517: Implement regex subscriptions
URL: https://github.com/apache/kafka/pull/14327


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]

2024-05-23 Thread via GitHub


dajac commented on PR #15286:
URL: https://github.com/apache/kafka/pull/15286#issuecomment-2127911151

   This does not seem necessary any more. Closing it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: KIP-848 Uniform Assignor Bugs [kafka]

2024-05-23 Thread via GitHub


dajac closed pull request #15286: MINOR: KIP-848 Uniform Assignor Bugs
URL: https://github.com/apache/kafka/pull/15286


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Revert "KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified (#16034)" [kafka]

2024-05-23 Thread via GitHub


ableegoldman opened a new pull request, #16055:
URL: https://github.com/apache/kafka/pull/16055

   This reverts commit 93238ae312ef7bea79160e59bb7a06623cc94a1b.
   
   [This PR](https://github.com/apache/kafka/pull/16034) broke the 
RackAwareTaskAssignorTest, so I am reverting it while we figure out the cause.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-23 Thread via GitHub


ableegoldman commented on PR #16034:
URL: https://github.com/apache/kafka/pull/16034#issuecomment-2127905750

   Ah shoot, I was looking at the wrong test results; this is actually breaking 
RackAwareTaskAssignorTest. I'll revert this and we can fix the failing test in 
the resubmitted PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15045: (KIP-924 pt. 10) Topic partition rack annotation simplified [kafka]

2024-05-23 Thread via GitHub


ableegoldman merged PR #16034:
URL: https://github.com/apache/kafka/pull/16034


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16831) CoordinatorRuntime should initialize MemoryRecordsBuilder with max batch size write limit

2024-05-23 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16831:


 Summary: CoordinatorRuntime should initialize MemoryRecordsBuilder 
with max batch size write limit
 Key: KAFKA-16831
 URL: https://issues.apache.org/jira/browse/KAFKA-16831
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


Otherwise, we default to the min buffer size of 16384 for the write limit. This 
causes the coordinator to throw RecordTooLargeException even when the batch is 
under the 1MB max batch size limit.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16822: Abstract consumer group to share functionality with share group [kafka]

2024-05-23 Thread via GitHub


apoorvmittal10 commented on PR #16054:
URL: https://github.com/apache/kafka/pull/16054#issuecomment-2127858693

   @dajac @omkreddy @AndrewJSchofield Please review; we can merge after the 3.8 
branch cut.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16822: Abstract consumer group to share functionality with share group [kafka]

2024-05-23 Thread via GitHub


apoorvmittal10 opened a new pull request, #16054:
URL: https://github.com/apache/kafka/pull/16054

   Abstracted code from the `ConsumerGroup` and `ConsumerGroupMember` classes 
into `AbstractGroup` and `GroupMember` respectively. The new abstract classes 
are created to share common functionality with `ShareGroup` and 
`ShareGroupMember`, which are being introduced with KIP-932.
   
   The PR is mostly code refactoring from the existing classes into the 
abstract classes. It also creates a new package called `common`, to which the 
`MemberState` class is moved; upcoming PRs will move other classes shared by 
the share and consumer groups into the `common` package as well.
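
   A minimal sketch of the resulting hierarchy (assumed shape; see the diff 
   for the actual classes):

   ```java
   // Sketch only: shared group bookkeeping moves into the abstract base.
   abstract class AbstractGroup {
       protected final String groupId;

       AbstractGroup(String groupId) { this.groupId = groupId; }

       String groupId() { return groupId; }
       // common member/epoch bookkeeping shared by all group types lives here
   }

   final class ConsumerGroup extends AbstractGroup {
       ConsumerGroup(String groupId) { super(groupId); }
   }

   // Introduced by KIP-932; reuses the shared bookkeeping instead of copying it.
   final class ShareGroup extends AbstractGroup {
       ShareGroup(String groupId) { super(groupId); }
   }
   ```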
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16452: Don't throw OOORE when converting the offset to metadata [kafka]

2024-05-23 Thread via GitHub


junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1612186225


##
core/src/main/scala/kafka/server/DelayedFetch.scala:
##
@@ -103,7 +103,7 @@ class DelayedFetch(
 // We will not force complete the fetch request if a replica should be throttled.
 if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
   return forceComplete()
-  } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+  } else if (fetchOffset.onSameSegment(endOffset) && fetchOffset.messageOffset < endOffset.messageOffset) {

Review Comment:
   Hmm, we assume that `fetchOffset.messageOffset > endOffset.messageOffset` is 
the truncated leader case. In that case, we should always call 
`forceComplete()` immediately, right? The current code only calls 
`forceComplete()` when the offset metadata is available, but it's more 
consistent to do that regardless of the availability of the offset metadata. 
Should we do something like the following?
   
    ```
    if (fetchOffset.messageOffset > endOffset.messageOffset) {
      // Case F, this can happen when the new fetch operation is on a truncated leader
      debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
      return forceComplete()
    } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
      if (fetchOffset.onOlderSegment(endOffset)) {
        // Case F, this can happen when the fetch operation is falling behind the current segment
        // or the partition has just rolled a new segment
        debug(s"Satisfying fetch $this immediately since it is fetching older segments.")
        // We will not force complete the fetch request if a replica should be throttled.
        if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
          return forceComplete()
      } else if (fetchOffset.onSameSegment(endOffset)) {
        // we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
        val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
        if (!params.isFromFollower || !replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
          accumulatedSize += bytesAvailable
      }
    }
    ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-23 Thread via GitHub


C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1612176138


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
 return result;
 }
 
-public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+public boolean taskConfigsChanged(
+ClusterConfigState configState,
+String connName,
+List<Map<String, String>> taskProps,
+int connectorConfigHash
+) {
 int currentNumTasks = configState.taskCount(connName);
 boolean result = false;
 if (taskProps.size() != currentNumTasks) {
 log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
 result = true;
 } else {
-for (int index = 0; index < currentNumTasks; index++) {
+for (int index = 0; index < currentNumTasks && !result; index++) {
 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
 log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
 result = true;
 }
 }
+// Do a final check to see if runtime-controlled properties that affect tasks but may
+// not be included in the connector-generated configs for them (such as converter overrides)
+// have changed
+if (!result) {
+Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+if (storedConnectorConfigHash == null) {
+log.debug("Connector {} has no config hash stored for its existing tasks", connName);

Review Comment:
   I don't love the idea of warn logs here. They'll be emitted unconditionally 
during upgrade even on completely unaffected clusters, and if users follow 
instructions to reconfigure their connectors, it'll have the same disruptive 
effect of forcing task restarts and extra churn. Honestly though, I suspect 
what's more likely is that people will ignore them.
   
   I think the underlying problem we're running into is that tightly coupling 
the storage of config hashes with the storage of task configs is forcing us 
into an uncomfortable spot where we can't publish a new config hash without 
forcing task restarts.
   
   I've published a new PR with a different, hopefully simpler approach 
[here](https://github.com/apache/kafka/pull/16053). Let me know what you think!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-9228: Restart tasks on runtime-only connector config changes [kafka]

2024-05-23 Thread via GitHub


C0urante opened a new pull request, #16053:
URL: https://github.com/apache/kafka/pull/16053

   [Jira](https://issues.apache.org/jira/browse/KAFKA-9228)
   
   This uses a much simpler approach than the one pursued in 
https://github.com/apache/kafka/pull/16001. Once task configs are detected for 
a connector, the most-recent config for that connector is tracked by the herder 
as the "applied" config. When new task configs are generated by the connector, 
the latest config for the connector is compared to the "applied" connector 
config. If a difference is detected, or there is no "applied" config, then 
tasks are automatically published to the config topic.
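
   A minimal sketch of the comparison (hypothetical names; the real herder 
   logic is more involved):

   ```java
   import java.util.Map;

   // Sketch only: republish task configs when the connector config that
   // produced them differs from the latest connector config.
   final class AppliedConfigCheck {
       static boolean shouldWriteTaskConfigs(
               Map<String, String> latestConnectorConfig,
               Map<String, String> appliedConnectorConfig, // null if tasks never generated
               boolean generatedTaskConfigsChanged
       ) {
           // Covers runtime-only properties (e.g. converter overrides) that
           // never show up in the connector-generated task configs.
           return generatedTaskConfigsChanged
                   || appliedConnectorConfig == null
                   || !appliedConnectorConfig.equals(latestConnectorConfig);
       }
   }
   ```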
   
   If this approach is agreeable, I can apply it to standalone mode as well, 
fix the failing unit test cases in `DistributedHerderTest`, and possibly flesh 
out some new unit tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter [kafka]

2024-05-23 Thread via GitHub


chia7712 commented on PR #16020:
URL: https://github.com/apache/kafka/pull/16020#issuecomment-2127829943

   @brandboat Thanks for your contribution. That compatibility support should 
be removed in 4.0.0. I have filed a ticket 
(https://issues.apache.org/jira/browse/KAFKA-16830) for you. Please feel free 
to reassign it if you have no free cycles.
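
   For reference, the compatibility shim boils down to something like the 
   following (a sketch with assumed names, not the literal code from #16020):

   ```java
   import java.util.Map;

   // Map the deprecated kafka.tools formatter names to the relocated classes
   // and warn that the aliases go away in 4.0.
   final class FormatterAliases {
       private static final Map<String, String> DEPRECATED_ALIASES = Map.of(
           "kafka.tools.NoOpMessageFormatter", "org.apache.kafka.tools.consumer.NoOpMessageFormatter",
           "kafka.tools.DefaultMessageFormatter", "org.apache.kafka.tools.consumer.DefaultMessageFormatter",
           "kafka.tools.LoggingMessageFormatter", "org.apache.kafka.tools.consumer.LoggingMessageFormatter"
       );

       static String resolve(String formatterName) {
           String replacement = DEPRECATED_ALIASES.get(formatterName);
           if (replacement == null) return formatterName;
           System.err.printf("%s is deprecated and will be removed in 4.0; use %s instead.%n",
                   formatterName, replacement);
           return replacement;
       }
   }
   ```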


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16830) Remove the scala version formatters support

2024-05-23 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16830:
--

 Summary: Remove the scala version formatters support
 Key: KAFKA-16830
 URL: https://issues.apache.org/jira/browse/KAFKA-16830
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Kuan Po Tseng
 Fix For: 4.0.0


[https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java#L72]

 

Those deprecated formatter "strings" should be removed in 4.0.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16795) Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter

2024-05-23 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16795.

Resolution: Fixed

> Fix broken compatibility in kafka.tools.NoOpMessageFormatter, 
> kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter
> --
>
> Key: KAFKA-16795
> URL: https://issues.apache.org/jira/browse/KAFKA-16795
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
> Fix For: 3.8.0
>
>
> [{{0bf830f}}|https://github.com/apache/kafka/commit/0bf830fc9c3915bc99b6e487e6083dabd593c5d3]
>  moved the NoOpMessageFormatter, DefaultMessageFormatter and 
> LoggingMessageFormatter classes from the {{kafka.tools}} package to 
> {{org.apache.kafka.tools.consumer}}.
> These classes could be used via the kafka-console-consumer.sh command, so we 
> should have a deprecation cycle before 3.8.0 comes out.
>  
> {code:java}
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --topic streams-wordcount-output \
> --from-beginning \
> --formatter kafka.tools.DefaultMessageFormatter \
> --property print.key=true \
> --property print.value=true \
> --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
> --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer{code}
> The goal in this Jira is to allow users to keep using 
> {{kafka.tools.NoOpMessageFormatter}}, 
> {{kafka.tools.DefaultMessageFormatter}}, and 
> {{kafka.tools.LoggingMessageFormatter}}, but to also display warning 
> messages saying those "strings" will be removed in 4.0.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16795: Fix broken compatibility in kafka.tools.NoOpMessageFormatter, kafka.tools.DefaultMessageFormatter, and kafka.tools.LoggingMessageFormatter [kafka]

2024-05-23 Thread via GitHub


chia7712 merged PR #16020:
URL: https://github.com/apache/kafka/pull/16020


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Disable JDK 11 and 17 builds on PRs [kafka]

2024-05-23 Thread via GitHub


mumrah commented on PR #16051:
URL: https://github.com/apache/kafka/pull/16051#issuecomment-2127820090

   I think it would be worth running the compile steps for these older versions 
(i.e., "doValidation"). As far as I know, those steps only fail if there's 
actually a problem.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


