(kafka) 07/12: KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra (#16180)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 69158f67f842da7a166fad5a69bc09e41e9f4e25
Author: Kamal Chandraprakash 
AuthorDate: Wed Jun 5 00:41:30 2024 +0530

KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra 
(#16180)

- Removed the RemoteLogSegmentLifecycleManager
- Removed the TopicBasedRemoteLogMetadataManagerWrapper, 
RemoteLogMetadataCacheWrapper, TopicBasedRemoteLogMetadataManagerHarness and 
TopicBasedRemoteLogMetadataManagerWrapperWithHarness

Reviewers: Kuan-Po (Cooper) Tseng , Chia-Ping Tsai 

---
 .../storage/RemoteLogMetadataManagerTestUtils.java |  23 +-
 .../storage/RemoteLogSegmentLifecycleManager.java  |  63 ---
 .../storage/RemoteLogSegmentLifecycleTest.java | 562 -
 .../TopicBasedRemoteLogMetadataManagerHarness.java |  90 
 ...icBasedRemoteLogMetadataManagerRestartTest.java |   1 -
 ...RemoteLogMetadataManagerWrapperWithHarness.java | 105 
 6 files changed, 201 insertions(+), 643 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index fd35167d02a..3fc0432e773 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -17,17 +17,12 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.Assertions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -37,10 +32,9 @@ import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 public class RemoteLogMetadataManagerTestUtils {
-private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class);
-
 private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3;
 private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2;
 private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 
1000L;
@@ -53,7 +47,6 @@ public class RemoteLogMetadataManagerTestUtils {
 private String bootstrapServers;
 private boolean startConsumerThread;
 private Map overrideRemoteLogMetadataManagerProps = 
Collections.emptyMap();
-private Set topicIdPartitions = 
Collections.emptySet();
 private Supplier 
remotePartitionMetadataStore = RemotePartitionMetadataStore::new;
 private Function 
remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new;
 
@@ -85,11 +78,6 @@ public class RemoteLogMetadataManagerTestUtils {
 return this;
 }
 
-public Builder topicIdPartitions(Set 
topicIdPartitions) {
-this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions);
-return this;
-}
-
 public TopicBasedRemoteLogMetadataManager build() {
 Objects.requireNonNull(bootstrapServers);
 String logDir = 
TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
@@ -105,19 +93,12 @@ public class RemoteLogMetadataManagerTestUtils {
 configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 
METADATA_TOPIC_PARTITIONS_COUNT);
 configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, 
METADATA_TOPIC_REPLICATION_FACTOR);
 configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, 
METADATA_TOPIC_RETENTION_MS);
-
-log.debug("TopicBasedRemoteLogMetadataManager configs before 
adding overridden properties: {}", configs);
 // Add override properties.
 configs.putAll(overrideRemoteLogMetadataManagerProps);
-log.debug("TopicBasedRemoteLogMetadataManager configs after adding 
overridden properties: {}", configs);
 
 topicBasedRemoteLogMetadataMa

(kafka) 06/12: KAFKA-16880 Update equals and hashcode methods for two attributes (#16173)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 944f4699a7de443a72fa2618b5cbc8531f12a8c4
Author: Murali Basani 
AuthorDate: Tue Jun 4 08:05:00 2024 +0200

KAFKA-16880 Update equals and hashcode methods for two attributes  (#16173)

Reviewers: Kamal Chandraprakash , Chia-Ping 
Tsai 
---
 .../log/remote/storage/RemoteLogManagerConfig.java |   5 +-
 .../remote/storage/RemoteLogManagerConfigTest.java | 107 +++--
 2 files changed, 102 insertions(+), 10 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index d6cf615c781..1add933d788 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -585,6 +585,8 @@ public final class RemoteLogManagerConfig {
 return enableRemoteStorageSystem == that.enableRemoteStorageSystem
 && remoteLogIndexFileCacheTotalSizeBytes == 
that.remoteLogIndexFileCacheTotalSizeBytes
 && remoteLogManagerThreadPoolSize == 
that.remoteLogManagerThreadPoolSize
+&& remoteLogManagerCopierThreadPoolSize == 
that.remoteLogManagerCopierThreadPoolSize
+&& remoteLogManagerExpirationThreadPoolSize == 
that.remoteLogManagerExpirationThreadPoolSize
 && remoteLogManagerTaskIntervalMs == 
that.remoteLogManagerTaskIntervalMs
 && remoteLogManagerTaskRetryBackoffMs == 
that.remoteLogManagerTaskRetryBackoffMs
 && remoteLogManagerTaskRetryBackoffMaxMs == 
that.remoteLogManagerTaskRetryBackoffMaxMs
@@ -613,7 +615,8 @@ public final class RemoteLogManagerConfig {
 public int hashCode() {
 return Objects.hash(enableRemoteStorageSystem, 
remoteStorageManagerClassName, remoteStorageManagerClassPath,
 remoteLogMetadataManagerClassName, 
remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
-remoteLogMetadataCustomMetadataMaxBytes, 
remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, 
remoteLogManagerTaskIntervalMs,
+remoteLogMetadataCustomMetadataMaxBytes, 
remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize,
+remoteLogManagerCopierThreadPoolSize, 
remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs,
 remoteLogManagerTaskRetryBackoffMs, 
remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
 remoteLogReaderThreads, 
remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, 
remoteLogMetadataManagerProps,
 remoteStorageManagerPrefix, 
remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond,
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
index 4e3c2fc26cb..d04c42409b2 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.server.log.remote.storage;
 
 import org.apache.kafka.common.config.AbstractConfig;
-import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -27,6 +27,8 @@ import java.util.Map;
 
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 public class RemoteLogManagerConfigTest {
 
@@ -43,13 +45,11 @@ public class RemoteLogManagerConfigTest {
 String rlmmPrefix = "__custom.rlmm.";
 Map rsmProps = Collections.singletonMap("rsm.prop", 
"val");
 Map rlmmProps = Collections.singletonMap("rlmm.prop", 
"val");
-String remoteLogMetadataManagerClass = 
useDefaultRemoteLogMetadataManagerClass ? 
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : 
"dummy.remote.log.metadata.class";
-RemoteLogManagerConfig expectedRemoteLogManagerConfig
-= new RemoteLogManagerConfig(true, 
"dummy.remote.storage.class", "dummy.remote.storage.class.path",
-  

(kafka) 09/12: MINOR: Cleanup the storage module unit tests (#16202)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 025e791d0cdfc025fd064379f8ebb8b9fe380a26
Author: Kamal Chandraprakash 
AuthorDate: Thu Jun 6 21:26:08 2024 +0530

MINOR: Cleanup the storage module unit tests (#16202)

- Use SystemTime instead of MockTime when time is not mocked
- Use static assertions to reduce the line length
- Fold the lines if it exceeds the limit
- rename tp0 to tpId0 when it refers to TopicIdPartition

Reviewers: Kuan-Po (Cooper) Tseng , Chia-Ping Tsai 

---
 .../storage/RemoteLogMetadataFormatterTest.java| 18 ++---
 .../storage/RemoteLogMetadataSerdeTest.java| 38 +-
 .../storage/RemoteLogMetadataTransformTest.java| 40 +--
 ...picBasedRemoteLogMetadataManagerConfigTest.java | 53 +-
 ...ogMetadataManagerMultipleSubscriptionsTest.java | 41 +--
 ...icBasedRemoteLogMetadataManagerRestartTest.java | 25 +++
 .../TopicBasedRemoteLogMetadataManagerTest.java| 56 ---
 .../storage/RemoteLogMetadataManagerTest.java  | 83 --
 8 files changed, 169 insertions(+), 185 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index e3d1a2aee0c..1380a735fba 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -24,7 +24,6 @@ import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteL
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
-import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.junit.jupiter.api.Test;
 
 import java.io.ByteArrayOutputStream;
@@ -35,6 +34,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED;
 
 public class RemoteLogMetadataFormatterTest {
 private static final Uuid TOPIC_ID = Uuid.randomUuid();
@@ -51,12 +51,12 @@ public class RemoteLogMetadataFormatterTest {
 RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, 
SEGMENT_ID);
 Optional customMetadata = Optional.of(new 
CustomMetadata(new byte[10]));
 RemoteLogSegmentMetadata remoteLogMetadata = new 
RemoteLogSegmentMetadata(
-remoteLogSegmentId, 0L, 100L, -1L, 1,
-123L, 1024, customMetadata,
-RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
+remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, 
customMetadata, COPY_SEGMENT_STARTED,
+segLeaderEpochs);
 
 byte[] metadataBytes = new 
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
-ConsumerRecord metadataRecord = new 
ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
+ConsumerRecord metadataRecord = new ConsumerRecord<>(
+"__remote_log_metadata", 0, 0, null, metadataBytes);
 
 String expected = String.format(
 "partition: 0, offset: 0, value: " +
@@ -68,9 +68,11 @@ public class RemoteLogMetadataFormatterTest {
 TOPIC_ID, SEGMENT_ID);
 try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
  PrintStream ps = new PrintStream(baos)) {
-RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new 
RemoteLogMetadataSerde.RemoteLogMetadataFormatter();
-formatter.writeTo(metadataRecord, ps);
-assertEquals(expected, baos.toString());
+try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter =
+ new 
RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) {
+formatter.writeTo(metadataRecord, ps);
+assertEquals(expected, baos.toString());
+}
 }
 }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 5b48790c7fd..b1b91dacf23 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -19,48 +19,48 @@ package

(kafka) 04/12: KAFKA-16859 Cleanup check if tiered storage is enabled (#16153)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f9c37032ffc316de86fee3d8096a53a56451
Author: Ken Huang <100591800+m1a...@users.noreply.github.com>
AuthorDate: Mon Jun 3 12:04:58 2024 +0900

KAFKA-16859 Cleanup check if tiered storage is enabled (#16153)

Reviewers: Chia-Ping Tsai 
---
 .../kafka/server/builders/KafkaApisBuilder.java|  2 +-
 .../server/builders/ReplicaManagerBuilder.java |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala |  2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ConfigHandler.scala|  2 +-
 .../server/ControllerConfigurationValidator.scala  |  3 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +---
 core/src/main/scala/kafka/server/KafkaServer.scala |  2 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  4 ++--
 .../kafka/log/remote/RemoteLogManagerTest.java |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 26 +++---
 .../unit/kafka/server/ReplicaManagerTest.scala |  4 ++--
 12 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 6ffd741f4fc..1d422461678 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public class KafkaApisBuilder {
 if (metrics == null) throw new RuntimeException("You must set 
metrics");
 if (quotas == null) throw new RuntimeException("You must set quotas");
 if (fetchManager == null) throw new RuntimeException("You must set 
fetchManager");
-if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled());
+if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
 if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
 return new KafkaApis(requestChannel,
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 82aa75909ab..5e8cf2dcdc6 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -185,7 +185,7 @@ public class ReplicaManagerBuilder {
 if (metadataCache == null) throw new RuntimeException("You must set 
metadataCache");
 if (logDirFailureChannel == null) throw new RuntimeException("You must 
set logDirFailureChannel");
 if (alterPartitionManager == null) throw new RuntimeException("You 
must set alterIsrManager");
-if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled());
+if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
 // Initialize metrics in the end just before passing it to 
ReplicaManager to ensure ReplicaManager closes the
 // metrics correctly. There might be a resource leak if it is 
initialized and an exception occurs between
 // its initialization and creation of ReplicaManager.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 8bc6db2dff6..45d6ab7908d 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1562,7 +1562,7 @@ object LogManager {
 keepPartitionMetadataFile: Boolean): LogManager = {
 val defaultProps = config.extractLogConfigMap
 
-LogConfig.validateBrokerLogConfigValues(defaultProps, 
config.isRemoteLogStorageSystemEnabled)
+LogConfig.validateBrokerLogConfigValues(defaultProps, 
config.remoteLogManagerConfig.enableRemoteStorageSystem())
 val defaultLogConfig = new LogConfig(defaultProps)
 
 val cleanerConfig = LogCleaner.cleanerConfig(config)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 94bcf321420..baafc3e658f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -186,7 +186,7 @@ class BrokerServer(
   kafkaScheduler.startup()
 
   /* register broker metrics */
-  brokerTopicStats = new 
BrokerTopicStats(config.isRemoteLogStorageSystemEnabled)
+  brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem())
 
   quotaManagers = QuotaFactory.instantiate(config, metrics, time, 
s"broker-${config.nodeId}-")
 
diff --git a/core/src/ma

(kafka) 12/12: KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d94a28b4a4f94143f51d71e03f639078a2e1
Author: Kamal Chandraprakash 
AuthorDate: Mon Jun 10 20:42:12 2024 +0530

KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically 
(#16203)

Reviewers: Satish Duggana , Luke Chen 

---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  2 ++
 .../main/scala/kafka/server/ReplicaManager.scala   |  3 +-
 .../kafka/server/DynamicBrokerConfigTest.scala | 33 ++
 .../log/remote/storage/RemoteLogManagerConfig.java |  4 ---
 5 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 6fa43b560dd..9b75425666f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -1202,6 +1202,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends 
BrokerReconfigurable w
 
 object DynamicRemoteLogConfig {
   val ReconfigurableConfigs = Set(
-RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
+RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
+RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP
   )
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 06bce64f623..745c8648e38 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1211,6 +1211,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 
   def logLocalRetentionMs: java.lang.Long = 
getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)
 
+  def remoteFetchMaxWaitMs = 
getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP)
+
   validateValues()
 
   @nowarn("cat=deprecation")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 17e9a5c1b9e..6ab67a84ec4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1479,10 +1479,9 @@ class ReplicaManager(val config: KafkaConfig,
 return Some(createLogReadResult(e))
 }
 
-val remoteFetchMaxWaitMs = 
config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
+val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong
 val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
   fetchPartitionStatus, params, logReadResults, this, responseCallback)
-
 delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
 None
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 328d6e41b57..2e5d77cdc6f 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -792,6 +792,39 @@ class DynamicBrokerConfigTest {
 verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100)
   }
 
+  @Test
+  def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = {
+val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port 
= 8181)
+val config = KafkaConfig(props)
+val kafkaBroker = mock(classOf[KafkaBroker])
+when(kafkaBroker.config).thenReturn(config)
+assertEquals(500, config.remoteFetchMaxWaitMs)
+
+val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker)
+config.dynamicConfig.initialize(None, None)
+config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig)
+
+val newProps = new Properties()
+newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "3")
+// update default config
+config.dynamicConfig.validate(newProps, perBrokerConfig = false)
+config.dynamicConfig.updateDefaultConfig(newProps)
+assertEquals(3, config.remoteFetchMaxWaitMs)
+
+// update per broker config
+newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "1")
+config.dynamicConfig.validate(newProps, perBrokerConfig = true)
+config.dynamicConfig.updateBrokerConfig(0, newProps)
+assertEquals(1, config.remoteFetchMaxWaitMs)
+
+// invalid values
+for (maxWaitMs <- Seq(-1, 0)) {
+  newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, 
maxWaitMs.toString)
+  assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(newProps, perBrokerConfig = true))
+  assertThrows(classOf[ConfigException], () => 
config.dynamicConfig.validate(newProps, perBrokerConf

(kafka) 01/12: KAFKA-16866 Used the right constant in RemoteLogManagerTest#testFetchQuotaManagerConfig (#16152)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 68d92a5b43f001a06e4c5d9c6f244c6cc33595b1
Author: Kamal Chandraprakash 
AuthorDate: Fri May 31 22:44:31 2024 +0530

KAFKA-16866 Used the right constant in 
RemoteLogManagerTest#testFetchQuotaManagerConfig (#16152)

Reviewers: Chia-Ping Tsai 
---
 core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 72154c9fc22..0ba5d63a8da 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -128,6 +128,9 @@ import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
@@ -2701,9 +2704,9 @@ public class RemoteLogManagerTest {
 Properties defaultProps = new Properties();
 RemoteLogManagerConfig defaultRlmConfig = 
createRLMConfig(defaultProps);
 RLMQuotaManagerConfig defaultConfig = 
RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig);
-assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, 
defaultConfig.quotaBytesPerSecond());
-assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, 
defaultConfig.numQuotaSamples());
-
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, 
defaultConfig.quotaWindowSizeSeconds());
+assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, 
defaultConfig.quotaBytesPerSecond());
+assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, 
defaultConfig.numQuotaSamples());
+
assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, 
defaultConfig.quotaWindowSizeSeconds());
 
 Properties customProps = new Properties();
 
customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
 100);



(kafka) 03/12: MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest (#16171)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2273e061389c42686b12317f625523b46fd27a2b
Author: Kuan-Po (Cooper) Tseng 
AuthorDate: Mon Jun 3 03:59:56 2024 +0800

MINOR: Fix missing wait topic finished in 
TopicBasedRemoteLogMetadataManagerRestartTest (#16171)

Reviewers: Chia-Ping Tsai 
---
 .../storage/TopicBasedRemoteLogMetadataManagerRestartTest.java  | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index 07b08145731..84b98dcb5be 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -72,8 +72,10 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
 NewTopic newLeaderTopic = new NewTopic(leaderTopic, 
Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
 // Set broker id 1 as the first entry which is taken as the leader.
 NewTopic newFollowerTopic = new NewTopic(followerTopic, 
Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
-admin.createTopics(Arrays.asList(newLeaderTopic, 
newFollowerTopic));
+admin.createTopics(Arrays.asList(newLeaderTopic, 
newFollowerTopic)).all().get();
 }
+clusterInstance.waitForTopic(leaderTopic, 1);
+clusterInstance.waitForTopic(followerTopic, 1);
 
 final TopicIdPartition leaderTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
 final TopicIdPartition followerTopicIdPartition = new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
@@ -122,4 +124,4 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest {
 
topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)));
 }
 }
-}
\ No newline at end of file
+}



(kafka) 02/12: KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra (#16170)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c6f0db3c6069fd49564519b1bf26c10e3cd237e6
Author: Kuan-Po (Cooper) Tseng 
AuthorDate: Sun Jun 2 22:18:53 2024 +0800

KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new 
test infra (#16170)

Reviewers: Chia-Ping Tsai 
---
 .../TopicBasedRemoteLogMetadataManagerHarness.java |   7 +-
 ...icBasedRemoteLogMetadataManagerRestartTest.java | 159 -
 2 files changed, 62 insertions(+), 104 deletions(-)

diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index a063fa8820a..7af78e750a8 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -53,12 +53,6 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
 initializeRemoteLogMetadataManager(topicIdPartitions, 
startConsumerThread, RemoteLogMetadataTopicPartitioner::new, 
remotePartitionMetadataStoreSupplier);
 }
 
-public void initializeRemoteLogMetadataManager(Set 
topicIdPartitions,
-   boolean startConsumerThread,
-   Function remoteLogMetadataTopicPartitioner) {
-initializeRemoteLogMetadataManager(topicIdPartitions, 
startConsumerThread, remoteLogMetadataTopicPartitioner, 
RemotePartitionMetadataStore::new);
-}
-
 public void initializeRemoteLogMetadataManager(Set 
topicIdPartitions,
boolean startConsumerThread,
Function remoteLogMetadataTopicPartitioner,
@@ -70,6 +64,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness 
extends IntegrationTestHa
   .startConsumerThread(startConsumerThread)
   .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner)
   .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier)
+  
.overrideRemoteLogMetadataManagerProps(overrideRemoteLogMetadataManagerProps())
   .build();
 }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
index c599259ed94..07b08145731 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -24,139 +29,97 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
 public class TopicBasedRemoteLogMetadataManagerRestartTest {
 
 private static final int SEG_SIZE = 1024 * 1024;
 
 private final Time time = new MockTime(1);
 private final String logDir = 
TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
+private final ClusterInstance clusterInstance;
 
-private TopicBasedRemoteLogMetadataManager

(kafka) branch 3.8 updated (facbab272fb -> d94a28b4a4f)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a change to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


from facbab272fb KAFKA-9228: Restart tasks on runtime-only connector config 
changes (#16053)
 new 68d92a5b43f KAFKA-16866 Used the right constant in 
RemoteLogManagerTest#testFetchQuotaManagerConfig (#16152)
 new c6f0db3c606 KAFKA-16785 Migrate 
TopicBasedRemoteLogMetadataManagerRestartTest to new test infra (#16170)
 new 2273e061389 MINOR: Fix missing wait topic finished in 
TopicBasedRemoteLogMetadataManagerRestartTest (#16171)
 new f9c37032ffc KAFKA-16859 Cleanup check if tiered storage is enabled 
(#16153)
 new e4a3da6b099 KAFKA-16852 Adding two thread pools kafka-16852 (#16154)
 new 944f4699a7d KAFKA-16880 Update equals and hashcode methods for two 
attributes  (#16173)
 new 69158f67f84 KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to 
ClusterInstance infra (#16180)
 new b6848d699da KAFKA-15776: Introduce remote.fetch.max.timeout.ms to 
configure DelayedRemoteFetch timeout (#14778)
 new 025e791d0cd MINOR: Cleanup the storage module unit tests (#16202)
 new 9460e6b266a KAFKA-16884 Refactor RemoteLogManagerConfig with 
AbstractConfig (#16199)
 new 781b93b00d9 KAFKA-16885 Renamed the enableRemoteStorageSystem to 
isRemoteStorageSystemEnabled (#16256)
 new d94a28b4a4f KAFKA-15776: Support added to update 
remote.fetch.max.wait.ms dynamically (#16203)

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/clients/consumer/ConsumerConfig.java |   5 +-
 .../java/kafka/log/remote/RemoteLogManager.java|   2 +-
 .../kafka/server/builders/KafkaApisBuilder.java|   2 +-
 .../server/builders/ReplicaManagerBuilder.java |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala |   4 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   4 +-
 .../main/scala/kafka/server/ConfigHandler.scala|   2 +-
 .../server/ControllerConfigurationValidator.scala  |   3 +-
 .../scala/kafka/server/DelayedRemoteFetch.scala|   3 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  10 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |   4 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   4 +-
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |   4 +-
 .../kafka/log/remote/RemoteLogManagerTest.java |  15 +-
 .../kafka/server/DelayedRemoteFetchTest.scala  |  21 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  |  26 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |   2 +-
 .../kafka/server/DynamicBrokerConfigTest.scala |  33 ++
 .../unit/kafka/server/ReplicaManagerTest.scala |  14 +-
 .../log/remote/storage/RemoteLogManagerConfig.java | 558 
 .../storage/RemoteLogMetadataFormatterTest.java|  18 +-
 .../storage/RemoteLogMetadataManagerTestUtils.java |  23 +-
 .../storage/RemoteLogMetadataSerdeTest.java|  38 +-
 .../storage/RemoteLogMetadataTransformTest.java|  40 +-
 .../storage/RemoteLogSegmentLifecycleManager.java  |  63 ---
 .../storage/RemoteLogSegmentLifecycleTest.java | 562 -
 ...picBasedRemoteLogMetadataManagerConfigTest.java |  53 +-
 .../TopicBasedRemoteLogMetadataManagerHarness.java |  95 
 ...ogMetadataManagerMultipleSubscriptionsTest.java |  41 +-
 ...icBasedRemoteLogMetadataManagerRestartTest.java | 181 +++
 .../TopicBasedRemoteLogMetadataManagerTest.java|  56 +-
 ...RemoteLogMetadataManagerWrapperWithHarness.java | 105 
 .../remote/storage/RemoteLogManagerConfigTest.java | 107 ++--
 .../storage/RemoteLogMetadataManagerTest.java  |  83 +--
 35 files changed, 812 insertions(+), 1374 deletions(-)
 delete mode 100644 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java
 delete mode 100644 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
 delete mode 100644 
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java



(kafka) 05/12: KAFKA-16852 Adding two thread pools kafka-16852 (#16154)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e4a3da6b09912e9f1d1fce61e7d77978e84e3c8d
Author: Murali Basani 
AuthorDate: Mon Jun 3 10:52:54 2024 +0200

KAFKA-16852 Adding two thread pools kafka-16852 (#16154)

Reviewers: Christo Lolov , Chia-Ping Tsai 

---
 .../log/remote/storage/RemoteLogManagerConfig.java | 38 ++
 .../remote/storage/RemoteLogManagerConfigTest.java |  6 +++-
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 6ea752886a9..d6cf615c781 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -100,6 +100,16 @@ public final class RemoteLogManagerConfig {
 "segments, fetch remote log indexes and clean up remote log 
segments.";
 public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10;
 
+public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP 
= "remote.log.manager.copier.thread.pool.size";
+public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC 
= "Size of the thread pool used in " +
+"scheduling tasks to copy segments.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE 
= 10;
+
+public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = 
"remote.log.manager.expiration.thread.pool.size";
+public static final String 
REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool 
used in" +
+" scheduling tasks to clean up remote log segments.";
+public static final int 
DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10;
+
 public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = 
"remote.log.manager.task.interval.ms";
 public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = 
"Interval at which remote log manager runs the scheduled tasks like copy " +
 "segments, and clean up remote log segments.";
@@ -241,6 +251,18 @@ public final class RemoteLogManagerConfig {
   atLeast(1),
   MEDIUM,
   REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC)
+  
.defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+INT,
+DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE,
+atLeast(1),
+MEDIUM,
+REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC)
+  
.defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+INT,
+DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE,
+atLeast(1),
+MEDIUM,
+REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC)
   .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP,
   LONG,
   DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS,
@@ -333,6 +355,8 @@ public final class RemoteLogManagerConfig {
 private final String remoteLogMetadataManagerClassPath;
 private final long remoteLogIndexFileCacheTotalSizeBytes;
 private final int remoteLogManagerThreadPoolSize;
+private final int remoteLogManagerCopierThreadPoolSize;
+private final int remoteLogManagerExpirationThreadPoolSize;
 private final long remoteLogManagerTaskIntervalMs;
 private final long remoteLogManagerTaskRetryBackoffMs;
 private final long remoteLogManagerTaskRetryBackoffMaxMs;
@@ -361,6 +385,8 @@ public final class RemoteLogManagerConfig {
  config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP),
  config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP),
  config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP),
+ config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP),
+ 
config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP),
  config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP),
  config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP),
  
config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP),
@@ -393,6 +419,8 @@ public final class RemoteLogManagerConfig {
   String remoteLogMetadataManagerListenerName,
   long remoteLogIndexFileCacheTotalSizeBytes,

(kafka) 10/12: KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 9460e6b266a2f2fd3c4569ad3ad461877fe4f3ca
Author: Murali Basani 
AuthorDate: Thu Jun 6 18:06:25 2024 +0200

KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199)

Reviewers: Greg Harris , Kamal Chandraprakash 
, Chia-Ping Tsai 
---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +-
 .../kafka/log/remote/RemoteLogManagerTest.java |   4 +-
 .../unit/kafka/server/ReplicaManagerTest.scala |   8 +-
 .../log/remote/storage/RemoteLogManagerConfig.java | 245 -
 .../remote/storage/RemoteLogManagerConfigTest.java | 190 +---
 5 files changed, 97 insertions(+), 354 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3635d5a3edd..06bce64f623 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -455,7 +455,7 @@ object KafkaConfig {
   }
 
   /** * Remote Log Management Configuration */
-  RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => 
configDef.define(key))
+  RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => 
configDef.define(key))
 
   def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted
   private[server] def defaultValues: Map[String, _] = 
configDef.defaultValues.asScala
@@ -590,7 +590,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val zkEnableSecureAcls: Boolean = 
getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
   val zkMaxInFlightRequests: Int = 
getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
 
-  private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
+  private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props)
   def remoteLogManagerConfig = _remoteLogManagerConfig
 
   private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: 
String): Boolean = {
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 50b581fdf4e..9badaabf419 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
@@ -2742,8 +2741,7 @@ public class RemoteLogManagerTest {
 props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
 props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
 
-AbstractConfig config = new 
AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
-return new RemoteLogManagerConfig(config);
+return new RemoteLogManagerConfig(props);
 }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 65cbcc7bd70..313b679c605 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, 
TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
+import org.apache.kafka.common.config.{TopicConfig}
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
KafkaStorageException}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData
 import 
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
@@ -4092,8 +4092,7 @@ class ReplicaManagerTest {
 
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
classOf[NoOpRemoteLogMetadataManager].getName)
 // set log reader threads number to 2
 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 
2.toString)
-val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
-val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+val remoteLogManagerConfig = new RemoteLogManagerConfig(props)
 val mockLog = mock(classOf[UnifiedLog])
 val brokerTopicSta

(kafka) 08/12: KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (#14778)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b6848d699da3299b55273b707124f2dbfa552743
Author: Kamal Chandraprakash 
AuthorDate: Wed Jun 5 12:12:23 2024 +0530

KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure 
DelayedRemoteFetch timeout (#14778)

KIP-1018, part1, Introduce remote.fetch.max.timeout.ms to configure 
DelayedRemoteFetch timeout

Reviewers: Luke Chen 
---
 .../kafka/clients/consumer/ConsumerConfig.java |   5 +-
 .../scala/kafka/server/DelayedRemoteFetch.scala|   3 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |   3 +-
 .../kafka/server/DelayedRemoteFetchTest.scala  |  21 +-
 .../log/remote/storage/RemoteLogManagerConfig.java | 342 +++--
 .../remote/storage/RemoteLogManagerConfigTest.java |   6 +-
 6 files changed, 203 insertions(+), 177 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 7ec147e9e3c..76bfe7e91a1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig {
  * fetch.max.wait.ms
  */
 public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
-private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of 
time the server will block before answering the fetch request if there isn't 
sufficient data to immediately satisfy the requirement given by 
fetch.min.bytes.";
+private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of 
time the server will block before " +
+"answering the fetch request if there isn't sufficient data to 
immediately satisfy the requirement given by " +
+"fetch.min.bytes. This config is used only for local log fetch. To 
tune the remote fetch maximum wait " +
+"time, please refer to 'remote.fetch.max.wait.ms' broker config";
 public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500;
 
 /** metadata.max.age.ms */
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala 
b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
index 00d6afb89ff..58a866aa4a6 100644
--- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -35,12 +35,13 @@ import scala.collection._
 class DelayedRemoteFetch(remoteFetchTask: Future[Void],
  remoteFetchResult: 
CompletableFuture[RemoteLogReadResult],
  remoteFetchInfo: RemoteStorageFetchInfo,
+ remoteFetchMaxWaitMs: Long,
  fetchPartitionStatus: Seq[(TopicIdPartition, 
FetchPartitionStatus)],
  fetchParams: FetchParams,
  localReadResults: Seq[(TopicIdPartition, 
LogReadResult)],
  replicaManager: ReplicaManager,
  responseCallback: Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit)
-  extends DelayedOperation(fetchParams.maxWaitMs) {
+  extends DelayedOperation(remoteFetchMaxWaitMs) {
 
   if (fetchParams.isFromFollower) {
 throw new IllegalStateException(s"The follower should not invoke remote 
fetch. Fetch params are: $fetchParams")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 6908ec096d1..17e9a5c1b9e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig,
 return Some(createLogReadResult(e))
 }
 
-val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo,
+val remoteFetchMaxWaitMs = 
config.remoteLogManagerConfig.remoteFetchMaxWaitMs()
+val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, 
remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs,
   fetchPartitionStatus, params, logReadResults, this, responseCallback)
 
 delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
index c35385bcbc8..ea1ffaf0b11 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -40,6 +40,7 @@ class DelayedRemoteFetchTest {
   private val fetchOffset = 500L
   private val logStartOffset = 0L
   private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val

(kafka) 11/12: KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled (#16256)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 781b93b00d99a63dee0ed6650a2d398f7791cea3
Author: Chia Chuan Yu 
AuthorDate: Mon Jun 10 02:14:15 2024 +0800

KAFKA-16885 Renamed the enableRemoteStorageSystem to 
isRemoteStorageSystemEnabled (#16256)

Reviewers: Kamal Chandraprakash , Chia-Ping Tsai 

---
 .../java/kafka/log/remote/RemoteLogManager.java|  2 +-
 .../kafka/server/builders/KafkaApisBuilder.java|  2 +-
 .../server/builders/ReplicaManagerBuilder.java |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala |  4 ++--
 .../src/main/scala/kafka/server/BrokerServer.scala |  4 ++--
 .../main/scala/kafka/server/ConfigHandler.scala|  2 +-
 .../server/ControllerConfigurationValidator.scala  |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  4 ++--
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  4 ++--
 .../kafka/log/remote/RemoteLogManagerTest.java |  2 +-
 .../test/scala/unit/kafka/log/LogConfigTest.scala  | 26 +++---
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala |  6 ++---
 .../log/remote/storage/RemoteLogManagerConfig.java |  2 +-
 14 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5b0d91ff439..de5fbd85f16 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -378,7 +378,7 @@ public class RemoteLogManager implements Closeable {
Map topicIds) {
 LOGGER.debug("Received leadership changes for leaders: {} and 
followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
 
-if (this.rlmConfig.enableRemoteStorageSystem() && 
!isRemoteLogManagerConfigured()) {
+if (this.rlmConfig.isRemoteStorageSystemEnabled() && 
!isRemoteLogManagerConfigured()) {
 throw new KafkaException("RemoteLogManager is not configured when 
remote storage system is enabled");
 }
 
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 1d422461678..56607723604 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -179,7 +179,7 @@ public class KafkaApisBuilder {
 if (metrics == null) throw new RuntimeException("You must set 
metrics");
 if (quotas == null) throw new RuntimeException("You must set quotas");
 if (fetchManager == null) throw new RuntimeException("You must set 
fetchManager");
-if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
+if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
 if (apiVersionManager == null) throw new RuntimeException("You must 
set apiVersionManager");
 
 return new KafkaApis(requestChannel,
diff --git 
a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 5e8cf2dcdc6..7cac33200d2 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -185,7 +185,7 @@ public class ReplicaManagerBuilder {
 if (metadataCache == null) throw new RuntimeException("You must set 
metadataCache");
 if (logDirFailureChannel == null) throw new RuntimeException("You must 
set logDirFailureChannel");
 if (alterPartitionManager == null) throw new RuntimeException("You 
must set alterIsrManager");
-if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem());
+if (brokerTopicStats == null) brokerTopicStats = new 
BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
 // Initialize metrics in the end just before passing it to 
ReplicaManager to ensure ReplicaManager closes the
 // metrics correctly. There might be a resource leak if it is 
initialized and an exception occurs between
 // its initialization and creation of ReplicaManager.
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 45d6ab7908d..8908aadf459 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -1562,7 +1562,7 @@ object LogManager {
 keepPartitionMetadataFile: Boolean): LogManager = {
 val defaultProps = confi

(kafka-site) 01/01: MINOR: Add Josep Prat Key

2024-06-11 Thread jlprat
This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch MINOR-add-josep-prat-key
in repository https://gitbox.apache.org/repos/asf/kafka-site.git

commit 055a9b80588ce4eb5ae46b460fb687bfefde6ac4
Author: Josep Prat 
AuthorDate: Tue Jun 11 10:13:00 2024 +0200

MINOR: Add Josep Prat Key
---
 KEYS | 34 ++
 1 file changed, 34 insertions(+)

diff --git a/KEYS b/KEYS
index 3961ea8e..60193501 100644
--- a/KEYS
+++ b/KEYS
@@ -2107,3 +2107,37 @@ 
p/ywjgUCZjOVcQIbDAAKCRA5q1Uxp/ywjqFhAQDX/VpF9bAnu2WGoN1pMkHHlacB
 6H9uaArrrUb+g2SNjQD/RxfdZMYnLtDaEJJ5mgADzRvv1Wg3nRBL+xm7mJfswgI=
 =qi+y
 -END PGP PUBLIC KEY BLOCK-
+pub   rsa4096 2022-10-28 [SC]
+  CF9500821E9557AEB04E026C05EEA67F87749E61
+uid   [ultimate] Josep Prat (Apache) 
+sig 305EEA67F87749E61 2022-10-28  [self-signature]
+
+-BEGIN PGP PUBLIC KEY BLOCK-
+
+mQINBGNbjlgBEADLrLoKbXJAe/NvQ47iHWRthis0qsz5ge3EXmheUrrIy5xkSuPM
+AMa10viHMTSPFV8XZhmfIMwlI6EH5eivsTqB1lp08twtFLymVu5ImOC192FyYX5k
+ZWxBE1wbFByDPLAvzx3Dp7ACWF0BBA3JlGi1rKG/QFSghhtlq+I5e7oiTUPDXucV
+6tsXu+inbyu0Il/1CsZD8v0iOchQoaL4pk7oY5jeYJnHT8Soxa+TpG4rh5b9vTWO
+++zB+tDcSnqzvbvw6huHeUDozyfSbbYRcYl3vgmhdCj6tko8HNyj33UHj82KeuQ7
+evKE5GC4rm4jpNosRgdhWxwHPHEupslRxj4EW/fZf72O7VLeuf3srlPSh8Q9AWez
+7Eg4QVhykuM8N/FXyYS8BUaGaInaV1C8l5UEeRVUvd3X74rdnTf3qlPAvXZ8AY96
+NsyMuXs9Edpunr5k38RToi6jl7hR6RTWTUqjYf+kJAGcqP9GDbuM09WE7QzawqPh
+g/gb3tohPLrC77eFQItoODpLLtQ2jfdrsg3o+KeFiWSPh+Wfn0fPWpWdq9/qJ87B
+WS8f/qE36RaAJjwRKtvZBsraUaTTfObdyOK+PcCgicAIZk7IH+T72guF7rZUFvIz
+HaboINtkBqt0NRCMMc1QrVOqmptvP6xhtEqLx+HP3bmCqfpXXj0QdsJjmQARAQAB
+tCdKb3NlcCBQcmF0IChBcGFjaGUpIDxqbHByYXRAYXBhY2hlLm9yZz6JAkYEEwEK
+ADAWIQTPlQCCHpVXrrBOAmwF7qZ/h3SeYQUCY1uOWAIbAwILCQMVCggCFgACHgUC
+F4AACgkQBe6mf4d0nmFJdA//fMqUIQXvVYxMpwEoZHfKvIgy+mrS2z1fXqsD795l
+E0FDOk3OOhqh3bsb8/ietM0fdk8dewk0MC9+uDVb06zFKY3ex1ZhcNybmOuF75Ey
+1AErWV78dKG/f8h8EnmMeweYLel6jL1Ch32hsQTci9RcL0FSsdxWfOjFNNSRrRpU
+dXUJH8GXAP0BUGfS/AkfBCpWDWXVRdNb83q+DqXZd878O9ybaqPY1LzSiKM4TfST
+4RptJVVrKPcPQLz9cT8aWW7AbF3HS5I3zu1bF62+Mk+mq3zL/vEWXbXfRibeLUB+
+VZFpKRSCkKSYCkOLCGFclPMoBmD3PVf7iqIJ3gmRTktrfve/SrCJmnfi3325ZQuF
+i5wMh3jRRxchXEOwDX5oXOABW3MoZfU594rvdpWQikeXMsAS9O8bZ4HJsnIOaOJ2
+1TU5M0s0uuoRU0kd5lx5SI0WeTa2SrEwy35DNhA7m1dw/nA74VhITTbHXg7xg4Iz
+bivsVVjT/CZPql7I78jL9KJq80KVdhOcDNPwgoCj4gNVVFllbuJAOcMEs7HVdV3Q
+6DmT9N0HDfHkWt6dhpKmog14MeM0X5ZaALM9JE9xlyt4laBeRW1YXER/yLw3Yo6h
+HLtR1XUniZRa2xjP82Xl6/ZfTeeYpB1Ewn7B8xDTfTnxQusYfX3Rr07lT145l/cM
+q+k=
+=LiMJ
+-END PGP PUBLIC KEY BLOCK-



(kafka-site) branch MINOR-add-josep-prat-key created (now 055a9b80)

2024-06-11 Thread jlprat
This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a change to branch MINOR-add-josep-prat-key
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


  at 055a9b80 MINOR: Add Josep Prat Key

This branch includes the following new commits:

 new 055a9b80 MINOR: Add Josep Prat Key

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.




(kafka-site) branch asf-site updated: MINOR: Add Josep Prat Key (#603)

2024-06-11 Thread jlprat
This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new 9e243531 MINOR: Add Josep Prat Key (#603)
9e243531 is described below

commit 9e243531dac508033d149e724a47afd1732629e9
Author: Josep Prat 
AuthorDate: Tue Jun 11 10:58:34 2024 +0200

MINOR: Add Josep Prat Key (#603)

Reviewers: Igor Soarez 
---
 KEYS | 34 ++
 1 file changed, 34 insertions(+)

diff --git a/KEYS b/KEYS
index 3961ea8e..60193501 100644
--- a/KEYS
+++ b/KEYS
@@ -2107,3 +2107,37 @@ 
p/ywjgUCZjOVcQIbDAAKCRA5q1Uxp/ywjqFhAQDX/VpF9bAnu2WGoN1pMkHHlacB
 6H9uaArrrUb+g2SNjQD/RxfdZMYnLtDaEJJ5mgADzRvv1Wg3nRBL+xm7mJfswgI=
 =qi+y
 -END PGP PUBLIC KEY BLOCK-
+pub   rsa4096 2022-10-28 [SC]
+  CF9500821E9557AEB04E026C05EEA67F87749E61
+uid   [ultimate] Josep Prat (Apache) 
+sig 305EEA67F87749E61 2022-10-28  [self-signature]
+
+-BEGIN PGP PUBLIC KEY BLOCK-
+
+mQINBGNbjlgBEADLrLoKbXJAe/NvQ47iHWRthis0qsz5ge3EXmheUrrIy5xkSuPM
+AMa10viHMTSPFV8XZhmfIMwlI6EH5eivsTqB1lp08twtFLymVu5ImOC192FyYX5k
+ZWxBE1wbFByDPLAvzx3Dp7ACWF0BBA3JlGi1rKG/QFSghhtlq+I5e7oiTUPDXucV
+6tsXu+inbyu0Il/1CsZD8v0iOchQoaL4pk7oY5jeYJnHT8Soxa+TpG4rh5b9vTWO
+++zB+tDcSnqzvbvw6huHeUDozyfSbbYRcYl3vgmhdCj6tko8HNyj33UHj82KeuQ7
+evKE5GC4rm4jpNosRgdhWxwHPHEupslRxj4EW/fZf72O7VLeuf3srlPSh8Q9AWez
+7Eg4QVhykuM8N/FXyYS8BUaGaInaV1C8l5UEeRVUvd3X74rdnTf3qlPAvXZ8AY96
+NsyMuXs9Edpunr5k38RToi6jl7hR6RTWTUqjYf+kJAGcqP9GDbuM09WE7QzawqPh
+g/gb3tohPLrC77eFQItoODpLLtQ2jfdrsg3o+KeFiWSPh+Wfn0fPWpWdq9/qJ87B
+WS8f/qE36RaAJjwRKtvZBsraUaTTfObdyOK+PcCgicAIZk7IH+T72guF7rZUFvIz
+HaboINtkBqt0NRCMMc1QrVOqmptvP6xhtEqLx+HP3bmCqfpXXj0QdsJjmQARAQAB
+tCdKb3NlcCBQcmF0IChBcGFjaGUpIDxqbHByYXRAYXBhY2hlLm9yZz6JAkYEEwEK
+ADAWIQTPlQCCHpVXrrBOAmwF7qZ/h3SeYQUCY1uOWAIbAwILCQMVCggCFgACHgUC
+F4AACgkQBe6mf4d0nmFJdA//fMqUIQXvVYxMpwEoZHfKvIgy+mrS2z1fXqsD795l
+E0FDOk3OOhqh3bsb8/ietM0fdk8dewk0MC9+uDVb06zFKY3ex1ZhcNybmOuF75Ey
+1AErWV78dKG/f8h8EnmMeweYLel6jL1Ch32hsQTci9RcL0FSsdxWfOjFNNSRrRpU
+dXUJH8GXAP0BUGfS/AkfBCpWDWXVRdNb83q+DqXZd878O9ybaqPY1LzSiKM4TfST
+4RptJVVrKPcPQLz9cT8aWW7AbF3HS5I3zu1bF62+Mk+mq3zL/vEWXbXfRibeLUB+
+VZFpKRSCkKSYCkOLCGFclPMoBmD3PVf7iqIJ3gmRTktrfve/SrCJmnfi3325ZQuF
+i5wMh3jRRxchXEOwDX5oXOABW3MoZfU594rvdpWQikeXMsAS9O8bZ4HJsnIOaOJ2
+1TU5M0s0uuoRU0kd5lx5SI0WeTa2SrEwy35DNhA7m1dw/nA74VhITTbHXg7xg4Iz
+bivsVVjT/CZPql7I78jL9KJq80KVdhOcDNPwgoCj4gNVVFllbuJAOcMEs7HVdV3Q
+6DmT9N0HDfHkWt6dhpKmog14MeM0X5ZaALM9JE9xlyt4laBeRW1YXER/yLw3Yo6h
+HLtR1XUniZRa2xjP82Xl6/ZfTeeYpB1Ewn7B8xDTfTnxQusYfX3Rr07lT145l/cM
+q+k=
+=LiMJ
+-END PGP PUBLIC KEY BLOCK-



(kafka-site) branch MINOR-add-josep-prat-key deleted (was 055a9b80)

2024-06-11 Thread jlprat
This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a change to branch MINOR-add-josep-prat-key
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


 was 055a9b80 MINOR: Add Josep Prat Key

The revisions that were on this branch are still contained in
other references; therefore, this change does not discard any commits
from the repository.



svn commit: r69659 - /release/kafka/KEYS

2024-06-11 Thread mimaison
Author: mimaison
Date: Tue Jun 11 09:10:53 2024
New Revision: 69659

Log:
Add Josep Prat's key

Modified:
release/kafka/KEYS

Modified: release/kafka/KEYS
==
--- release/kafka/KEYS (original)
+++ release/kafka/KEYS Tue Jun 11 09:10:53 2024
@@ -2107,3 +2107,37 @@ p/ywjgUCZjOVcQIbDAAKCRA5q1Uxp/ywjqFhAQDX
 6H9uaArrrUb+g2SNjQD/RxfdZMYnLtDaEJJ5mgADzRvv1Wg3nRBL+xm7mJfswgI=
 =qi+y
 -----END PGP PUBLIC KEY BLOCK-----
+pub   rsa4096 2022-10-28 [SC]
+  CF9500821E9557AEB04E026C05EEA67F87749E61
+uid   [ultimate] Josep Prat (Apache) 
+sig 305EEA67F87749E61 2022-10-28  [self-signature]
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+
+mQINBGNbjlgBEADLrLoKbXJAe/NvQ47iHWRthis0qsz5ge3EXmheUrrIy5xkSuPM
+AMa10viHMTSPFV8XZhmfIMwlI6EH5eivsTqB1lp08twtFLymVu5ImOC192FyYX5k
+ZWxBE1wbFByDPLAvzx3Dp7ACWF0BBA3JlGi1rKG/QFSghhtlq+I5e7oiTUPDXucV
+6tsXu+inbyu0Il/1CsZD8v0iOchQoaL4pk7oY5jeYJnHT8Soxa+TpG4rh5b9vTWO
+++zB+tDcSnqzvbvw6huHeUDozyfSbbYRcYl3vgmhdCj6tko8HNyj33UHj82KeuQ7
+evKE5GC4rm4jpNosRgdhWxwHPHEupslRxj4EW/fZf72O7VLeuf3srlPSh8Q9AWez
+7Eg4QVhykuM8N/FXyYS8BUaGaInaV1C8l5UEeRVUvd3X74rdnTf3qlPAvXZ8AY96
+NsyMuXs9Edpunr5k38RToi6jl7hR6RTWTUqjYf+kJAGcqP9GDbuM09WE7QzawqPh
+g/gb3tohPLrC77eFQItoODpLLtQ2jfdrsg3o+KeFiWSPh+Wfn0fPWpWdq9/qJ87B
+WS8f/qE36RaAJjwRKtvZBsraUaTTfObdyOK+PcCgicAIZk7IH+T72guF7rZUFvIz
+HaboINtkBqt0NRCMMc1QrVOqmptvP6xhtEqLx+HP3bmCqfpXXj0QdsJjmQARAQAB
+tCdKb3NlcCBQcmF0IChBcGFjaGUpIDxqbHByYXRAYXBhY2hlLm9yZz6JAkYEEwEK
+ADAWIQTPlQCCHpVXrrBOAmwF7qZ/h3SeYQUCY1uOWAIbAwILCQMVCggCFgACHgUC
+F4AACgkQBe6mf4d0nmFJdA//fMqUIQXvVYxMpwEoZHfKvIgy+mrS2z1fXqsD795l
+E0FDOk3OOhqh3bsb8/ietM0fdk8dewk0MC9+uDVb06zFKY3ex1ZhcNybmOuF75Ey
+1AErWV78dKG/f8h8EnmMeweYLel6jL1Ch32hsQTci9RcL0FSsdxWfOjFNNSRrRpU
+dXUJH8GXAP0BUGfS/AkfBCpWDWXVRdNb83q+DqXZd878O9ybaqPY1LzSiKM4TfST
+4RptJVVrKPcPQLz9cT8aWW7AbF3HS5I3zu1bF62+Mk+mq3zL/vEWXbXfRibeLUB+
+VZFpKRSCkKSYCkOLCGFclPMoBmD3PVf7iqIJ3gmRTktrfve/SrCJmnfi3325ZQuF
+i5wMh3jRRxchXEOwDX5oXOABW3MoZfU594rvdpWQikeXMsAS9O8bZ4HJsnIOaOJ2
+1TU5M0s0uuoRU0kd5lx5SI0WeTa2SrEwy35DNhA7m1dw/nA74VhITTbHXg7xg4Iz
+bivsVVjT/CZPql7I78jL9KJq80KVdhOcDNPwgoCj4gNVVFllbuJAOcMEs7HVdV3Q
+6DmT9N0HDfHkWt6dhpKmog14MeM0X5ZaALM9JE9xlyt4laBeRW1YXER/yLw3Yo6h
+HLtR1XUniZRa2xjP82Xl6/ZfTeeYpB1Ewn7B8xDTfTnxQusYfX3Rr07lT145l/cM
+q+k=
+=LiMJ
+-----END PGP PUBLIC KEY BLOCK-----




(kafka) branch trunk updated (af86e56fcdf -> 99eacf1b61a)

2024-06-11 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from af86e56fcdf KAFKA-10787 Apply spotless to tools and tools-api module 
(#16262)
 add 99eacf1b61a KAFKA-16914: Added share group dynamic and broker configs 
(#16268)

No new revisions were added by this update.

Summary of changes:
 core/src/main/scala/kafka/server/KafkaConfig.scala | 65 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 75 -
 .../kafka/server/config/ShareGroupConfigs.java | 78 ++
 3 files changed, 216 insertions(+), 2 deletions(-)
 create mode 100644 
server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java



(kafka) branch trunk updated: KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267)

2024-06-11 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new f746d67c3bf KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images 
static assets (#16267)
f746d67c3bf is described below

commit f746d67c3bf62e7d4f5f4e652ea36b4d5a0e01a6
Author: KrishVora01 <156789009+krishvor...@users.noreply.github.com>
AuthorDate: Tue Jun 11 16:20:47 2024 +0530

KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets 
(#16267)

This PR aims to add the static Dockerfile and scripts for AK 3.7.0 version. 
As mentioned in KIP-1028 this PR aims to start the release of the kafka:3.7.0 
Docker Official image. This will also help us validate the process and allow us 
to address any changes suggested by Dockerhub before the 3.8.0 release.

The static Dockerfile and scripts have been generated via the github 
actions workflows and scripts added as part of 
https://github.com/apache/kafka/pull/16027. The reports of build and testing 
the 3.7.0 Docker official image are below.

Reviewers: Manikumar Reddy , Vedarth Sharma 

---
 docker/docker_official_images/3.7.0/jvm/Dockerfile |  95 
 docker/docker_official_images/3.7.0/jvm/jsa_launch |  49 +
 docker/docker_official_images/3.7.0/jvm/launch |  68 
 .../3.7.0/jvm/resources/common-scripts/bash-config |  23 
 .../3.7.0/jvm/resources/common-scripts/configure   | 121 +
 .../jvm/resources/common-scripts/configureDefaults |  28 +
 .../3.7.0/jvm/resources/common-scripts/run |  38 +++
 7 files changed, 422 insertions(+)

diff --git a/docker/docker_official_images/3.7.0/jvm/Dockerfile 
b/docker/docker_official_images/3.7.0/jvm/Dockerfile
new file mode 100755
index 000..7aa054a4f7b
--- /dev/null
+++ b/docker/docker_official_images/3.7.0/jvm/Dockerfile
@@ -0,0 +1,95 @@
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+FROM eclipse-temurin:21-jre-alpine AS build-jsa
+
+USER root
+
+# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, 
for version 3.7.0
+ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
+
+COPY jsa_launch /etc/kafka/docker/jsa_launch
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache wget gcompat gpg gpg-agent procps bash; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
+wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
+gpg --import KEYS; \
+gpg --batch --verify kafka.tgz.asc kafka.tgz
+
+# Generate jsa files using dynamic CDS for kafka server start command and 
kafka storage format command
+RUN /etc/kafka/docker/jsa_launch
+
+
+FROM eclipse-temurin:21-jre-alpine
+
+# exposed ports
+EXPOSE 9092
+
+USER root
+
+# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, 
for version 3.7.0
+ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
+ENV build_date 2024-06-10
+
+
+LABEL org.label-schema.name="kafka" \
+  org.label-schema.description="Apache Kafka" \
+  org.label-schema.build-date="${build_date}" \
+  org.label-schema.vcs-url="https://github.com/apache/kafka"; \
+  maintainer="Apache Kafka"
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache wget gcompat gpg gpg-agent procps bash; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
+wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
+gpg --import KEYS; \
+gpg --batch --verify kafka.tgz.asc kafka.tgz; \
+mkdir -p /var/lib/kafka/data /etc/kafka/secrets; \
+mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \
+addus

(kafka) branch trunk updated (f746d67c3bf -> b7dcae44ffb)

2024-06-11 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from f746d67c3bf KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images 
static assets (#16267)
 add b7dcae44ffb KAFKA-16373: KIP-1028: Modifying download url for kafka 
dockerfile (#16281)

No new revisions were added by this update.

Summary of changes:
 docker/docker_official_image_build_test.py |  2 +-
 docker/docker_official_images/3.7.0/jvm/Dockerfile | 10 +-
 docker/prepare_docker_official_image_source.py |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)



(kafka) branch 3.8 updated (d94a28b4a4f -> 15db8233179)

2024-06-11 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a change to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


from d94a28b4a4f KAFKA-15776: Support added to update 
remote.fetch.max.wait.ms dynamically (#16203)
 new e75cc45bdf0 KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images 
static assets (#16267)
 new 15db8233179 KAFKA-16373: KIP-1028: Modifying download url for kafka 
dockerfile (#16281)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docker/docker_official_image_build_test.py |  2 +-
 docker/{ => docker_official_images/3.7.0}/jvm/Dockerfile   | 10 +-
 docker/{ => docker_official_images/3.7.0}/jvm/jsa_launch   |  0
 docker/{ => docker_official_images/3.7.0}/jvm/launch   |  0
 .../3.7.0/jvm}/resources/common-scripts/bash-config|  0
 .../3.7.0/jvm}/resources/common-scripts/configure  |  0
 .../3.7.0/jvm}/resources/common-scripts/configureDefaults  |  0
 .../3.7.0/jvm}/resources/common-scripts/run|  0
 docker/prepare_docker_official_image_source.py |  4 ++--
 9 files changed, 8 insertions(+), 8 deletions(-)
 copy docker/{ => docker_official_images/3.7.0}/jvm/Dockerfile (90%)
 mode change 100644 => 100755
 copy docker/{ => docker_official_images/3.7.0}/jvm/jsa_launch (100%)
 copy docker/{ => docker_official_images/3.7.0}/jvm/launch (100%)
 copy docker/{ => 
docker_official_images/3.7.0/jvm}/resources/common-scripts/bash-config (100%)
 mode change 100644 => 100755
 copy docker/{ => 
docker_official_images/3.7.0/jvm}/resources/common-scripts/configure (100%)
 copy docker/{ => 
docker_official_images/3.7.0/jvm}/resources/common-scripts/configureDefaults 
(100%)
 copy docker/{ => 
docker_official_images/3.7.0/jvm}/resources/common-scripts/run (100%)



(kafka) 02/02: KAFKA-16373: KIP-1028: Modifying download url for kafka dockerfile (#16281)

2024-06-11 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 15db8233179f0889def4d4f60df199fbde4dd88f
Author: KrishVora01 <156789009+krishvor...@users.noreply.github.com>
AuthorDate: Tue Jun 11 21:02:33 2024 +0530

KAFKA-16373: KIP-1028: Modifying download url for kafka dockerfile (#16281)

This PR modifies the download url from https://downloads.apache.org/kafka/ 
to https://archive.apache.org/dist/kafka/ as the former is not permanent.

Reviewers: Manikumar Reddy , Vedarth Sharma 

---
 docker/docker_official_image_build_test.py |  2 +-
 docker/docker_official_images/3.7.0/jvm/Dockerfile | 10 +-
 docker/prepare_docker_official_image_source.py |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/docker/docker_official_image_build_test.py 
b/docker/docker_official_image_build_test.py
index 6ffe25ee0b8..3da68854c23 100644
--- a/docker/docker_official_image_build_test.py
+++ b/docker/docker_official_image_build_test.py
@@ -76,7 +76,7 @@ if __name__ == '__main__':
 parser.add_argument("--test", "-t", action="store_true", dest="test_only",
 default=False, help="Only run the tests, don't build 
the image")
 args = parser.parse_args()
-kafka_url = 
f"https://downloads.apache.org/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz";
+kafka_url = 
f"https://archive.apache.org/dist/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz";
 if args.build_only or not (args.build_only or args.test_only):
 if args.kafka_version:
 build_docker_official_image(args.image, args.tag, 
args.kafka_version, args.image_type)
diff --git a/docker/docker_official_images/3.7.0/jvm/Dockerfile 
b/docker/docker_official_images/3.7.0/jvm/Dockerfile
index 7aa054a4f7b..905e2f2149b 100755
--- a/docker/docker_official_images/3.7.0/jvm/Dockerfile
+++ b/docker/docker_official_images/3.7.0/jvm/Dockerfile
@@ -20,8 +20,8 @@ FROM eclipse-temurin:21-jre-alpine AS build-jsa
 
 USER root
 
-# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, 
for version 3.7.0
-ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
+# Get Kafka from https://archive.apache.org/dist/kafka, url passed as env var, 
for version 3.7.0
+ENV kafka_url https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
 
 COPY jsa_launch /etc/kafka/docker/jsa_launch
 
@@ -48,9 +48,9 @@ EXPOSE 9092
 
 USER root
 
-# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, 
for version 3.7.0
-ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
-ENV build_date 2024-06-10
+# Get Kafka from https://archive.apache.org/dist/kafka, url passed as env var, 
for version 3.7.0
+ENV kafka_url https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz
+ENV build_date 2024-06-11
 
 
 LABEL org.label-schema.name="kafka" \
diff --git a/docker/prepare_docker_official_image_source.py 
b/docker/prepare_docker_official_image_source.py
index 10b4f04d78f..25d57c53e0f 100644
--- a/docker/prepare_docker_official_image_source.py
+++ b/docker/prepare_docker_official_image_source.py
@@ -46,7 +46,7 @@ def remove_args_and_hardcode_values(file_path, kafka_version, 
kafka_url):
 filedata = filedata.replace(
 "ARG build_date", f"ENV build_date {str(date.today())}")
 original_comment = re.compile(r"# Get kafka from 
https://archive.apache.org/dist/kafka and pass the url through build arguments")
-updated_comment = f"# Get Kafka from https://downloads.apache.org/kafka, 
url passed as env var, for version {kafka_version}"
+updated_comment = f"# Get Kafka from 
https://archive.apache.org/dist/kafka, url passed as env var, for version 
{kafka_version}"
 filedata = original_comment.sub(updated_comment, filedata)
 with open(file_path, 'w') as file:
 file.write(filedata)
@@ -59,7 +59,7 @@ if __name__ == '__main__':
 parser.add_argument("--kafka-version", "-v", dest="kafka_version",
 help="Kafka version for which the source for docker 
official image is to be built")
 args = parser.parse_args()
-kafka_url = 
f"https://downloads.apache.org/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz";
+kafka_url = 
f"https://archive.apache.org/dist/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz";
 current_dir = os.path.dirname(os.path.realpath(__file__))
 new_dir = os.path.join(
 current_dir, f'docker_official_images', args.kafka_version)



(kafka) 01/02: KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267)

2024-06-11 Thread manikumar
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e75cc45bdf0dcd385e011e4ff7c5e8ce62fb3abf
Author: KrishVora01 <156789009+krishvor...@users.noreply.github.com>
AuthorDate: Tue Jun 11 16:20:47 2024 +0530

KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets 
(#16267)

This PR aims to add the static Dockerfile and scripts for AK 3.7.0 version. 
As mentioned in KIP-1028 this PR aims to start the release of the kafka:3.7.0 
Docker Official image. This will also help us validate the process and allow us 
to address any changes suggested by Dockerhub before the 3.8.0 release.

The static Dockerfile and scripts have been generated via the github 
actions workflows and scripts added as part of 
https://github.com/apache/kafka/pull/16027. The reports of build and testing 
the 3.7.0 Docker official image are below.

Reviewers: Manikumar Reddy , Vedarth Sharma 

---
 docker/docker_official_images/3.7.0/jvm/Dockerfile |  95 
 docker/docker_official_images/3.7.0/jvm/jsa_launch |  49 +
 docker/docker_official_images/3.7.0/jvm/launch |  68 
 .../3.7.0/jvm/resources/common-scripts/bash-config |  23 
 .../3.7.0/jvm/resources/common-scripts/configure   | 121 +
 .../jvm/resources/common-scripts/configureDefaults |  28 +
 .../3.7.0/jvm/resources/common-scripts/run |  38 +++
 7 files changed, 422 insertions(+)

diff --git a/docker/docker_official_images/3.7.0/jvm/Dockerfile 
b/docker/docker_official_images/3.7.0/jvm/Dockerfile
new file mode 100755
index 000..7aa054a4f7b
--- /dev/null
+++ b/docker/docker_official_images/3.7.0/jvm/Dockerfile
@@ -0,0 +1,95 @@
+###
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+###
+
+FROM eclipse-temurin:21-jre-alpine AS build-jsa
+
+USER root
+
+# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, 
for version 3.7.0
+ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
+
+COPY jsa_launch /etc/kafka/docker/jsa_launch
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache wget gcompat gpg gpg-agent procps bash; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
+wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
+gpg --import KEYS; \
+gpg --batch --verify kafka.tgz.asc kafka.tgz
+
+# Generate jsa files using dynamic CDS for kafka server start command and 
kafka storage format command
+RUN /etc/kafka/docker/jsa_launch
+
+
+FROM eclipse-temurin:21-jre-alpine
+
+# exposed ports
+EXPOSE 9092
+
+USER root
+
+# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, 
for version 3.7.0
+ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
+ENV build_date 2024-06-10
+
+
+LABEL org.label-schema.name="kafka" \
+  org.label-schema.description="Apache Kafka" \
+  org.label-schema.build-date="${build_date}" \
+  org.label-schema.vcs-url="https://github.com/apache/kafka"; \
+  maintainer="Apache Kafka"
+
+RUN set -eux ; \
+apk update ; \
+apk upgrade ; \
+apk add --no-cache wget gcompat gpg gpg-agent procps bash; \
+mkdir opt/kafka; \
+wget -nv -O kafka.tgz "$kafka_url"; \
+wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
+tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
+wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
+gpg --import KEYS; \
+gpg --batch --verify kafka.tgz.asc kafka.tgz; \
+mkdir -p /var/lib/kafka/data /etc/kafka/secrets; \
+mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \
+adduser -h /home/appuser -D --shell /bin/bash appuser; \
+chown appuser:appuser -R /usr/logs /opt/kafka /mnt/shared/config; \
+chown appuser:root -R /var/lib/kafka /etc/kafka/secrets /etc/kafka; \
+chm

(kafka) branch trunk updated (b7dcae44ffb -> f3dbd7ed08a)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from b7dcae44ffb KAFKA-16373: KIP-1028: Modifying download url for kafka 
dockerfile (#16281)
 add f3dbd7ed08a KAFKA-16904: Metric to measure the latency of remote read 
requests (#16209)

No new revisions were added by this update.

Summary of changes:
 core/src/main/java/kafka/log/remote/RemoteLogManager.java   | 13 ++---
 core/src/main/java/kafka/log/remote/RemoteLogReader.java| 11 ++-
 .../test/java/kafka/log/remote/RemoteLogManagerTest.java|  9 +++--
 .../src/test/java/kafka/log/remote/RemoteLogReaderTest.java | 12 +---
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala   | 10 +-
 .../org/apache/kafka/server/metrics/KafkaMetricsGroup.java  | 12 
 .../server/log/remote/storage/RemoteStorageMetrics.java |  3 +++
 7 files changed, 56 insertions(+), 14 deletions(-)



(kafka) branch 3.8 updated: KAFKA-16904: Metric to measure the latency of remote read requests (#16209)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new bcd95f6485c KAFKA-16904: Metric to measure the latency of remote read 
requests (#16209)
bcd95f6485c is described below

commit bcd95f6485cd94fc489b0c1eba976ecda517d086
Author: Kamal Chandraprakash 
AuthorDate: Tue Jun 11 21:07:12 2024 +0530

KAFKA-16904: Metric to measure the latency of remote read requests (#16209)

Reviewers: Satish Duggana , Christo Lolov 
, Luke Chen 
---
 core/src/main/java/kafka/log/remote/RemoteLogManager.java   | 13 ++---
 core/src/main/java/kafka/log/remote/RemoteLogReader.java| 11 ++-
 .../test/java/kafka/log/remote/RemoteLogManagerTest.java|  9 +++--
 .../src/test/java/kafka/log/remote/RemoteLogReaderTest.java | 12 +---
 .../test/scala/unit/kafka/server/ReplicaManagerTest.scala   | 10 +-
 .../org/apache/kafka/server/metrics/KafkaMetricsGroup.java  | 12 
 .../server/log/remote/storage/RemoteStorageMetrics.java |  3 +++
 7 files changed, 56 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index de5fbd85f16..137985d00b5 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -17,6 +17,7 @@
 package kafka.log.remote;
 
 import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Timer;
 import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.UnifiedLog;
@@ -126,6 +127,7 @@ import java.util.stream.Stream;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
 
 /**
@@ -174,6 +176,7 @@ public class RemoteLogManager implements Closeable {
 private boolean closed = false;
 
 private volatile boolean remoteLogManagerConfigured = false;
+private final Timer remoteReadTimer;
 
 /**
  * Creates RemoteLogManager instance with the given arguments.
@@ -216,12 +219,14 @@ public class RemoteLogManager implements Closeable {
 delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
 rlmScheduledThreadPool = new 
RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
 
-
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(),
 new Gauge() {
+
metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new 
Gauge() {
 @Override
 public Double value() {
 return rlmScheduledThreadPool.getIdlePercent();
 }
 });
+remoteReadTimer = 
metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC,
+TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
 
 remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
 REMOTE_LOG_READER_THREAD_NAME_PREFIX,
@@ -235,7 +240,8 @@ public class RemoteLogManager implements Closeable {
 }
 
 private void removeMetrics() {
-
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName());
+
metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC);
+
metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC);
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
@@ -1664,7 +1670,8 @@ public class RemoteLogManager implements Closeable {
  * @throws java.util.concurrent.RejectedExecutionException if the task 
cannot be accepted for execution (task queue is full)
  */
 public Future asyncRead(RemoteStorageFetchInfo fetchInfo, 
Consumer callback) {
-return remoteStorageReaderThreadPool.submit(new 
RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, 
rlmFetchQuotaManager));
+return remoteStorageReaderThreadPool.submit(
+new RemoteLogReader(fetchInfo, this, callback, 
brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer));
 }
 
 void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java 
b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
index 9395cbd60ed..c28677459ef 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
  */
 package kafka.log.remote;
 
+import com.y

(kafka) branch trunk updated: KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)

2024-06-11 Thread cmccabe
This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new aecaf444756 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
aecaf444756 is described below

commit aecaf4447561edd8da9f06e3abdf46f382dc9d89
Author: Nikolay 
AuthorDate: Tue Jun 11 20:01:35 2024 +0300

KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)

Add support for KIP-953 KRaft Quorum reconfiguration in the DescribeQuorum 
request and response.
Also add support to AdminClient.describeQuorum, so that users will be able 
to find the current set of
quorum nodes, as well as their directories, via these RPCs.

Reviewers: Luke Chen , Colin P. McCabe 
, Andrew Schofield 
---
 .../kafka/clients/admin/KafkaAdminClient.java  |  20 +-
 .../org/apache/kafka/clients/admin/QuorumInfo.java |  76 ++-
 .../common/requests/DescribeQuorumRequest.java |   4 +-
 .../common/requests/DescribeQuorumResponse.java|  13 +-
 .../common/message/DescribeQuorumRequest.json  |   4 +-
 .../common/message/DescribeQuorumResponse.json |  24 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  23 +-
 .../unit/kafka/server/ApiVersionsRequestTest.scala |   4 +-
 .../kafka/server/DescribeQuorumRequestTest.scala   |   2 +
 .../scala/unit/kafka/server/KafkaApisTest.scala|   3 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java |  10 +-
 .../apache/kafka/raft/KafkaRaftClientDriver.java   |   1 +
 .../java/org/apache/kafka/raft/LeaderState.java|  84 +--
 .../java/org/apache/kafka/raft/QuorumState.java|   3 +-
 .../java/org/apache/kafka/raft/RaftMessage.java|   3 +-
 .../java/org/apache/kafka/raft/RaftRequest.java|  17 +-
 .../java/org/apache/kafka/raft/RaftResponse.java   |   5 +
 .../kafka/raft/internals/BlockingMessageQueue.java |   5 +
 .../org/apache/kafka/raft/internals/VoterSet.java  |   6 +-
 .../apache/kafka/raft/KafkaNetworkChannelTest.java |   1 +
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |   7 +
 .../org/apache/kafka/raft/LeaderStateTest.java | 243 -
 .../apache/kafka/raft/RaftClientTestContext.java   |  19 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |   2 +-
 .../kafka/raft/internals/KafkaRaftMetricsTest.java |   4 +-
 25 files changed, 418 insertions(+), 165 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c59cccf67c4..d7d525e4431 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4414,12 +4414,13 @@ public class KafkaAdminClient extends AdminClient {
 private QuorumInfo.ReplicaState 
translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) {
 return new QuorumInfo.ReplicaState(
 replica.replicaId(),
+replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID 
: replica.replicaDirectoryId(),
 replica.logEndOffset(),
 replica.lastFetchTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()),
 replica.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp()));
 }
 
-private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition) {
+private QuorumInfo createQuorumResult(final 
DescribeQuorumResponseData.PartitionData partition, 
DescribeQuorumResponseData.NodeCollection nodeCollection) {
 List voters = 
partition.currentVoters().stream()
 .map(this::translateReplicaState)
 .collect(Collectors.toList());
@@ -4428,12 +4429,21 @@ public class KafkaAdminClient extends AdminClient {
 .map(this::translateReplicaState)
 .collect(Collectors.toList());
 
+Map nodes = 
nodeCollection.stream().map(n -> {
+List endpoints = n.listeners().stream()
+.map(l -> new RaftVoterEndpoint(l.name(), l.host(), 
l.port()))
+.collect(Collectors.toList());
+
+return new QuorumInfo.Node(n.nodeId(), endpoints);
+}).collect(Collectors.toMap(QuorumInfo.Node::nodeId, 
Function.identity()));
+
 return new QuorumInfo(
 partition.leaderId(),
 partition.leaderEpoch(),
 partition.highWatermark(),
 voters,
-observers
+observers,
+nodes
 );
 }

(kafka) branch trunk updated (aecaf444756 -> 98f7da9172c)

2024-06-11 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from aecaf444756 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
 add 98f7da9172c KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws 
NPE when one member has no subscriptions (#16283)

No new revisions were added by this update.

Summary of changes:
 .../UniformHeterogeneousAssignmentBuilder.java |  8 ++--
 .../UniformHeterogeneousAssignmentBuilderTest.java | 45 ++
 2 files changed, 49 insertions(+), 4 deletions(-)



(kafka) branch 3.8 updated: KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283)

2024-06-11 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 46d7e44d1b6 KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws 
NPE when one member has no subscriptions (#16283)
46d7e44d1b6 is described below

commit 46d7e44d1b6ed7807e5ec692f397a3d4118155b5
Author: David Jacot 
AuthorDate: Tue Jun 11 20:43:56 2024 +0200

KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one 
member has no subscriptions (#16283)

Fix the following NPE:

```
java.lang.NullPointerException: Cannot invoke 
"org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()"
 because the return value of "java.util.Map.get(Object)" is null
at 
org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248)
at 
org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336)
at 
org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157)
at 
org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84)
at 
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302)
at 
org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913)
at 
org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518)
at 
org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254)
at 
org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308)
at 
org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298)
at 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769)
at 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582)
at 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96)
at 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767)
at 
org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144)
at 
org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176)
```

Reviewers: Lianet Magrans , Justine Olshan 

---
 .../UniformHeterogeneousAssignmentBuilder.java |  8 ++--
 .../UniformHeterogeneousAssignmentBuilderTest.java | 45 ++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
index 682dbbd677d..a5e63d87805 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java
@@ -119,7 +119,7 @@ public class UniformHeterogeneousAssignmentBuilder {
 this.subscribedTopicIds = new HashSet<>();
 this.membersPerTopic = new HashMap<>();
 this.targetAssignment = new HashMap<>();
-groupSpec.memberIds().forEach(memberId ->
+groupSpec.memberIds().forEach(memberId -> {
 
groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> {
 // Check if the subscribed topic exists.
 int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
@@ -130,9 +130,9 @@ public class UniformHeterogeneousAssignmentBuilder {
 }
 subscribedTopicIds.add(topicId);
 membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
-targetAssignment.put(memberId, new MemberAssignmentImpl(new 
HashMap<>()));
-})
-);
+});
+targetAssignment.put(memberId, new MemberAssignmentImpl(new 
HashMap<>()));
+});
 this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, 
subscribedTopicDescriber)

(kafka) branch trunk updated: MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)

2024-06-11 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new ac2a642ba99 MINOR: Fix flaky test 
ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs 
(#16273)
ac2a642ba99 is described below

commit ac2a642ba99cde8a398510ce5cf503462863b489
Author: Chris Egerton 
AuthorDate: Tue Jun 11 21:13:35 2024 +0200

MINOR: Fix flaky test 
ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs 
(#16273)

Reviewers: Greg Harris 
---
 .../kafka/connect/integration/ConnectWorkerIntegrationTest.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 83fce9231f7..c540016f104 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -1309,7 +1309,6 @@ public class ConnectWorkerIntegrationTest {
 // since failure to reconfigure the tasks (which may occur if the bug 
this test was written
 // to help catch resurfaces) will not cause existing tasks to fail or 
stop running
 StartAndStopLatch restarts = connectorHandle.expectedStarts(1);
-connectorHandle.expectedCommits(NUM_TASKS * 2);
 
 final String secondConnectorTopic = "connector-topic-2";
 connect.kafka().createTopic(secondConnectorTopic, 1);
@@ -1323,6 +1322,9 @@ public class ConnectWorkerIntegrationTest {
 "Connector tasks were not restarted in time",
 restarts.await(10, TimeUnit.SECONDS)
 );
+
+// Wait for at least one task to commit offsets after being restarted
+connectorHandle.expectedCommits(1);
 connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
 
 final long endOffset = connect.kafka().endOffset(new 
TopicPartition(secondConnectorTopic, 0));



(kafka) branch 3.8 updated: MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)

2024-06-11 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 5c13a6cf2f2 MINOR: Fix flaky test 
ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs 
(#16273)
5c13a6cf2f2 is described below

commit 5c13a6cf2f2bb37a6f7b80595483a56cbd85a77f
Author: Chris Egerton 
AuthorDate: Tue Jun 11 21:13:35 2024 +0200

MINOR: Fix flaky test 
ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs 
(#16273)

Reviewers: Greg Harris 
---
 .../kafka/connect/integration/ConnectWorkerIntegrationTest.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 83fce9231f7..c540016f104 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -1309,7 +1309,6 @@ public class ConnectWorkerIntegrationTest {
 // since failure to reconfigure the tasks (which may occur if the bug 
this test was written
 // to help catch resurfaces) will not cause existing tasks to fail or 
stop running
 StartAndStopLatch restarts = connectorHandle.expectedStarts(1);
-connectorHandle.expectedCommits(NUM_TASKS * 2);
 
 final String secondConnectorTopic = "connector-topic-2";
 connect.kafka().createTopic(secondConnectorTopic, 1);
@@ -1323,6 +1322,9 @@ public class ConnectWorkerIntegrationTest {
 "Connector tasks were not restarted in time",
 restarts.await(10, TimeUnit.SECONDS)
 );
+
+// Wait for at least one task to commit offsets after being restarted
+connectorHandle.expectedCommits(1);
 connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
 
 final long endOffset = connect.kafka().endOffset(new 
TopicPartition(secondConnectorTopic, 0));



(kafka) branch trunk updated (8b6013f851f -> 2fa2c72581d)

2024-06-11 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 8b6013f851f KAFKA-15045: (KIP-924 pt. 21) UUID to ProcessId migration 
(#16269)
 add 2fa2c72581d MINOR: Wait for embedded clusters to start before using 
them in Connect OffsetsApiIntegrationTest (#16286)

No new revisions were added by this update.

Summary of changes:
 .../kafka/connect/integration/OffsetsApiIntegrationTest.java | 9 +
 1 file changed, 9 insertions(+)



(kafka) branch 3.8 updated: MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286)

2024-06-11 Thread cegerton
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 520fbb4116b MINOR: Wait for embedded clusters to start before using 
them in Connect OffsetsApiIntegrationTest (#16286)
520fbb4116b is described below

commit 520fbb4116b92bbb362e2a67f0b20ffc644f2903
Author: Chris Egerton 
AuthorDate: Tue Jun 11 23:15:07 2024 +0200

MINOR: Wait for embedded clusters to start before using them in Connect 
OffsetsApiIntegrationTest (#16286)

Reviewers: Greg Harris 
---
 .../kafka/connect/integration/OffsetsApiIntegrationTest.java | 9 +
 1 file changed, 9 insertions(+)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
index dc507b68df7..2da52cf9abd 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java
@@ -143,6 +143,15 @@ public class OffsetsApiIntegrationTest {
 
 result.start();
 
+try {
+result.assertions().assertExactlyNumWorkersAreUp(
+NUM_WORKERS,
+"Workers did not complete startup in time"
+);
+} catch (InterruptedException e) {
+throw new RuntimeException("Interrupted while awaiting cluster 
startup", e);
+}
+
 return result;
 });
 }



(kafka) branch trunk updated (2fa2c72581d -> 23fe71d579f)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 2fa2c72581d MINOR: Wait for embedded clusters to start before using 
them in Connect OffsetsApiIntegrationTest (#16286)
 add 23fe71d579f KAFKA-15265: Integrate RLMQuotaManager for throttling 
copies to remote storage (#15820)

No new revisions were added by this update.

Summary of changes:
 checkstyle/import-control-core.xml |   1 +
 .../java/kafka/log/remote/RemoteLogManager.java|  29 +++
 .../kafka/log/remote/RemoteLogManagerTest.java | 210 +
 3 files changed, 240 insertions(+)



(kafka) branch 3.8 updated: KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (#15993)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 7c30eed66c4 KAFKA-16541 Fix potential leader-epoch checkpoint file 
corruption (#15993)
7c30eed66c4 is described below

commit 7c30eed66c478623da8d0ee22a7379d077ef73e7
Author: Okada Haruki 
AuthorDate: Thu Jun 6 15:10:13 2024 +0900

KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (#15993)

A patch for KAFKA-15046 got rid of fsync on 
LeaderEpochFileCache#truncateFromStart/End for performance reason, but it 
turned out this could cause corrupted leader-epoch checkpoint file on 
ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing 
dirty pages back to the device.

To address this problem, this PR makes below changes: (1) Revert 
LeaderEpochCheckpoint#write to always fsync
(2) truncateFromStart/End now call LeaderEpochCheckpoint#write 
asynchronously on scheduler thread
(3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from 
checkpoint file only when current cache is absent

Reviewers: Jun Rao 
---
 .../java/kafka/log/remote/RemoteLogManager.java|  46 --
 core/src/main/scala/kafka/log/LogLoader.scala  |   4 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala |  43 +++---
 .../server/checkpoints/OffsetCheckpointFile.scala  |   2 +-
 .../kafka/log/remote/RemoteLogManagerTest.java | 120 ---
 .../unit/kafka/cluster/PartitionLockTest.scala |   3 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |   3 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala |   3 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |   5 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  23 ++-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  16 +-
 .../unit/kafka/server/ReplicaManagerTest.scala |   5 +-
 .../InMemoryLeaderEpochCheckpointTest.scala|  58 ---
 ...ffsetCheckpointFileWithFailureHandlerTest.scala |  18 ++-
 .../server/epoch/LeaderEpochFileCacheTest.scala|  64 
 .../scala/unit/kafka/utils/SchedulerTest.scala |   3 +-
 .../apache/kafka/server/common/CheckpointFile.java |   8 +-
 .../CheckpointFileWithFailureHandler.java  |  22 ++-
 .../checkpoint/InMemoryLeaderEpochCheckpoint.java  |  63 
 .../checkpoint/LeaderEpochCheckpoint.java  |  34 -
 .../checkpoint/LeaderEpochCheckpointFile.java  |  11 +-
 .../internals/epoch/LeaderEpochFileCache.java  | 168 -
 22 files changed, 364 insertions(+), 358 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 137985d00b5..43c03190767 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.config.ServerConfigs;
 import 
org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
@@ -62,7 +63,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
-import 
org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.AbortedTxn;
 import org.apache.kafka.storage.internals.log.EpochEntry;
@@ -84,12 +85,16 @@ import org.slf4j.LoggerFactory;
 import scala.Option;
 import scala.collection.JavaConverters;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStreamWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
@@ -618,25 +623,23 @@ public class RemoteLogManager implements Closeable {
 }
 
 /**
- * Returns the leader epoch checkpoint by truncating with the given 
start[exclusive] and end[inclusive] offset
+ * Returns the leader epoch entries within the range of the given 
start[exclusive] and end

(kafka) branch 3.8 updated: KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820)

2024-06-11 Thread satishd
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 0b4fcbb16d0 KAFKA-15265: Integrate RLMQuotaManager for throttling 
copies to remote storage (#15820)
0b4fcbb16d0 is described below

commit 0b4fcbb16d0fbab67df906bdfe0bb7b880503e22
Author: Abhijeet Kumar 
AuthorDate: Wed Jun 12 06:27:02 2024 +0530

KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote 
storage (#15820)

- Added the integration of the quota manager to throttle copy requests to 
the remote storage. Reference KIP-956
- Added unit-tests for the copy throttling logic.

Reviewers: Satish Duggana , Luke Chen 
, Kamal Chandraprakash
---
 checkstyle/import-control-core.xml |   1 +
 .../java/kafka/log/remote/RemoteLogManager.java|  29 +++
 .../kafka/log/remote/RemoteLogManagerTest.java | 210 +
 3 files changed, 240 insertions(+)

diff --git a/checkstyle/import-control-core.xml 
b/checkstyle/import-control-core.xml
index ed6c53a322b..a30de55e415 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -38,6 +38,7 @@
   
   
   
+  
   
   
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 43c03190767..b920a962afc 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -97,6 +97,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.security.PrivilegedAction;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -123,6 +124,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -160,6 +163,8 @@ public class RemoteLogManager implements Closeable {
 
 private final RemoteLogMetadataManager remoteLogMetadataManager;
 
+private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true);
+private final Condition copyQuotaManagerLockCondition = 
copyQuotaManagerLock.newCondition();
 private final RLMQuotaManager rlmCopyQuotaManager;
 private final RLMQuotaManager rlmFetchQuotaManager;
 
@@ -250,6 +255,13 @@ public class RemoteLogManager implements Closeable {
 remoteStorageReaderThreadPool.removeMetrics();
 }
 
+/**
+ * Returns the timeout for the RLM Tasks to wait for the quota to be 
available
+ */
+Duration quotaTimeout() {
+return Duration.ofSeconds(1);
+}
+
 RLMQuotaManager createRLMCopyQuotaManager() {
 return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, 
QuotaType.RLMCopy$.MODULE$,
   "Tracking copy byte-rate for Remote Log Manager", time);
@@ -763,6 +775,23 @@ public class RemoteLogManager implements Closeable {
 isCancelled(), isLeader());
 return;
 }
+
+copyQuotaManagerLock.lock();
+try {
+while (rlmCopyQuotaManager.isQuotaExceeded()) {
+logger.debug("Quota exceeded for copying 
log segments, waiting for the quota to be available.");
+// If the thread gets interrupted while 
waiting, the InterruptedException is thrown
+// back to the caller. It's important to 
note that the task being executed is already
+// cancelled before the executing thread 
is interrupted. The caller is responsible
+// for handling the exception gracefully 
by checking if the task is already cancelled.
+boolean ignored = 
copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), 
TimeUnit.MILLISECONDS);
+}
+
rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes());
+// Signal waiting threads to check the quota 
again
+copyQuotaManagerLockCondition.signalAll();
+} finally {
+copyQuotaManagerLock.unlock();
+}
 copyLogSegment(log, 
candidateLogSegment.logSegment, 

(kafka) branch trunk updated (23fe71d579f -> 226ac5e8fca)

2024-06-11 Thread chia7712
This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


from 23fe71d579f KAFKA-15265: Integrate RLMQuotaManager for throttling 
copies to remote storage (#15820)
 add 226ac5e8fca KAFKA-16922 Adding unit tests for NewTopic  (#16255)

No new revisions were added by this update.

Summary of changes:
 .../apache/kafka/clients/admin/NewTopicTest.java   | 185 +
 1 file changed, 185 insertions(+)
 create mode 100644 
clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java



(kafka) branch trunk updated: KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)

2024-06-11 Thread ableegoldman
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 0782232fbeb KAFKA-15045: (KIP-924 pt. 22) Add 
RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)
0782232fbeb is described below

commit 0782232fbeb6313a316b930d12508d1d6148f3c9
Author: Antoine Pourchet 
AuthorDate: Tue Jun 11 22:31:43 2024 -0600

KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other 
minor TaskAssignmentUtils changes (#16294)

We now provide a way to more easily customize the rack aware
optimizations that we provide by way of a configuration class called
RackAwareOptimizationParams.

We also simplified the APIs for the optimizeXYZ utility functions since
they were mutating the inputs anyway.

Reviewers: Anna Sophie Blee-Goldman 
---
 .../processor/assignment/TaskAssignmentUtils.java  | 207 -
 .../assignment/assignors/StickyTaskAssignor.java   |  38 ++--
 .../assignment/TaskAssignmentUtilsTest.java|   7 +-
 3 files changed, 185 insertions(+), 67 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
index f67e54b2e1c..39a698adfa5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -55,7 +56,103 @@ public final class TaskAssignmentUtils {
 private TaskAssignmentUtils() {}
 
 /**
- * Return an {@code AssignmentError} for a task assignment created for an 
application.
+ * A simple config container for necessary parameters and optional 
overrides to apply when
+ * running the active or standby task rack-aware optimizations.
+ */
+public static class RackAwareOptimizationParams {
+private final ApplicationState applicationState;
+private final Optional trafficCostOverride;
+private final Optional nonOverlapCostOverride;
+private final Optional> tasksToOptimize;
+
+private RackAwareOptimizationParams(final ApplicationState 
applicationState,
+   final Optional 
trafficCostOverride,
+   final Optional 
nonOverlapCostOverride,
+   final Optional> 
tasksToOptimize) {
+this.applicationState = applicationState;
+this.trafficCostOverride = trafficCostOverride;
+this.nonOverlapCostOverride = nonOverlapCostOverride;
+this.tasksToOptimize = tasksToOptimize;
+}
+
+/**
+ * Return a new config object with no overrides and the 
tasksToOptimize initialized to the set of all tasks in the given 
ApplicationState
+ */
+public static RackAwareOptimizationParams of(final ApplicationState 
applicationState) {
+return new RackAwareOptimizationParams(applicationState, 
Optional.empty(), Optional.empty(), Optional.empty());
+}
+
+/**
+ * Return a new config object with the tasksToOptimize set to all 
stateful tasks in the given ApplicationState
+ */
+public RackAwareOptimizationParams forStatefulTasks() {
+final SortedSet tasks = 
applicationState.allTasks().values()
+.stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toCollection(TreeSet::new));
+return forTasks(tasks);
+}
+
+/**
+ * Return a new config object with the tasksToOptimize set to all 
stateless tasks in the given ApplicationState
+ */
+public RackAwareOptimizationParams forStatelessTasks() {
+final SortedSet tasks = 
applicationState.allTasks().values()
+.stream()
+.filter(taskInfo -> !taskInfo.isStateful())
+.map(TaskInfo::id)
+.collect(Collectors.toCollection(TreeSet::new));
+return forTasks(tasks);
+}
+
+/**
+ * Return a new config object with the provided tasksToOptimize
+ */
+public RackAwareOptimizationParams forTasks(final SortedSet 
tasksToOptimize) {
+return new RackAwareOptimizationParams(
+applicationState,
+trafficCostOverride,
+nonOverlapCostOver

(kafka) branch 3.8 updated: KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)

2024-06-11 Thread ableegoldman
This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 77a6fe9c2a5 KAFKA-15045: (KIP-924 pt. 22) Add 
RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)
77a6fe9c2a5 is described below

commit 77a6fe9c2a5d7d918536162c7db37b67c772d494
Author: Antoine Pourchet 
AuthorDate: Tue Jun 11 22:31:43 2024 -0600

KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other 
minor TaskAssignmentUtils changes (#16294)

We now provide a way to more easily customize the rack aware
optimizations that we provide by way of a configuration class called
RackAwareOptimizationParams.

We also simplified the APIs for the optimizeXYZ utility functions since
they were mutating the inputs anyway.

Reviewers: Anna Sophie Blee-Goldman 
---
 .../processor/assignment/TaskAssignmentUtils.java  | 207 -
 .../assignment/assignors/StickyTaskAssignor.java   |  38 ++--
 .../assignment/TaskAssignmentUtilsTest.java|   7 +-
 3 files changed, 185 insertions(+), 67 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
index f67e54b2e1c..39a698adfa5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -55,7 +56,103 @@ public final class TaskAssignmentUtils {
 private TaskAssignmentUtils() {}
 
 /**
- * Return an {@code AssignmentError} for a task assignment created for an 
application.
+ * A simple config container for necessary parameters and optional 
overrides to apply when
+ * running the active or standby task rack-aware optimizations.
+ */
+public static class RackAwareOptimizationParams {
+private final ApplicationState applicationState;
+private final Optional trafficCostOverride;
+private final Optional nonOverlapCostOverride;
+private final Optional> tasksToOptimize;
+
+private RackAwareOptimizationParams(final ApplicationState 
applicationState,
+   final Optional 
trafficCostOverride,
+   final Optional 
nonOverlapCostOverride,
+   final Optional> 
tasksToOptimize) {
+this.applicationState = applicationState;
+this.trafficCostOverride = trafficCostOverride;
+this.nonOverlapCostOverride = nonOverlapCostOverride;
+this.tasksToOptimize = tasksToOptimize;
+}
+
+/**
+ * Return a new config object with no overrides and the 
tasksToOptimize initialized to the set of all tasks in the given 
ApplicationState
+ */
+public static RackAwareOptimizationParams of(final ApplicationState 
applicationState) {
+return new RackAwareOptimizationParams(applicationState, 
Optional.empty(), Optional.empty(), Optional.empty());
+}
+
+/**
+ * Return a new config object with the tasksToOptimize set to all 
stateful tasks in the given ApplicationState
+ */
+public RackAwareOptimizationParams forStatefulTasks() {
+final SortedSet tasks = 
applicationState.allTasks().values()
+.stream()
+.filter(TaskInfo::isStateful)
+.map(TaskInfo::id)
+.collect(Collectors.toCollection(TreeSet::new));
+return forTasks(tasks);
+}
+
+/**
+ * Return a new config object with the tasksToOptimize set to all 
stateless tasks in the given ApplicationState
+ */
+public RackAwareOptimizationParams forStatelessTasks() {
+final SortedSet tasks = 
applicationState.allTasks().values()
+.stream()
+.filter(taskInfo -> !taskInfo.isStateful())
+.map(TaskInfo::id)
+.collect(Collectors.toCollection(TreeSet::new));
+return forTasks(tasks);
+}
+
+/**
+ * Return a new config object with the provided tasksToOptimize
+ */
+public RackAwareOptimizationParams forTasks(final SortedSet 
tasksToOptimize) {
+return new RackAwareOptimizationParams(
+applicationState,
+trafficCostOverride,
+nonOverlapCostOverride

(kafka) branch trunk updated: KAFKA-10199: Enable state updater by default (#16107)

2024-06-11 Thread cadonna
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 39ffdea6d32 KAFKA-10199: Enable state updater by default (#16107)
39ffdea6d32 is described below

commit 39ffdea6d321ef3dd5e787aef1b1102c33448c0f
Author: Bruno Cadonna 
AuthorDate: Wed Jun 12 07:51:38 2024 +0200

KAFKA-10199: Enable state updater by default (#16107)

We have already enabled the state updater by default once.
However, we ran into issues that forced us to disable it again.
We think that we fixed those issues. So we want to enable the
state updater again by default.

Reviewers: Lucas Brutschy , Matthias J. Sax 

---
 streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java   | 2 +-
 .../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index bfea3d43680..e77e4ca795d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1273,7 +1273,7 @@ public class StreamsConfig extends AbstractConfig {
 public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
 
 public static boolean getStateUpdaterEnabled(final Map 
configs) {
-return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, false);
+return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, true);
 }
 
 // Private API to enable processing threads (i.e. polling is decoupled 
from processing)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 763394611b9..457508cd20e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -468,10 +468,10 @@ public class StoreChangelogReaderTest {
 
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
 } else {
 if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
-|| !((boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED))) {
-assertEquals(Duration.ZERO, consumer.lastPollTimeout());
-} else {
+|| (boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
 
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
+} else {
+assertEquals(Duration.ZERO, consumer.lastPollTimeout());
 }
 }
 }



(kafka) branch 3.8 updated: KAFKA-10199: Enable state updater by default (#16107)

2024-06-11 Thread cadonna
This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 8de153ebd6c KAFKA-10199: Enable state updater by default (#16107)
8de153ebd6c is described below

commit 8de153ebd6c44b12940b156a0d3ba4aa2795f6f8
Author: Bruno Cadonna 
AuthorDate: Wed Jun 12 07:51:38 2024 +0200

KAFKA-10199: Enable state updater by default (#16107)

We have already enabled the state updater by default once.
However, we ran into issues that forced us to disable it again.
We think that we fixed those issues. So we want to enable the
state updater again by default.

Reviewers: Lucas Brutschy , Matthias J. Sax 

---
 streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java   | 2 +-
 .../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index ffeb4105cf1..502eab8eb87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1261,7 +1261,7 @@ public class StreamsConfig extends AbstractConfig {
 public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
 
 public static boolean getStateUpdaterEnabled(final Map 
configs) {
-return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, false);
+return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, true);
 }
 
 // Private API to enable processing threads (i.e. polling is decoupled 
from processing)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 763394611b9..457508cd20e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -468,10 +468,10 @@ public class StoreChangelogReaderTest {
 
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
 } else {
 if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
-|| !((boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED))) {
-assertEquals(Duration.ZERO, consumer.lastPollTimeout());
-} else {
+|| (boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
 
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
+} else {
+assertEquals(Duration.ZERO, consumer.lastPollTimeout());
 }
 }
 }



(kafka) branch trunk updated: KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)

2024-06-11 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
 new 638844f833b KAFKA-16770; [2/2] Coalesce records into bigger batches 
(#16215)
638844f833b is described below

commit 638844f833b165d6f9ca52c173858d26b7254fac
Author: David Jacot 
AuthorDate: Wed Jun 12 08:29:50 2024 +0200

KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)

This patch is the continuation of 
https://github.com/apache/kafka/pull/15964. It introduces the records 
coalescing to the CoordinatorRuntime. It also introduces a new configuration 
`group.coordinator.append.linger.ms` which allows administrators to choose the 
linger time or disable it with zero. The new configuration defaults to 10ms.

Reviewers: Jeff Kim , Justine Olshan 

---
 .../kafka/common/record/MemoryRecordsBuilder.java  |  12 +
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +
 .../coordinator/group/GroupCoordinatorConfig.java  |  12 +
 .../coordinator/group/GroupCoordinatorService.java |   2 +-
 .../group/runtime/CoordinatorRuntime.java  | 672 ++-
 .../group/GroupCoordinatorConfigTest.java  |   3 +
 .../group/GroupCoordinatorServiceTest.java |   1 +
 .../group/runtime/CoordinatorRuntimeTest.java  | 736 -
 .../apache/kafka/server/util/timer/TimerTask.java  |   4 +
 10 files changed, 1263 insertions(+), 182 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 70f279a6c29..b37b1f1ca68 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable 
{
 return this.writeLimit >= estimatedBytesWritten() + recordSize;
 }
 
+/**
+ * Check if we have room for a given number of bytes.
+ */
+public boolean hasRoomFor(int estimatedRecordsSize) {
+if (isFull()) return false;
+return this.writeLimit >= estimatedBytesWritten() + 
estimatedRecordsSize;
+}
+
+public int maxAllowedBytes() {
+return this.writeLimit - this.batchHeaderSizeInBytes;
+}
+
 public boolean isClosed() {
 return builtRecords != null;
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 31db58c0778..7e225440abf 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -570,6 +570,7 @@ class BrokerServer(
   val serde = new CoordinatorRecordSerde
   val groupCoordinatorConfig = new GroupCoordinatorConfig(
 config.groupCoordinatorNumThreads,
+config.groupCoordinatorAppendLingerMs,
 config.consumerGroupSessionTimeoutMs,
 config.consumerGroupHeartbeatIntervalMs,
 config.consumerGroupMaxSize,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ad8635f7ce7..db96bcb6762 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -278,6 +278,7 @@ object KafkaConfig {
   
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
 ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
   .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
+  
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
   // Internal configuration used by integration and system tests.
   
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, 
MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
 
@@ -965,6 +966,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val isNewGroupCoordinatorEnabled = 
getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
 groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
   val groupCoordinatorNumThreads = 
getInt(GroupC

(kafka) branch 3.8 updated: KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)

2024-06-11 Thread dajac
This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
 new 6016b15beab KAFKA-16770; [2/2] Coalesce records into bigger batches 
(#16215)
6016b15beab is described below

commit 6016b15beabd774de6a358f5fdb62a336b7de43e
Author: David Jacot 
AuthorDate: Wed Jun 12 08:29:50 2024 +0200

KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)

This patch is the continuation of 
https://github.com/apache/kafka/pull/15964. It introduces the records 
coalescing to the CoordinatorRuntime. It also introduces a new configuration 
`group.coordinator.append.linger.ms` which allows administrators to choose the 
linger time or disable it with zero. The new configuration defaults to 10ms.

Reviewers: Jeff Kim , Justine Olshan 

---
 .../kafka/common/record/MemoryRecordsBuilder.java  |  12 +
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +
 .../coordinator/group/GroupCoordinatorConfig.java  |  12 +
 .../coordinator/group/GroupCoordinatorService.java |   2 +-
 .../group/runtime/CoordinatorRuntime.java  | 672 ++-
 .../group/GroupCoordinatorConfigTest.java  |   3 +
 .../group/GroupCoordinatorServiceTest.java |   1 +
 .../group/runtime/CoordinatorRuntimeTest.java  | 736 -
 .../apache/kafka/server/util/timer/TimerTask.java  |   4 +
 10 files changed, 1263 insertions(+), 182 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a5985103ec0..bbcd99070cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable 
{
 return this.writeLimit >= estimatedBytesWritten() + recordSize;
 }
 
+/**
+ * Check if we have room for a given number of bytes.
+ */
+public boolean hasRoomFor(int estimatedRecordsSize) {
+if (isFull()) return false;
+return this.writeLimit >= estimatedBytesWritten() + 
estimatedRecordsSize;
+}
+
+public int maxAllowedBytes() {
+return this.writeLimit - this.batchHeaderSizeInBytes;
+}
+
 public boolean isClosed() {
 return builtRecords != null;
 }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index e143dd26668..cb4799afdfb 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -572,6 +572,7 @@ class BrokerServer(
   val serde = new CoordinatorRecordSerde
   val groupCoordinatorConfig = new GroupCoordinatorConfig(
 config.groupCoordinatorNumThreads,
+config.groupCoordinatorAppendLingerMs,
 config.consumerGroupSessionTimeoutMs,
 config.consumerGroupHeartbeatIntervalMs,
 config.consumerGroupMaxSize,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 745c8648e38..6c9ef51fb0a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -278,6 +278,7 @@ object KafkaConfig {
   
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, 
LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
 ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
   .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 
INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC)
+  
.define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, 
GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), 
MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
   // Internal configuration used by integration and system tests.
   
.defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, 
BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, 
MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC)
 
@@ -948,6 +949,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   val isNewGroupCoordinatorEnabled = 
getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) ||
 groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER)
   val groupCoordinatorNumThreads = 
getInt(GroupCoord