(kafka) 07/12: KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra (#16180)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 69158f67f842da7a166fad5a69bc09e41e9f4e25 Author: Kamal Chandraprakash AuthorDate: Wed Jun 5 00:41:30 2024 +0530 KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra (#16180) - Removed the RemoteLogSegmentLifecycleManager - Removed the TopicBasedRemoteLogMetadataManagerWrapper, RemoteLogMetadataCacheWrapper, TopicBasedRemoteLogMetadataManagerHarness and TopicBasedRemoteLogMetadataManagerWrapperWithHarness Reviewers: Kuan-Po (Cooper) Tseng , Chia-Ping Tsai --- .../storage/RemoteLogMetadataManagerTestUtils.java | 23 +- .../storage/RemoteLogSegmentLifecycleManager.java | 63 --- .../storage/RemoteLogSegmentLifecycleTest.java | 562 - .../TopicBasedRemoteLogMetadataManagerHarness.java | 90 ...icBasedRemoteLogMetadataManagerRestartTest.java | 1 - ...RemoteLogMetadataManagerWrapperWithHarness.java | 105 6 files changed, 201 insertions(+), 643 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java index fd35167d02a..3fc0432e773 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java @@ -17,17 +17,12 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.Assertions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.Set; import java.util.function.Function; import java.util.function.Supplier; @@ -37,10 +32,9 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; public class RemoteLogMetadataManagerTestUtils { -private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataManagerTestUtils.class); - private static final int METADATA_TOPIC_PARTITIONS_COUNT = 3; private static final short METADATA_TOPIC_REPLICATION_FACTOR = 2; private static final long METADATA_TOPIC_RETENTION_MS = 24 * 60 * 60 * 1000L; @@ -53,7 +47,6 @@ public class RemoteLogMetadataManagerTestUtils { private String bootstrapServers; private boolean startConsumerThread; private Map overrideRemoteLogMetadataManagerProps = Collections.emptyMap(); -private Set topicIdPartitions = Collections.emptySet(); private Supplier remotePartitionMetadataStore = RemotePartitionMetadataStore::new; private Function remoteLogMetadataTopicPartitioner = RemoteLogMetadataTopicPartitioner::new; @@ -85,11 +78,6 @@ public class RemoteLogMetadataManagerTestUtils { return this; } -public Builder topicIdPartitions(Set topicIdPartitions) { -this.topicIdPartitions = Objects.requireNonNull(topicIdPartitions); -return this; -} - public TopicBasedRemoteLogMetadataManager build() { Objects.requireNonNull(bootstrapServers); String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath(); @@ -105,19 +93,12 @@ public class RemoteLogMetadataManagerTestUtils { configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT); configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR); configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS); - -log.debug("TopicBasedRemoteLogMetadataManager configs before adding overridden properties: {}", configs); // Add override properties. configs.putAll(overrideRemoteLogMetadataManagerProps); -log.debug("TopicBasedRemoteLogMetadataManager configs after adding overridden properties: {}", configs); topicBasedRemoteLogMetadataMa
(kafka) 06/12: KAFKA-16880 Update equals and hashcode methods for two attributes (#16173)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 944f4699a7de443a72fa2618b5cbc8531f12a8c4 Author: Murali Basani AuthorDate: Tue Jun 4 08:05:00 2024 +0200 KAFKA-16880 Update equals and hashcode methods for two attributes (#16173) Reviewers: Kamal Chandraprakash , Chia-Ping Tsai --- .../log/remote/storage/RemoteLogManagerConfig.java | 5 +- .../remote/storage/RemoteLogManagerConfigTest.java | 107 +++-- 2 files changed, 102 insertions(+), 10 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index d6cf615c781..1add933d788 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -585,6 +585,8 @@ public final class RemoteLogManagerConfig { return enableRemoteStorageSystem == that.enableRemoteStorageSystem && remoteLogIndexFileCacheTotalSizeBytes == that.remoteLogIndexFileCacheTotalSizeBytes && remoteLogManagerThreadPoolSize == that.remoteLogManagerThreadPoolSize +&& remoteLogManagerCopierThreadPoolSize == that.remoteLogManagerCopierThreadPoolSize +&& remoteLogManagerExpirationThreadPoolSize == that.remoteLogManagerExpirationThreadPoolSize && remoteLogManagerTaskIntervalMs == that.remoteLogManagerTaskIntervalMs && remoteLogManagerTaskRetryBackoffMs == that.remoteLogManagerTaskRetryBackoffMs && remoteLogManagerTaskRetryBackoffMaxMs == that.remoteLogManagerTaskRetryBackoffMaxMs @@ -613,7 +615,8 @@ public final class RemoteLogManagerConfig { public int hashCode() { return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath, remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName, -remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs, +remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, +remoteLogManagerCopierThreadPoolSize, remoteLogManagerExpirationThreadPoolSize, remoteLogManagerTaskIntervalMs, remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter, remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps, remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix, remoteLogManagerCopyMaxBytesPerSecond, diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java index 4e3c2fc26cb..d04c42409b2 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfigTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.server.log.remote.storage; import org.apache.kafka.common.config.AbstractConfig; -import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -27,6 +27,8 @@ import java.util.Map; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; public class RemoteLogManagerConfigTest { @@ -43,13 +45,11 @@ public class RemoteLogManagerConfigTest { String rlmmPrefix = "__custom.rlmm."; Map rsmProps = Collections.singletonMap("rsm.prop", "val"); Map rlmmProps = Collections.singletonMap("rlmm.prop", "val"); -String remoteLogMetadataManagerClass = useDefaultRemoteLogMetadataManagerClass ? DEFAULT_REMOTE_LOG_METADATA_MANAGER_CLASS_NAME : "dummy.remote.log.metadata.class"; -RemoteLogManagerConfig expectedRemoteLogManagerConfig -= new RemoteLogManagerConfig(true, "dummy.remote.storage.class", "dummy.remote.storage.class.path", -
(kafka) 09/12: MINOR: Cleanup the storage module unit tests (#16202)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 025e791d0cdfc025fd064379f8ebb8b9fe380a26 Author: Kamal Chandraprakash AuthorDate: Thu Jun 6 21:26:08 2024 +0530 MINOR: Cleanup the storage module unit tests (#16202) - Use SystemTime instead of MockTime when time is not mocked - Use static assertions to reduce the line length - Fold the lines if it exceeds the limit - rename tp0 to tpId0 when it refers to TopicIdPartition Reviewers: Kuan-Po (Cooper) Tseng , Chia-Ping Tsai --- .../storage/RemoteLogMetadataFormatterTest.java| 18 ++--- .../storage/RemoteLogMetadataSerdeTest.java| 38 +- .../storage/RemoteLogMetadataTransformTest.java| 40 +-- ...picBasedRemoteLogMetadataManagerConfigTest.java | 53 +- ...ogMetadataManagerMultipleSubscriptionsTest.java | 41 +-- ...icBasedRemoteLogMetadataManagerRestartTest.java | 25 +++ .../TopicBasedRemoteLogMetadataManagerTest.java| 56 --- .../storage/RemoteLogMetadataManagerTest.java | 83 -- 8 files changed, 169 insertions(+), 185 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java index e3d1a2aee0c..1380a735fba 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteL import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata; -import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.junit.jupiter.api.Test; import java.io.ByteArrayOutputStream; @@ -35,6 +34,7 @@ import java.util.Map; import java.util.Optional; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState.COPY_SEGMENT_STARTED; public class RemoteLogMetadataFormatterTest { private static final Uuid TOPIC_ID = Uuid.randomUuid(); @@ -51,12 +51,12 @@ public class RemoteLogMetadataFormatterTest { RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID); Optional customMetadata = Optional.of(new CustomMetadata(new byte[10])); RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata( -remoteLogSegmentId, 0L, 100L, -1L, 1, -123L, 1024, customMetadata, -RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs); +remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, customMetadata, COPY_SEGMENT_STARTED, +segLeaderEpochs); byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata); -ConsumerRecord metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes); +ConsumerRecord metadataRecord = new ConsumerRecord<>( +"__remote_log_metadata", 0, 0, null, metadataBytes); String expected = String.format( "partition: 0, offset: 0, value: " + @@ -68,9 +68,11 @@ public class RemoteLogMetadataFormatterTest { TOPIC_ID, SEGMENT_ID); try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(baos)) { -RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = new RemoteLogMetadataSerde.RemoteLogMetadataFormatter(); -formatter.writeTo(metadataRecord, ps); -assertEquals(expected, baos.toString()); +try (RemoteLogMetadataSerde.RemoteLogMetadataFormatter formatter = + new RemoteLogMetadataSerde.RemoteLogMetadataFormatter()) { +formatter.writeTo(metadataRecord, ps); +assertEquals(expected, baos.toString()); +} } } } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java index 5b48790c7fd..b1b91dacf23 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java @@ -19,48 +19,48 @@ package
(kafka) 04/12: KAFKA-16859 Cleanup check if tiered storage is enabled (#16153)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit f9c37032ffc316de86fee3d8096a53a56451 Author: Ken Huang <100591800+m1a...@users.noreply.github.com> AuthorDate: Mon Jun 3 12:04:58 2024 +0900 KAFKA-16859 Cleanup check if tiered storage is enabled (#16153) Reviewers: Chia-Ping Tsai --- .../kafka/server/builders/KafkaApisBuilder.java| 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 2 +- .../src/main/scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/ConfigHandler.scala| 2 +- .../server/ControllerConfigurationValidator.scala | 3 ++- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +--- core/src/main/scala/kafka/server/KafkaServer.scala | 2 +- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 ++-- .../kafka/log/remote/RemoteLogManagerTest.java | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 26 +++--- .../unit/kafka/server/ReplicaManagerTest.scala | 4 ++-- 12 files changed, 27 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 6ffd741f4fc..1d422461678 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -179,7 +179,7 @@ public class KafkaApisBuilder { if (metrics == null) throw new RuntimeException("You must set metrics"); if (quotas == null) throw new RuntimeException("You must set quotas"); if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); -if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled()); +if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); return new KafkaApis(requestChannel, diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 82aa75909ab..5e8cf2dcdc6 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -185,7 +185,7 @@ public class ReplicaManagerBuilder { if (metadataCache == null) throw new RuntimeException("You must set metadataCache"); if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel"); if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager"); -if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled()); +if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); // Initialize metrics in the end just before passing it to ReplicaManager to ensure ReplicaManager closes the // metrics correctly. There might be a resource leak if it is initialized and an exception occurs between // its initialization and creation of ReplicaManager. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 8bc6db2dff6..45d6ab7908d 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1562,7 +1562,7 @@ object LogManager { keepPartitionMetadataFile: Boolean): LogManager = { val defaultProps = config.extractLogConfigMap -LogConfig.validateBrokerLogConfigValues(defaultProps, config.isRemoteLogStorageSystemEnabled) +LogConfig.validateBrokerLogConfigValues(defaultProps, config.remoteLogManagerConfig.enableRemoteStorageSystem()) val defaultLogConfig = new LogConfig(defaultProps) val cleanerConfig = LogCleaner.cleanerConfig(config) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 94bcf321420..baafc3e658f 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -186,7 +186,7 @@ class BrokerServer( kafkaScheduler.startup() /* register broker metrics */ - brokerTopicStats = new BrokerTopicStats(config.isRemoteLogStorageSystemEnabled) + brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig.enableRemoteStorageSystem()) quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-") diff --git a/core/src/ma
(kafka) 12/12: KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit d94a28b4a4f94143f51d71e03f639078a2e1 Author: Kamal Chandraprakash AuthorDate: Mon Jun 10 20:42:12 2024 +0530 KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203) Reviewers: Satish Duggana , Luke Chen --- .../scala/kafka/server/DynamicBrokerConfig.scala | 3 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 ++ .../main/scala/kafka/server/ReplicaManager.scala | 3 +- .../kafka/server/DynamicBrokerConfigTest.scala | 33 ++ .../log/remote/storage/RemoteLogManagerConfig.java | 4 --- 5 files changed, 38 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 6fa43b560dd..9b75425666f 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1202,6 +1202,7 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w object DynamicRemoteLogConfig { val ReconfigurableConfigs = Set( -RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP +RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, +RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP ) } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 06bce64f623..745c8648e38 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -1211,6 +1211,8 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logLocalRetentionMs: java.lang.Long = getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP) + def remoteFetchMaxWaitMs = getInt(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP) + validateValues() @nowarn("cat=deprecation") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 17e9a5c1b9e..6ab67a84ec4 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,10 +1479,9 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } -val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() +val remoteFetchMaxWaitMs = config.remoteFetchMaxWaitMs.toLong val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) - delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) None } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 328d6e41b57..2e5d77cdc6f 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -792,6 +792,39 @@ class DynamicBrokerConfigTest { verifyIncorrectLogLocalRetentionProps(2000L, 1000L, -1, 100) } + @Test + def testDynamicRemoteFetchMaxWaitMsConfig(): Unit = { +val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181) +val config = KafkaConfig(props) +val kafkaBroker = mock(classOf[KafkaBroker]) +when(kafkaBroker.config).thenReturn(config) +assertEquals(500, config.remoteFetchMaxWaitMs) + +val dynamicRemoteLogConfig = new DynamicRemoteLogConfig(kafkaBroker) +config.dynamicConfig.initialize(None, None) +config.dynamicConfig.addBrokerReconfigurable(dynamicRemoteLogConfig) + +val newProps = new Properties() +newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "3") +// update default config +config.dynamicConfig.validate(newProps, perBrokerConfig = false) +config.dynamicConfig.updateDefaultConfig(newProps) +assertEquals(3, config.remoteFetchMaxWaitMs) + +// update per broker config +newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, "1") +config.dynamicConfig.validate(newProps, perBrokerConfig = true) +config.dynamicConfig.updateBrokerConfig(0, newProps) +assertEquals(1, config.remoteFetchMaxWaitMs) + +// invalid values +for (maxWaitMs <- Seq(-1, 0)) { + newProps.put(RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP, maxWaitMs.toString) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConfig = true)) + assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(newProps, perBrokerConf
(kafka) 01/12: KAFKA-16866 Used the right constant in RemoteLogManagerTest#testFetchQuotaManagerConfig (#16152)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 68d92a5b43f001a06e4c5d9c6f244c6cc33595b1 Author: Kamal Chandraprakash AuthorDate: Fri May 31 22:44:31 2024 +0530 KAFKA-16866 Used the right constant in RemoteLogManagerTest#testFetchQuotaManagerConfig (#16152) Reviewers: Chia-Ping Tsai --- core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java | 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 72154c9fc22..0ba5d63a8da 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -128,6 +128,9 @@ import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemo import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX; import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX; import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC; @@ -2701,9 +2704,9 @@ public class RemoteLogManagerTest { Properties defaultProps = new Properties(); RemoteLogManagerConfig defaultRlmConfig = createRLMConfig(defaultProps); RLMQuotaManagerConfig defaultConfig = RemoteLogManager.fetchQuotaManagerConfig(defaultRlmConfig); -assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); -assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); - assertEquals(DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); +assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND, defaultConfig.quotaBytesPerSecond()); +assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM, defaultConfig.numQuotaSamples()); + assertEquals(DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_SIZE_SECONDS, defaultConfig.quotaWindowSizeSeconds()); Properties customProps = new Properties(); customProps.put(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP, 100);
(kafka) 03/12: MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest (#16171)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 2273e061389c42686b12317f625523b46fd27a2b Author: Kuan-Po (Cooper) Tseng AuthorDate: Mon Jun 3 03:59:56 2024 +0800 MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest (#16171) Reviewers: Chia-Ping Tsai --- .../storage/TopicBasedRemoteLogMetadataManagerRestartTest.java | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index 07b08145731..84b98dcb5be 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -72,8 +72,10 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { NewTopic newLeaderTopic = new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2))); // Set broker id 1 as the first entry which is taken as the leader. NewTopic newFollowerTopic = new NewTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0))); -admin.createTopics(Arrays.asList(newLeaderTopic, newFollowerTopic)); +admin.createTopics(Arrays.asList(newLeaderTopic, newFollowerTopic)).all().get(); } +clusterInstance.waitForTopic(leaderTopic, 1); +clusterInstance.waitForTopic(followerTopic, 1); final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); @@ -122,4 +124,4 @@ public class TopicBasedRemoteLogMetadataManagerRestartTest { topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))); } } -} \ No newline at end of file +}
(kafka) 02/12: KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra (#16170)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit c6f0db3c6069fd49564519b1bf26c10e3cd237e6 Author: Kuan-Po (Cooper) Tseng AuthorDate: Sun Jun 2 22:18:53 2024 +0800 KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra (#16170) Reviewers: Chia-Ping Tsai --- .../TopicBasedRemoteLogMetadataManagerHarness.java | 7 +- ...icBasedRemoteLogMetadataManagerRestartTest.java | 159 - 2 files changed, 62 insertions(+), 104 deletions(-) diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java index a063fa8820a..7af78e750a8 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java @@ -53,12 +53,6 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, RemoteLogMetadataTopicPartitioner::new, remotePartitionMetadataStoreSupplier); } -public void initializeRemoteLogMetadataManager(Set topicIdPartitions, - boolean startConsumerThread, - Function remoteLogMetadataTopicPartitioner) { -initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new); -} - public void initializeRemoteLogMetadataManager(Set topicIdPartitions, boolean startConsumerThread, Function remoteLogMetadataTopicPartitioner, @@ -70,6 +64,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa .startConsumerThread(startConsumerThread) .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner) .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier) + .overrideRemoteLogMetadataManagerProps(overrideRemoteLogMetadataManagerProps()) .build(); } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index c599259ed94..07b08145731 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.server.log.remote.metadata.storage; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -24,139 +29,97 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import scala.collection.JavaConverters; -import scala.collection.Seq; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; -@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +@ExtendWith(value = ClusterTestExtensions.class) +@Tag("integration") public class TopicBasedRemoteLogMetadataManagerRestartTest { private static final int SEG_SIZE = 1024 * 1024; private final Time time = new MockTime(1); private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath(); +private final ClusterInstance clusterInstance; -private TopicBasedRemoteLogMetadataManager
(kafka) branch 3.8 updated (facbab272fb -> d94a28b4a4f)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a change to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git from facbab272fb KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053) new 68d92a5b43f KAFKA-16866 Used the right constant in RemoteLogManagerTest#testFetchQuotaManagerConfig (#16152) new c6f0db3c606 KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra (#16170) new 2273e061389 MINOR: Fix missing wait topic finished in TopicBasedRemoteLogMetadataManagerRestartTest (#16171) new f9c37032ffc KAFKA-16859 Cleanup check if tiered storage is enabled (#16153) new e4a3da6b099 KAFKA-16852 Adding two thread pools kafka-16852 (#16154) new 944f4699a7d KAFKA-16880 Update equals and hashcode methods for two attributes (#16173) new 69158f67f84 KAFKA-16882 Migrate RemoteLogSegmentLifecycleTest to ClusterInstance infra (#16180) new b6848d699da KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (#14778) new 025e791d0cd MINOR: Cleanup the storage module unit tests (#16202) new 9460e6b266a KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199) new 781b93b00d9 KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled (#16256) new d94a28b4a4f KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203) The 12 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../kafka/clients/consumer/ConsumerConfig.java | 5 +- .../java/kafka/log/remote/RemoteLogManager.java| 2 +- .../kafka/server/builders/KafkaApisBuilder.java| 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 4 +- .../src/main/scala/kafka/server/BrokerServer.scala | 4 +- .../main/scala/kafka/server/ConfigHandler.scala| 2 +- .../server/ControllerConfigurationValidator.scala | 3 +- .../scala/kafka/server/DelayedRemoteFetch.scala| 3 +- .../scala/kafka/server/DynamicBrokerConfig.scala | 3 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 +- .../main/scala/kafka/server/ReplicaManager.scala | 4 +- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 +- .../kafka/log/remote/RemoteLogManagerTest.java | 15 +- .../kafka/server/DelayedRemoteFetchTest.scala | 21 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 26 +- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../kafka/server/DynamicBrokerConfigTest.scala | 33 ++ .../unit/kafka/server/ReplicaManagerTest.scala | 14 +- .../log/remote/storage/RemoteLogManagerConfig.java | 558 .../storage/RemoteLogMetadataFormatterTest.java| 18 +- .../storage/RemoteLogMetadataManagerTestUtils.java | 23 +- .../storage/RemoteLogMetadataSerdeTest.java| 38 +- .../storage/RemoteLogMetadataTransformTest.java| 40 +- .../storage/RemoteLogSegmentLifecycleManager.java | 63 --- .../storage/RemoteLogSegmentLifecycleTest.java | 562 - ...picBasedRemoteLogMetadataManagerConfigTest.java | 53 +- .../TopicBasedRemoteLogMetadataManagerHarness.java | 95 ...ogMetadataManagerMultipleSubscriptionsTest.java | 41 +- ...icBasedRemoteLogMetadataManagerRestartTest.java | 181 +++ .../TopicBasedRemoteLogMetadataManagerTest.java| 56 +- ...RemoteLogMetadataManagerWrapperWithHarness.java | 105 .../remote/storage/RemoteLogManagerConfigTest.java | 107 ++-- .../storage/RemoteLogMetadataManagerTest.java | 83 +-- 35 files changed, 812 insertions(+), 1374 deletions(-) delete mode 100644 storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleManager.java delete mode 100644 storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java delete mode 100644 storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
(kafka) 05/12: KAFKA-16852 Adding two thread pools kafka-16852 (#16154)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit e4a3da6b09912e9f1d1fce61e7d77978e84e3c8d Author: Murali Basani AuthorDate: Mon Jun 3 10:52:54 2024 +0200 KAFKA-16852 Adding two thread pools kafka-16852 (#16154) Reviewers: Christo Lolov , Chia-Ping Tasi --- .../log/remote/storage/RemoteLogManagerConfig.java | 38 ++ .../remote/storage/RemoteLogManagerConfigTest.java | 6 +++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java index 6ea752886a9..d6cf615c781 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java @@ -100,6 +100,16 @@ public final class RemoteLogManagerConfig { "segments, fetch remote log indexes and clean up remote log segments."; public static final int DEFAULT_REMOTE_LOG_MANAGER_THREAD_POOL_SIZE = 10; +public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP = "remote.log.manager.copier.thread.pool.size"; +public static final String REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in " + +"scheduling tasks to copy segments."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE = 10; + +public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP = "remote.log.manager.expiration.thread.pool.size"; +public static final String REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC = "Size of the thread pool used in" + +" scheduling tasks to clean up remote log segments."; +public static final int DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE = 10; + public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP = "remote.log.manager.task.interval.ms"; public static final String REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_DOC = "Interval at which remote log manager runs the scheduled tasks like copy " + "segments, and clean up remote log segments."; @@ -241,6 +251,18 @@ public final class RemoteLogManagerConfig { atLeast(1), MEDIUM, REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_DOC) + .defineInternal(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP, +INT, +DEFAULT_REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE, +atLeast(1), +MEDIUM, +REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_DOC) + .defineInternal(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP, +INT, +DEFAULT_REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE, +atLeast(1), +MEDIUM, +REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_DOC) .define(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, LONG, DEFAULT_REMOTE_LOG_MANAGER_TASK_INTERVAL_MS, @@ -333,6 +355,8 @@ public final class RemoteLogManagerConfig { private final String remoteLogMetadataManagerClassPath; private final long remoteLogIndexFileCacheTotalSizeBytes; private final int remoteLogManagerThreadPoolSize; +private final int remoteLogManagerCopierThreadPoolSize; +private final int remoteLogManagerExpirationThreadPoolSize; private final long remoteLogManagerTaskIntervalMs; private final long remoteLogManagerTaskRetryBackoffMs; private final long remoteLogManagerTaskRetryBackoffMaxMs; @@ -361,6 +385,8 @@ public final class RemoteLogManagerConfig { config.getString(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP), config.getLong(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP), config.getInt(REMOTE_LOG_MANAGER_THREAD_POOL_SIZE_PROP), + config.getInt(REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP), + config.getInt(REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP), config.getLong(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP), config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MS_PROP), config.getLong(REMOTE_LOG_MANAGER_TASK_RETRY_BACK_OFF_MAX_MS_PROP), @@ -393,6 +419,8 @@ public final class RemoteLogManagerConfig { String remoteLogMetadataManagerListenerName, long remoteLogIndexFileCacheTotalSizeBytes,
(kafka) 10/12: KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 9460e6b266a2f2fd3c4569ad3ad461877fe4f3ca Author: Murali Basani AuthorDate: Thu Jun 6 18:06:25 2024 +0200 KAFKA-16884 Refactor RemoteLogManagerConfig with AbstractConfig (#16199) Reviewers: Greg Harris , Kamal Chandraprakash , Chia-Ping Tsai --- core/src/main/scala/kafka/server/KafkaConfig.scala | 4 +- .../kafka/log/remote/RemoteLogManagerTest.java | 4 +- .../unit/kafka/server/ReplicaManagerTest.scala | 8 +- .../log/remote/storage/RemoteLogManagerConfig.java | 245 - .../remote/storage/RemoteLogManagerConfigTest.java | 190 +--- 5 files changed, 97 insertions(+), 354 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3635d5a3edd..06bce64f623 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -455,7 +455,7 @@ object KafkaConfig { } /** * Remote Log Management Configuration */ - RemoteLogManagerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key)) + RemoteLogManagerConfig.configDef().configKeys().values().forEach(key => configDef.define(key)) def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala @@ -590,7 +590,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(props) def remoteLogManagerConfig = _remoteLogManagerConfig private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = { diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java index 50b581fdf4e..9badaabf419 100644 --- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java +++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; -import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.errors.ReplicaNotAvailableException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; @@ -2742,8 +2741,7 @@ public class RemoteLogManagerTest { props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal); props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal); -AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props); -return new RemoteLogManagerConfig(config); +return new RemoteLogManagerConfig(props); } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 65cbcc7bd70..313b679c605 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -32,7 +32,7 @@ import kafka.zk.KafkaZkClient import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.{DirectoryId, IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.{AbstractConfig, TopicConfig} +import org.apache.kafka.common.config.{TopicConfig} import org.apache.kafka.common.errors.{InvalidPidMappingException, KafkaStorageException} import org.apache.kafka.common.message.LeaderAndIsrRequestData import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState @@ -4092,8 +4092,7 @@ class ReplicaManagerTest { props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName) // set log reader threads number to 2 props.put(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP, 2.toString) -val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props) -val remoteLogManagerConfig = new RemoteLogManagerConfig(config) +val remoteLogManagerConfig = new RemoteLogManagerConfig(props) val mockLog = mock(classOf[UnifiedLog]) val brokerTopicSta
(kafka) 08/12: KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (#14778)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit b6848d699da3299b55273b707124f2dbfa552743 Author: Kamal Chandraprakash AuthorDate: Wed Jun 5 12:12:23 2024 +0530 KAFKA-15776: Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout (#14778) KIP-1018, part1, Introduce remote.fetch.max.timeout.ms to configure DelayedRemoteFetch timeout Reviewers: Luke Chen --- .../kafka/clients/consumer/ConsumerConfig.java | 5 +- .../scala/kafka/server/DelayedRemoteFetch.scala| 3 +- .../main/scala/kafka/server/ReplicaManager.scala | 3 +- .../kafka/server/DelayedRemoteFetchTest.scala | 21 +- .../log/remote/storage/RemoteLogManagerConfig.java | 342 +++-- .../remote/storage/RemoteLogManagerConfigTest.java | 6 +- 6 files changed, 203 insertions(+), 177 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 7ec147e9e3c..76bfe7e91a1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -198,7 +198,10 @@ public class ConsumerConfig extends AbstractConfig { * fetch.max.wait.ms */ public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms"; -private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes."; +private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before " + +"answering the fetch request there isn't sufficient data to immediately satisfy the requirement given by " + +"fetch.min.bytes. This config is used only for local log fetch. To tune the remote fetch maximum wait " + +"time, please refer to 'remote.fetch.max.wait.ms' broker config"; public static final int DEFAULT_FETCH_MAX_WAIT_MS = 500; /** metadata.max.age.ms */ diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala index 00d6afb89ff..58a866aa4a6 100644 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala @@ -35,12 +35,13 @@ import scala.collection._ class DelayedRemoteFetch(remoteFetchTask: Future[Void], remoteFetchResult: CompletableFuture[RemoteLogReadResult], remoteFetchInfo: RemoteStorageFetchInfo, + remoteFetchMaxWaitMs: Long, fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], fetchParams: FetchParams, localReadResults: Seq[(TopicIdPartition, LogReadResult)], replicaManager: ReplicaManager, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(fetchParams.maxWaitMs) { + extends DelayedOperation(remoteFetchMaxWaitMs) { if (fetchParams.isFromFollower) { throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams") diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 6908ec096d1..17e9a5c1b9e 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1479,7 +1479,8 @@ class ReplicaManager(val config: KafkaConfig, return Some(createLogReadResult(e)) } -val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, +val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs() +val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo, remoteFetchMaxWaitMs, fetchPartitionStatus, params, logReadResults, this, responseCallback) delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key)) diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala index c35385bcbc8..ea1ffaf0b11 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala @@ -40,6 +40,7 @@ class DelayedRemoteFetchTest { private val fetchOffset = 500L private val logStartOffset = 0L private val currentLeaderEpoch = Optional.of[Integer](10) + private val
(kafka) 11/12: KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled (#16256)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 781b93b00d99a63dee0ed6650a2d398f7791cea3 Author: Chia Chuan Yu AuthorDate: Mon Jun 10 02:14:15 2024 +0800 KAFKA-16885 Renamed the enableRemoteStorageSystem to isRemoteStorageSystemEnabled (#16256) Reviewers: Kamal Chandraprakash , Chia-Ping Tsai --- .../java/kafka/log/remote/RemoteLogManager.java| 2 +- .../kafka/server/builders/KafkaApisBuilder.java| 2 +- .../server/builders/ReplicaManagerBuilder.java | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 4 ++-- .../src/main/scala/kafka/server/BrokerServer.scala | 4 ++-- .../main/scala/kafka/server/ConfigHandler.scala| 2 +- .../server/ControllerConfigurationValidator.scala | 2 +- core/src/main/scala/kafka/server/KafkaServer.scala | 4 ++-- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 ++-- .../kafka/log/remote/RemoteLogManagerTest.java | 2 +- .../test/scala/unit/kafka/log/LogConfigTest.scala | 26 +++--- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 2 +- .../unit/kafka/server/ReplicaManagerTest.scala | 6 ++--- .../log/remote/storage/RemoteLogManagerConfig.java | 2 +- 14 files changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 5b0d91ff439..de5fbd85f16 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -378,7 +378,7 @@ public class RemoteLogManager implements Closeable { Map topicIds) { LOGGER.debug("Received leadership changes for leaders: {} and followers: {}", partitionsBecomeLeader, partitionsBecomeFollower); -if (this.rlmConfig.enableRemoteStorageSystem() && !isRemoteLogManagerConfigured()) { +if (this.rlmConfig.isRemoteStorageSystemEnabled() && !isRemoteLogManagerConfigured()) { throw new KafkaException("RemoteLogManager is not configured when remote storage system is enabled"); } diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 1d422461678..56607723604 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -179,7 +179,7 @@ public class KafkaApisBuilder { if (metrics == null) throw new RuntimeException("You must set metrics"); if (quotas == null) throw new RuntimeException("You must set quotas"); if (fetchManager == null) throw new RuntimeException("You must set fetchManager"); -if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); +if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); if (apiVersionManager == null) throw new RuntimeException("You must set apiVersionManager"); return new KafkaApis(requestChannel, diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java index 5e8cf2dcdc6..7cac33200d2 100644 --- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java @@ -185,7 +185,7 @@ public class ReplicaManagerBuilder { if (metadataCache == null) throw new RuntimeException("You must set metadataCache"); if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel"); if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager"); -if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().enableRemoteStorageSystem()); +if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled()); // Initialize metrics in the end just before passing it to ReplicaManager to ensure ReplicaManager closes the // metrics correctly. There might be a resource leak if it is initialized and an exception occurs between // its initialization and creation of ReplicaManager. diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 45d6ab7908d..8908aadf459 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -1562,7 +1562,7 @@ object LogManager { keepPartitionMetadataFile: Boolean): LogManager = { val defaultProps = confi
(kafka-site) 01/01: MINOR: Add Josep Prat Key
This is an automated email from the ASF dual-hosted git repository. jlprat pushed a commit to branch MINOR-add-josep-prat-key in repository https://gitbox.apache.org/repos/asf/kafka-site.git commit 055a9b80588ce4eb5ae46b460fb687bfefde6ac4 Author: Josep Prat AuthorDate: Tue Jun 11 10:13:00 2024 +0200 MINOR: Add Josep Prat Key --- KEYS | 34 ++ 1 file changed, 34 insertions(+) diff --git a/KEYS b/KEYS index 3961ea8e..60193501 100644 --- a/KEYS +++ b/KEYS @@ -2107,3 +2107,37 @@ p/ywjgUCZjOVcQIbDAAKCRA5q1Uxp/ywjqFhAQDX/VpF9bAnu2WGoN1pMkHHlacB 6H9uaArrrUb+g2SNjQD/RxfdZMYnLtDaEJJ5mgADzRvv1Wg3nRBL+xm7mJfswgI= =qi+y -END PGP PUBLIC KEY BLOCK- +pub rsa4096 2022-10-28 [SC] + CF9500821E9557AEB04E026C05EEA67F87749E61 +uid [ultimate] Josep Prat (Apache) +sig 305EEA67F87749E61 2022-10-28 [self-signature] + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBGNbjlgBEADLrLoKbXJAe/NvQ47iHWRthis0qsz5ge3EXmheUrrIy5xkSuPM +AMa10viHMTSPFV8XZhmfIMwlI6EH5eivsTqB1lp08twtFLymVu5ImOC192FyYX5k +ZWxBE1wbFByDPLAvzx3Dp7ACWF0BBA3JlGi1rKG/QFSghhtlq+I5e7oiTUPDXucV +6tsXu+inbyu0Il/1CsZD8v0iOchQoaL4pk7oY5jeYJnHT8Soxa+TpG4rh5b9vTWO +++zB+tDcSnqzvbvw6huHeUDozyfSbbYRcYl3vgmhdCj6tko8HNyj33UHj82KeuQ7 +evKE5GC4rm4jpNosRgdhWxwHPHEupslRxj4EW/fZf72O7VLeuf3srlPSh8Q9AWez +7Eg4QVhykuM8N/FXyYS8BUaGaInaV1C8l5UEeRVUvd3X74rdnTf3qlPAvXZ8AY96 +NsyMuXs9Edpunr5k38RToi6jl7hR6RTWTUqjYf+kJAGcqP9GDbuM09WE7QzawqPh +g/gb3tohPLrC77eFQItoODpLLtQ2jfdrsg3o+KeFiWSPh+Wfn0fPWpWdq9/qJ87B +WS8f/qE36RaAJjwRKtvZBsraUaTTfObdyOK+PcCgicAIZk7IH+T72guF7rZUFvIz +HaboINtkBqt0NRCMMc1QrVOqmptvP6xhtEqLx+HP3bmCqfpXXj0QdsJjmQARAQAB +tCdKb3NlcCBQcmF0IChBcGFjaGUpIDxqbHByYXRAYXBhY2hlLm9yZz6JAkYEEwEK +ADAWIQTPlQCCHpVXrrBOAmwF7qZ/h3SeYQUCY1uOWAIbAwILCQMVCggCFgACHgUC +F4AACgkQBe6mf4d0nmFJdA//fMqUIQXvVYxMpwEoZHfKvIgy+mrS2z1fXqsD795l +E0FDOk3OOhqh3bsb8/ietM0fdk8dewk0MC9+uDVb06zFKY3ex1ZhcNybmOuF75Ey +1AErWV78dKG/f8h8EnmMeweYLel6jL1Ch32hsQTci9RcL0FSsdxWfOjFNNSRrRpU +dXUJH8GXAP0BUGfS/AkfBCpWDWXVRdNb83q+DqXZd878O9ybaqPY1LzSiKM4TfST +4RptJVVrKPcPQLz9cT8aWW7AbF3HS5I3zu1bF62+Mk+mq3zL/vEWXbXfRibeLUB+ +VZFpKRSCkKSYCkOLCGFclPMoBmD3PVf7iqIJ3gmRTktrfve/SrCJmnfi3325ZQuF +i5wMh3jRRxchXEOwDX5oXOABW3MoZfU594rvdpWQikeXMsAS9O8bZ4HJsnIOaOJ2 +1TU5M0s0uuoRU0kd5lx5SI0WeTa2SrEwy35DNhA7m1dw/nA74VhITTbHXg7xg4Iz +bivsVVjT/CZPql7I78jL9KJq80KVdhOcDNPwgoCj4gNVVFllbuJAOcMEs7HVdV3Q +6DmT9N0HDfHkWt6dhpKmog14MeM0X5ZaALM9JE9xlyt4laBeRW1YXER/yLw3Yo6h +HLtR1XUniZRa2xjP82Xl6/ZfTeeYpB1Ewn7B8xDTfTnxQusYfX3Rr07lT145l/cM +q+k= +=LiMJ +-END PGP PUBLIC KEY BLOCK-
(kafka-site) branch MINOR-add-josep-prat-key created (now 055a9b80)
This is an automated email from the ASF dual-hosted git repository. jlprat pushed a change to branch MINOR-add-josep-prat-key in repository https://gitbox.apache.org/repos/asf/kafka-site.git at 055a9b80 MINOR: Add Josep Prat Key This branch includes the following new commits: new 055a9b80 MINOR: Add Josep Prat Key The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(kafka-site) branch asf-site updated: MINOR: Add Josep Prat Key (#603)
This is an automated email from the ASF dual-hosted git repository. jlprat pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 9e243531 MINOR: Add Josep Prat Key (#603) 9e243531 is described below commit 9e243531dac508033d149e724a47afd1732629e9 Author: Josep Prat AuthorDate: Tue Jun 11 10:58:34 2024 +0200 MINOR: Add Josep Prat Key (#603) Reviewers: Igor Soarez --- KEYS | 34 ++ 1 file changed, 34 insertions(+) diff --git a/KEYS b/KEYS index 3961ea8e..60193501 100644 --- a/KEYS +++ b/KEYS @@ -2107,3 +2107,37 @@ p/ywjgUCZjOVcQIbDAAKCRA5q1Uxp/ywjqFhAQDX/VpF9bAnu2WGoN1pMkHHlacB 6H9uaArrrUb+g2SNjQD/RxfdZMYnLtDaEJJ5mgADzRvv1Wg3nRBL+xm7mJfswgI= =qi+y -END PGP PUBLIC KEY BLOCK- +pub rsa4096 2022-10-28 [SC] + CF9500821E9557AEB04E026C05EEA67F87749E61 +uid [ultimate] Josep Prat (Apache) +sig 305EEA67F87749E61 2022-10-28 [self-signature] + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBGNbjlgBEADLrLoKbXJAe/NvQ47iHWRthis0qsz5ge3EXmheUrrIy5xkSuPM +AMa10viHMTSPFV8XZhmfIMwlI6EH5eivsTqB1lp08twtFLymVu5ImOC192FyYX5k +ZWxBE1wbFByDPLAvzx3Dp7ACWF0BBA3JlGi1rKG/QFSghhtlq+I5e7oiTUPDXucV +6tsXu+inbyu0Il/1CsZD8v0iOchQoaL4pk7oY5jeYJnHT8Soxa+TpG4rh5b9vTWO +++zB+tDcSnqzvbvw6huHeUDozyfSbbYRcYl3vgmhdCj6tko8HNyj33UHj82KeuQ7 +evKE5GC4rm4jpNosRgdhWxwHPHEupslRxj4EW/fZf72O7VLeuf3srlPSh8Q9AWez +7Eg4QVhykuM8N/FXyYS8BUaGaInaV1C8l5UEeRVUvd3X74rdnTf3qlPAvXZ8AY96 +NsyMuXs9Edpunr5k38RToi6jl7hR6RTWTUqjYf+kJAGcqP9GDbuM09WE7QzawqPh +g/gb3tohPLrC77eFQItoODpLLtQ2jfdrsg3o+KeFiWSPh+Wfn0fPWpWdq9/qJ87B +WS8f/qE36RaAJjwRKtvZBsraUaTTfObdyOK+PcCgicAIZk7IH+T72guF7rZUFvIz +HaboINtkBqt0NRCMMc1QrVOqmptvP6xhtEqLx+HP3bmCqfpXXj0QdsJjmQARAQAB +tCdKb3NlcCBQcmF0IChBcGFjaGUpIDxqbHByYXRAYXBhY2hlLm9yZz6JAkYEEwEK +ADAWIQTPlQCCHpVXrrBOAmwF7qZ/h3SeYQUCY1uOWAIbAwILCQMVCggCFgACHgUC +F4AACgkQBe6mf4d0nmFJdA//fMqUIQXvVYxMpwEoZHfKvIgy+mrS2z1fXqsD795l +E0FDOk3OOhqh3bsb8/ietM0fdk8dewk0MC9+uDVb06zFKY3ex1ZhcNybmOuF75Ey +1AErWV78dKG/f8h8EnmMeweYLel6jL1Ch32hsQTci9RcL0FSsdxWfOjFNNSRrRpU +dXUJH8GXAP0BUGfS/AkfBCpWDWXVRdNb83q+DqXZd878O9ybaqPY1LzSiKM4TfST +4RptJVVrKPcPQLz9cT8aWW7AbF3HS5I3zu1bF62+Mk+mq3zL/vEWXbXfRibeLUB+ +VZFpKRSCkKSYCkOLCGFclPMoBmD3PVf7iqIJ3gmRTktrfve/SrCJmnfi3325ZQuF +i5wMh3jRRxchXEOwDX5oXOABW3MoZfU594rvdpWQikeXMsAS9O8bZ4HJsnIOaOJ2 +1TU5M0s0uuoRU0kd5lx5SI0WeTa2SrEwy35DNhA7m1dw/nA74VhITTbHXg7xg4Iz +bivsVVjT/CZPql7I78jL9KJq80KVdhOcDNPwgoCj4gNVVFllbuJAOcMEs7HVdV3Q +6DmT9N0HDfHkWt6dhpKmog14MeM0X5ZaALM9JE9xlyt4laBeRW1YXER/yLw3Yo6h +HLtR1XUniZRa2xjP82Xl6/ZfTeeYpB1Ewn7B8xDTfTnxQusYfX3Rr07lT145l/cM +q+k= +=LiMJ +-END PGP PUBLIC KEY BLOCK-
(kafka-site) branch MINOR-add-josep-prat-key deleted (was 055a9b80)
This is an automated email from the ASF dual-hosted git repository. jlprat pushed a change to branch MINOR-add-josep-prat-key in repository https://gitbox.apache.org/repos/asf/kafka-site.git was 055a9b80 MINOR: Add Josep Prat Key The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
svn commit: r69659 - /release/kafka/KEYS
Author: mimaison Date: Tue Jun 11 09:10:53 2024 New Revision: 69659 Log: Add Josep Prat's key Modified: release/kafka/KEYS Modified: release/kafka/KEYS == --- release/kafka/KEYS (original) +++ release/kafka/KEYS Tue Jun 11 09:10:53 2024 @@ -2107,3 +2107,37 @@ p/ywjgUCZjOVcQIbDAAKCRA5q1Uxp/ywjqFhAQDX 6H9uaArrrUb+g2SNjQD/RxfdZMYnLtDaEJJ5mgADzRvv1Wg3nRBL+xm7mJfswgI= =qi+y -END PGP PUBLIC KEY BLOCK- +pub rsa4096 2022-10-28 [SC] + CF9500821E9557AEB04E026C05EEA67F87749E61 +uid [ultimate] Josep Prat (Apache) +sig 305EEA67F87749E61 2022-10-28 [self-signature] + +-BEGIN PGP PUBLIC KEY BLOCK- + +mQINBGNbjlgBEADLrLoKbXJAe/NvQ47iHWRthis0qsz5ge3EXmheUrrIy5xkSuPM +AMa10viHMTSPFV8XZhmfIMwlI6EH5eivsTqB1lp08twtFLymVu5ImOC192FyYX5k +ZWxBE1wbFByDPLAvzx3Dp7ACWF0BBA3JlGi1rKG/QFSghhtlq+I5e7oiTUPDXucV +6tsXu+inbyu0Il/1CsZD8v0iOchQoaL4pk7oY5jeYJnHT8Soxa+TpG4rh5b9vTWO +++zB+tDcSnqzvbvw6huHeUDozyfSbbYRcYl3vgmhdCj6tko8HNyj33UHj82KeuQ7 +evKE5GC4rm4jpNosRgdhWxwHPHEupslRxj4EW/fZf72O7VLeuf3srlPSh8Q9AWez +7Eg4QVhykuM8N/FXyYS8BUaGaInaV1C8l5UEeRVUvd3X74rdnTf3qlPAvXZ8AY96 +NsyMuXs9Edpunr5k38RToi6jl7hR6RTWTUqjYf+kJAGcqP9GDbuM09WE7QzawqPh +g/gb3tohPLrC77eFQItoODpLLtQ2jfdrsg3o+KeFiWSPh+Wfn0fPWpWdq9/qJ87B +WS8f/qE36RaAJjwRKtvZBsraUaTTfObdyOK+PcCgicAIZk7IH+T72guF7rZUFvIz +HaboINtkBqt0NRCMMc1QrVOqmptvP6xhtEqLx+HP3bmCqfpXXj0QdsJjmQARAQAB +tCdKb3NlcCBQcmF0IChBcGFjaGUpIDxqbHByYXRAYXBhY2hlLm9yZz6JAkYEEwEK +ADAWIQTPlQCCHpVXrrBOAmwF7qZ/h3SeYQUCY1uOWAIbAwILCQMVCggCFgACHgUC +F4AACgkQBe6mf4d0nmFJdA//fMqUIQXvVYxMpwEoZHfKvIgy+mrS2z1fXqsD795l +E0FDOk3OOhqh3bsb8/ietM0fdk8dewk0MC9+uDVb06zFKY3ex1ZhcNybmOuF75Ey +1AErWV78dKG/f8h8EnmMeweYLel6jL1Ch32hsQTci9RcL0FSsdxWfOjFNNSRrRpU +dXUJH8GXAP0BUGfS/AkfBCpWDWXVRdNb83q+DqXZd878O9ybaqPY1LzSiKM4TfST +4RptJVVrKPcPQLz9cT8aWW7AbF3HS5I3zu1bF62+Mk+mq3zL/vEWXbXfRibeLUB+ +VZFpKRSCkKSYCkOLCGFclPMoBmD3PVf7iqIJ3gmRTktrfve/SrCJmnfi3325ZQuF +i5wMh3jRRxchXEOwDX5oXOABW3MoZfU594rvdpWQikeXMsAS9O8bZ4HJsnIOaOJ2 +1TU5M0s0uuoRU0kd5lx5SI0WeTa2SrEwy35DNhA7m1dw/nA74VhITTbHXg7xg4Iz +bivsVVjT/CZPql7I78jL9KJq80KVdhOcDNPwgoCj4gNVVFllbuJAOcMEs7HVdV3Q +6DmT9N0HDfHkWt6dhpKmog14MeM0X5ZaALM9JE9xlyt4laBeRW1YXER/yLw3Yo6h +HLtR1XUniZRa2xjP82Xl6/ZfTeeYpB1Ewn7B8xDTfTnxQusYfX3Rr07lT145l/cM +q+k= +=LiMJ +-END PGP PUBLIC KEY BLOCK-
(kafka) branch trunk updated (af86e56fcdf -> 99eacf1b61a)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from af86e56fcdf KAFKA-10787 Apply spotless to tools and tools-api module (#16262) add 99eacf1b61a KAFKA-16914: Added share group dynamic and broker configs (#16268) No new revisions were added by this update. Summary of changes: core/src/main/scala/kafka/server/KafkaConfig.scala | 65 +- .../scala/unit/kafka/server/KafkaConfigTest.scala | 75 - .../kafka/server/config/ShareGroupConfigs.java | 78 ++ 3 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/apache/kafka/server/config/ShareGroupConfigs.java
(kafka) branch trunk updated: KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f746d67c3bf KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267) f746d67c3bf is described below commit f746d67c3bf62e7d4f5f4e652ea36b4d5a0e01a6 Author: KrishVora01 <156789009+krishvor...@users.noreply.github.com> AuthorDate: Tue Jun 11 16:20:47 2024 +0530 KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267) This PR aims to add the static Dockerfile and scripts for AK 3.7.0 version. As mentioned in KIP-1028 this PR aims to start the release of the kafka:3.7.0 Docker Official image. This will also help us validate the process and allow us to address any changes suggested by Dockerhub before the 3.8.0 release. The static Dockerfile and scripts have been generated via the github actions workflows and scripts added as part of https://github.com/apache/kafka/pull/16027. The reports of build and testing the 3.7.0 Docker official image are below. Reviewers: Manikumar Reddy , Vedarth Sharma --- docker/docker_official_images/3.7.0/jvm/Dockerfile | 95 docker/docker_official_images/3.7.0/jvm/jsa_launch | 49 + docker/docker_official_images/3.7.0/jvm/launch | 68 .../3.7.0/jvm/resources/common-scripts/bash-config | 23 .../3.7.0/jvm/resources/common-scripts/configure | 121 + .../jvm/resources/common-scripts/configureDefaults | 28 + .../3.7.0/jvm/resources/common-scripts/run | 38 +++ 7 files changed, 422 insertions(+) diff --git a/docker/docker_official_images/3.7.0/jvm/Dockerfile b/docker/docker_official_images/3.7.0/jvm/Dockerfile new file mode 100755 index 000..7aa054a4f7b --- /dev/null +++ b/docker/docker_official_images/3.7.0/jvm/Dockerfile @@ -0,0 +1,95 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +FROM eclipse-temurin:21-jre-alpine AS build-jsa + +USER root + +# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version 3.7.0 +ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz + +COPY jsa_launch /etc/kafka/docker/jsa_launch + +RUN set -eux ; \ +apk update ; \ +apk upgrade ; \ +apk add --no-cache wget gcompat gpg gpg-agent procps bash; \ +mkdir opt/kafka; \ +wget -nv -O kafka.tgz "$kafka_url"; \ +wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \ +tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \ +wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \ +gpg --import KEYS; \ +gpg --batch --verify kafka.tgz.asc kafka.tgz + +# Generate jsa files using dynamic CDS for kafka server start command and kafka storage format command +RUN /etc/kafka/docker/jsa_launch + + +FROM eclipse-temurin:21-jre-alpine + +# exposed ports +EXPOSE 9092 + +USER root + +# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version 3.7.0 +ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz +ENV build_date 2024-06-10 + + +LABEL org.label-schema.name="kafka" \ + org.label-schema.description="Apache Kafka" \ + org.label-schema.build-date="${build_date}" \ + org.label-schema.vcs-url="https://github.com/apache/kafka"; \ + maintainer="Apache Kafka" + +RUN set -eux ; \ +apk update ; \ +apk upgrade ; \ +apk add --no-cache wget gcompat gpg gpg-agent procps bash; \ +mkdir opt/kafka; \ +wget -nv -O kafka.tgz "$kafka_url"; \ +wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \ +tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \ +wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \ +gpg --import KEYS; \ +gpg --batch --verify kafka.tgz.asc kafka.tgz; \ +mkdir -p /var/lib/kafka/data /etc/kafka/secrets; \ +mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \ +addus
(kafka) branch trunk updated (f746d67c3bf -> b7dcae44ffb)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from f746d67c3bf KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267) add b7dcae44ffb KAFKA-16373: KIP-1028: Modfiying download url for kafka dockerfile (#16281) No new revisions were added by this update. Summary of changes: docker/docker_official_image_build_test.py | 2 +- docker/docker_official_images/3.7.0/jvm/Dockerfile | 10 +- docker/prepare_docker_official_image_source.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-)
(kafka) branch 3.8 updated (d94a28b4a4f -> 15db8233179)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a change to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git from d94a28b4a4f KAFKA-15776: Support added to update remote.fetch.max.wait.ms dynamically (#16203) new e75cc45bdf0 KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267) new 15db8233179 KAFKA-16373: KIP-1028: Modfiying download url for kafka dockerfile (#16281) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: docker/docker_official_image_build_test.py | 2 +- docker/{ => docker_official_images/3.7.0}/jvm/Dockerfile | 10 +- docker/{ => docker_official_images/3.7.0}/jvm/jsa_launch | 0 docker/{ => docker_official_images/3.7.0}/jvm/launch | 0 .../3.7.0/jvm}/resources/common-scripts/bash-config| 0 .../3.7.0/jvm}/resources/common-scripts/configure | 0 .../3.7.0/jvm}/resources/common-scripts/configureDefaults | 0 .../3.7.0/jvm}/resources/common-scripts/run| 0 docker/prepare_docker_official_image_source.py | 4 ++-- 9 files changed, 8 insertions(+), 8 deletions(-) copy docker/{ => docker_official_images/3.7.0}/jvm/Dockerfile (90%) mode change 100644 => 100755 copy docker/{ => docker_official_images/3.7.0}/jvm/jsa_launch (100%) copy docker/{ => docker_official_images/3.7.0}/jvm/launch (100%) copy docker/{ => docker_official_images/3.7.0/jvm}/resources/common-scripts/bash-config (100%) mode change 100644 => 100755 copy docker/{ => docker_official_images/3.7.0/jvm}/resources/common-scripts/configure (100%) copy docker/{ => docker_official_images/3.7.0/jvm}/resources/common-scripts/configureDefaults (100%) copy docker/{ => docker_official_images/3.7.0/jvm}/resources/common-scripts/run (100%)
(kafka) 02/02: KAFKA-16373: KIP-1028: Modfiying download url for kafka dockerfile (#16281)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 15db8233179f0889def4d4f60df199fbde4dd88f Author: KrishVora01 <156789009+krishvor...@users.noreply.github.com> AuthorDate: Tue Jun 11 21:02:33 2024 +0530 KAFKA-16373: KIP-1028: Modfiying download url for kafka dockerfile (#16281) This PR modifies the download url from https://downloads.apache.org/kafka/ to https://archive.apache.org/dist/kafka/ as the former is not permanent. Reviewers: Manikumar Reddy , Vedarth Sharma --- docker/docker_official_image_build_test.py | 2 +- docker/docker_official_images/3.7.0/jvm/Dockerfile | 10 +- docker/prepare_docker_official_image_source.py | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docker/docker_official_image_build_test.py b/docker/docker_official_image_build_test.py index 6ffe25ee0b8..3da68854c23 100644 --- a/docker/docker_official_image_build_test.py +++ b/docker/docker_official_image_build_test.py @@ -76,7 +76,7 @@ if __name__ == '__main__': parser.add_argument("--test", "-t", action="store_true", dest="test_only", default=False, help="Only run the tests, don't build the image") args = parser.parse_args() -kafka_url = f"https://downloads.apache.org/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz"; +kafka_url = f"https://archive.apache.org/dist/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz"; if args.build_only or not (args.build_only or args.test_only): if args.kafka_version: build_docker_official_image(args.image, args.tag, args.kafka_version, args.image_type) diff --git a/docker/docker_official_images/3.7.0/jvm/Dockerfile b/docker/docker_official_images/3.7.0/jvm/Dockerfile index 7aa054a4f7b..905e2f2149b 100755 --- a/docker/docker_official_images/3.7.0/jvm/Dockerfile +++ b/docker/docker_official_images/3.7.0/jvm/Dockerfile @@ -20,8 +20,8 @@ FROM eclipse-temurin:21-jre-alpine AS build-jsa USER root -# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version 3.7.0 -ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz +# Get Kafka from https://archive.apache.org/dist/kafka, url passed as env var, for version 3.7.0 +ENV kafka_url https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz COPY jsa_launch /etc/kafka/docker/jsa_launch @@ -48,9 +48,9 @@ EXPOSE 9092 USER root -# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version 3.7.0 -ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz -ENV build_date 2024-06-10 +# Get Kafka from https://archive.apache.org/dist/kafka, url passed as env var, for version 3.7.0 +ENV kafka_url https://archive.apache.org/dist/kafka/3.7.0/kafka_2.13-3.7.0.tgz +ENV build_date 2024-06-11 LABEL org.label-schema.name="kafka" \ diff --git a/docker/prepare_docker_official_image_source.py b/docker/prepare_docker_official_image_source.py index 10b4f04d78f..25d57c53e0f 100644 --- a/docker/prepare_docker_official_image_source.py +++ b/docker/prepare_docker_official_image_source.py @@ -46,7 +46,7 @@ def remove_args_and_hardcode_values(file_path, kafka_version, kafka_url): filedata = filedata.replace( "ARG build_date", f"ENV build_date {str(date.today())}") original_comment = re.compile(r"# Get kafka from https://archive.apache.org/dist/kafka and pass the url through build arguments") -updated_comment = f"# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version {kafka_version}" +updated_comment = f"# Get Kafka from https://archive.apache.org/dist/kafka, url passed as env var, for version {kafka_version}" filedata = original_comment.sub(updated_comment, filedata) with open(file_path, 'w') as file: file.write(filedata) @@ -59,7 +59,7 @@ if __name__ == '__main__': parser.add_argument("--kafka-version", "-v", dest="kafka_version", help="Kafka version for which the source for docker official image is to be built") args = parser.parse_args() -kafka_url = f"https://downloads.apache.org/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz"; +kafka_url = f"https://archive.apache.org/dist/kafka/{args.kafka_version}/kafka_2.13-{args.kafka_version}.tgz"; current_dir = os.path.dirname(os.path.realpath(__file__)) new_dir = os.path.join( current_dir, f'docker_official_images', args.kafka_version)
(kafka) 01/02: KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267)
This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git commit e75cc45bdf0dcd385e011e4ff7c5e8ce62fb3abf Author: KrishVora01 <156789009+krishvor...@users.noreply.github.com> AuthorDate: Tue Jun 11 16:20:47 2024 +0530 KAFKA-16373: KIP-1028: Adding 3.7.0 docker official images static assets (#16267) This PR aims to add the static Dockerfile and scripts for AK 3.7.0 version. As mentioned in KIP-1028 this PR aims to start the release of the kafka:3.7.0 Docker Official image. This will also help us validate the process and allow us to address any changes suggested by Dockerhub before the 3.8.0 release. The static Dockerfile and scripts have been generated via the github actions workflows and scripts added as part of https://github.com/apache/kafka/pull/16027. The reports of build and testing the 3.7.0 Docker official image are below. Reviewers: Manikumar Reddy , Vedarth Sharma --- docker/docker_official_images/3.7.0/jvm/Dockerfile | 95 docker/docker_official_images/3.7.0/jvm/jsa_launch | 49 + docker/docker_official_images/3.7.0/jvm/launch | 68 .../3.7.0/jvm/resources/common-scripts/bash-config | 23 .../3.7.0/jvm/resources/common-scripts/configure | 121 + .../jvm/resources/common-scripts/configureDefaults | 28 + .../3.7.0/jvm/resources/common-scripts/run | 38 +++ 7 files changed, 422 insertions(+) diff --git a/docker/docker_official_images/3.7.0/jvm/Dockerfile b/docker/docker_official_images/3.7.0/jvm/Dockerfile new file mode 100755 index 000..7aa054a4f7b --- /dev/null +++ b/docker/docker_official_images/3.7.0/jvm/Dockerfile @@ -0,0 +1,95 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### + +FROM eclipse-temurin:21-jre-alpine AS build-jsa + +USER root + +# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version 3.7.0 +ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz + +COPY jsa_launch /etc/kafka/docker/jsa_launch + +RUN set -eux ; \ +apk update ; \ +apk upgrade ; \ +apk add --no-cache wget gcompat gpg gpg-agent procps bash; \ +mkdir opt/kafka; \ +wget -nv -O kafka.tgz "$kafka_url"; \ +wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \ +tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \ +wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \ +gpg --import KEYS; \ +gpg --batch --verify kafka.tgz.asc kafka.tgz + +# Generate jsa files using dynamic CDS for kafka server start command and kafka storage format command +RUN /etc/kafka/docker/jsa_launch + + +FROM eclipse-temurin:21-jre-alpine + +# exposed ports +EXPOSE 9092 + +USER root + +# Get Kafka from https://downloads.apache.org/kafka, url passed as env var, for version 3.7.0 +ENV kafka_url https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz +ENV build_date 2024-06-10 + + +LABEL org.label-schema.name="kafka" \ + org.label-schema.description="Apache Kafka" \ + org.label-schema.build-date="${build_date}" \ + org.label-schema.vcs-url="https://github.com/apache/kafka"; \ + maintainer="Apache Kafka" + +RUN set -eux ; \ +apk update ; \ +apk upgrade ; \ +apk add --no-cache wget gcompat gpg gpg-agent procps bash; \ +mkdir opt/kafka; \ +wget -nv -O kafka.tgz "$kafka_url"; \ +wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \ +tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \ +wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \ +gpg --import KEYS; \ +gpg --batch --verify kafka.tgz.asc kafka.tgz; \ +mkdir -p /var/lib/kafka/data /etc/kafka/secrets; \ +mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \ +adduser -h /home/appuser -D --shell /bin/bash appuser; \ +chown appuser:appuser -R /usr/logs /opt/kafka /mnt/shared/config; \ +chown appuser:root -R /var/lib/kafka /etc/kafka/secrets /etc/kafka; \ +chm
(kafka) branch trunk updated (b7dcae44ffb -> f3dbd7ed08a)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from b7dcae44ffb KAFKA-16373: KIP-1028: Modfiying download url for kafka dockerfile (#16281) add f3dbd7ed08a KAFKA-16904: Metric to measure the latency of remote read requests (#16209) No new revisions were added by this update. Summary of changes: core/src/main/java/kafka/log/remote/RemoteLogManager.java | 13 ++--- core/src/main/java/kafka/log/remote/RemoteLogReader.java| 11 ++- .../test/java/kafka/log/remote/RemoteLogManagerTest.java| 9 +++-- .../src/test/java/kafka/log/remote/RemoteLogReaderTest.java | 12 +--- .../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 10 +- .../org/apache/kafka/server/metrics/KafkaMetricsGroup.java | 12 .../server/log/remote/storage/RemoteStorageMetrics.java | 3 +++ 7 files changed, 56 insertions(+), 14 deletions(-)
(kafka) branch 3.8 updated: KAFKA-16904: Metric to measure the latency of remote read requests (#16209)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new bcd95f6485c KAFKA-16904: Metric to measure the latency of remote read requests (#16209) bcd95f6485c is described below commit bcd95f6485cd94fc489b0c1eba976ecda517d086 Author: Kamal Chandraprakash AuthorDate: Tue Jun 11 21:07:12 2024 +0530 KAFKA-16904: Metric to measure the latency of remote read requests (#16209) Reviewers: Satish Duggana , Christo Lolov , Luke Chen --- core/src/main/java/kafka/log/remote/RemoteLogManager.java | 13 ++--- core/src/main/java/kafka/log/remote/RemoteLogReader.java| 11 ++- .../test/java/kafka/log/remote/RemoteLogManagerTest.java| 9 +++-- .../src/test/java/kafka/log/remote/RemoteLogReaderTest.java | 12 +--- .../test/scala/unit/kafka/server/ReplicaManagerTest.scala | 10 +- .../org/apache/kafka/server/metrics/KafkaMetricsGroup.java | 12 .../server/log/remote/storage/RemoteStorageMetrics.java | 3 +++ 7 files changed, 56 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index de5fbd85f16..137985d00b5 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -17,6 +17,7 @@ package kafka.log.remote; import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.Timer; import kafka.cluster.EndPoint; import kafka.cluster.Partition; import kafka.log.UnifiedLog; @@ -126,6 +127,7 @@ import java.util.stream.Stream; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX; import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC; +import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC; import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG; /** @@ -174,6 +176,7 @@ public class RemoteLogManager implements Closeable { private boolean closed = false; private volatile boolean remoteLogManagerConfigured = false; +private final Timer remoteReadTimer; /** * Creates RemoteLogManager instance with the given arguments. @@ -216,12 +219,14 @@ public class RemoteLogManager implements Closeable { delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs(); rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize()); - metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName(), new Gauge() { + metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC, new Gauge() { @Override public Double value() { return rlmScheduledThreadPool.getIdlePercent(); } }); +remoteReadTimer = metricsGroup.newTimer(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC, +TimeUnit.MILLISECONDS, TimeUnit.SECONDS); remoteStorageReaderThreadPool = new RemoteStorageThreadPool( REMOTE_LOG_READER_THREAD_NAME_PREFIX, @@ -235,7 +240,8 @@ public class RemoteLogManager implements Closeable { } private void removeMetrics() { - metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC.getName()); + metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC); + metricsGroup.removeMetric(REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC); remoteStorageReaderThreadPool.removeMetrics(); } @@ -1664,7 +1670,8 @@ public class RemoteLogManager implements Closeable { * @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full) */ public Future asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer callback) { -return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager)); +return remoteStorageReaderThreadPool.submit( +new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats, rlmFetchQuotaManager, remoteReadTimer)); } void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition, diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java index 9395cbd60ed..c28677459ef 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java @@ -16,6 +16,7 @@ */ package kafka.log.remote; +import com.y
(kafka) branch trunk updated: KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106)
This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new aecaf444756 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106) aecaf444756 is described below commit aecaf4447561edd8da9f06e3abdf46f382dc9d89 Author: Nikolay AuthorDate: Tue Jun 11 20:01:35 2024 +0300 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106) Add support for KIP-953 KRaft Quorum reconfiguration in the DescribeQuorum request and response. Also add support to AdminClient.describeQuorum, so that users will be able to find the current set of quorum nodes, as well as their directories, via these RPCs. Reviewers: Luke Chen , Colin P. McCabe , Andrew Schofield --- .../kafka/clients/admin/KafkaAdminClient.java | 20 +- .../org/apache/kafka/clients/admin/QuorumInfo.java | 76 ++- .../common/requests/DescribeQuorumRequest.java | 4 +- .../common/requests/DescribeQuorumResponse.java| 13 +- .../common/message/DescribeQuorumRequest.json | 4 +- .../common/message/DescribeQuorumResponse.json | 24 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 23 +- .../unit/kafka/server/ApiVersionsRequestTest.scala | 4 +- .../kafka/server/DescribeQuorumRequestTest.scala | 2 + .../scala/unit/kafka/server/KafkaApisTest.scala| 3 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 10 +- .../apache/kafka/raft/KafkaRaftClientDriver.java | 1 + .../java/org/apache/kafka/raft/LeaderState.java| 84 +-- .../java/org/apache/kafka/raft/QuorumState.java| 3 +- .../java/org/apache/kafka/raft/RaftMessage.java| 3 +- .../java/org/apache/kafka/raft/RaftRequest.java| 17 +- .../java/org/apache/kafka/raft/RaftResponse.java | 5 + .../kafka/raft/internals/BlockingMessageQueue.java | 5 + .../org/apache/kafka/raft/internals/VoterSet.java | 6 +- .../apache/kafka/raft/KafkaNetworkChannelTest.java | 1 + .../org/apache/kafka/raft/KafkaRaftClientTest.java | 7 + .../org/apache/kafka/raft/LeaderStateTest.java | 243 - .../apache/kafka/raft/RaftClientTestContext.java | 19 +- .../apache/kafka/raft/RaftEventSimulationTest.java | 2 +- .../kafka/raft/internals/KafkaRaftMetricsTest.java | 4 +- 25 files changed, 418 insertions(+), 165 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index c59cccf67c4..d7d525e4431 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4414,12 +4414,13 @@ public class KafkaAdminClient extends AdminClient { private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData.ReplicaState replica) { return new QuorumInfo.ReplicaState( replica.replicaId(), +replica.replicaDirectoryId() == null ? Uuid.ZERO_UUID : replica.replicaDirectoryId(), replica.logEndOffset(), replica.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastFetchTimestamp()), replica.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(replica.lastCaughtUpTimestamp())); } -private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { +private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition, DescribeQuorumResponseData.NodeCollection nodeCollection) { List voters = partition.currentVoters().stream() .map(this::translateReplicaState) .collect(Collectors.toList()); @@ -4428,12 +4429,21 @@ public class KafkaAdminClient extends AdminClient { .map(this::translateReplicaState) .collect(Collectors.toList()); +Map nodes = nodeCollection.stream().map(n -> { +List endpoints = n.listeners().stream() +.map(l -> new RaftVoterEndpoint(l.name(), l.host(), l.port())) +.collect(Collectors.toList()); + +return new QuorumInfo.Node(n.nodeId(), endpoints); +}).collect(Collectors.toMap(QuorumInfo.Node::nodeId, Function.identity())); + return new QuorumInfo( partition.leaderId(), partition.leaderEpoch(), partition.highWatermark(), voters, -observers +observers, +nodes ); }
(kafka) branch trunk updated (aecaf444756 -> 98f7da9172c)
This is an automated email from the ASF dual-hosted git repository. dajac pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from aecaf444756 KAFKA-16520: Support KIP-853 in DescribeQuorum (#16106) add 98f7da9172c KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283) No new revisions were added by this update. Summary of changes: .../UniformHeterogeneousAssignmentBuilder.java | 8 ++-- .../UniformHeterogeneousAssignmentBuilderTest.java | 45 ++ 2 files changed, 49 insertions(+), 4 deletions(-)
(kafka) branch 3.8 updated: KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283)
This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 46d7e44d1b6 KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283) 46d7e44d1b6 is described below commit 46d7e44d1b6ed7807e5ec692f397a3d4118155b5 Author: David Jacot AuthorDate: Tue Jun 11 20:43:56 2024 +0200 KAFKA-16930; UniformHeterogeneousAssignmentBuilder throws NPE when one member has no subscriptions (#16283) Fix the following NPE: ``` java.lang.NullPointerException: Cannot invoke "org.apache.kafka.coordinator.group.assignor.MemberAssignment.targetPartitions()" because the return value of "java.util.Map.get(Object)" is null at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.canMemberParticipateInReassignment(GeneralUniformAssignmentBuilder.java:248) at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.balance(GeneralUniformAssignmentBuilder.java:336) at org.apache.kafka.coordinator.group.assignor.GeneralUniformAssignmentBuilder.buildAssignment(GeneralUniformAssignmentBuilder.java:157) at org.apache.kafka.coordinator.group.assignor.UniformAssignor.assign(UniformAssignor.java:84) at org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:302) at org.apache.kafka.coordinator.group.GroupMetadataManager.updateTargetAssignment(GroupMetadataManager.java:1913) at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:1518) at org.apache.kafka.coordinator.group.GroupMetadataManager.consumerGroupHeartbeat(GroupMetadataManager.java:2254) at org.apache.kafka.coordinator.group.GroupCoordinatorShard.consumerGroupHeartbeat(GroupCoordinatorShard.java:308) at org.apache.kafka.coordinator.group.GroupCoordinatorService.lambda$consumerGroupHeartbeat$0(GroupCoordinatorService.java:298) at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.lambda$run$0(CoordinatorRuntime.java:769) at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.withActiveContextOrThrow(CoordinatorRuntime.java:1582) at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.access$1400(CoordinatorRuntime.java:96) at org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime$CoordinatorWriteEvent.run(CoordinatorRuntime.java:767) at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.handleEvents(MultiThreadedEventProcessor.java:144) at org.apache.kafka.coordinator.group.runtime.MultiThreadedEventProcessor$EventProcessorThread.run(MultiThreadedEventProcessor.java:176) ``` Reviewers: Lianet Magrans , Justine Olshan --- .../UniformHeterogeneousAssignmentBuilder.java | 8 ++-- .../UniformHeterogeneousAssignmentBuilderTest.java | 45 ++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java index 682dbbd677d..a5e63d87805 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java @@ -119,7 +119,7 @@ public class UniformHeterogeneousAssignmentBuilder { this.subscribedTopicIds = new HashSet<>(); this.membersPerTopic = new HashMap<>(); this.targetAssignment = new HashMap<>(); -groupSpec.memberIds().forEach(memberId -> +groupSpec.memberIds().forEach(memberId -> { groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> { // Check if the subscribed topic exists. int partitionCount = subscribedTopicDescriber.numPartitions(topicId); @@ -130,9 +130,9 @@ public class UniformHeterogeneousAssignmentBuilder { } subscribedTopicIds.add(topicId); membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId); -targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>())); -}) -); +}); +targetAssignment.put(memberId, new MemberAssignmentImpl(new HashMap<>())); +}); this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, subscribedTopicDescriber)
(kafka) branch trunk updated: MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new ac2a642ba99 MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273) ac2a642ba99 is described below commit ac2a642ba99cde8a398510ce5cf503462863b489 Author: Chris Egerton AuthorDate: Tue Jun 11 21:13:35 2024 +0200 MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273) Reviewers: Greg Harris --- .../kafka/connect/integration/ConnectWorkerIntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 83fce9231f7..c540016f104 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -1309,7 +1309,6 @@ public class ConnectWorkerIntegrationTest { // since failure to reconfigure the tasks (which may occur if the bug this test was written // to help catch resurfaces) will not cause existing tasks to fail or stop running StartAndStopLatch restarts = connectorHandle.expectedStarts(1); -connectorHandle.expectedCommits(NUM_TASKS * 2); final String secondConnectorTopic = "connector-topic-2"; connect.kafka().createTopic(secondConnectorTopic, 1); @@ -1323,6 +1322,9 @@ public class ConnectWorkerIntegrationTest { "Connector tasks were not restarted in time", restarts.await(10, TimeUnit.SECONDS) ); + +// Wait for at least one task to commit offsets after being restarted +connectorHandle.expectedCommits(1); connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
(kafka) branch 3.8 updated: MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 5c13a6cf2f2 MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273) 5c13a6cf2f2 is described below commit 5c13a6cf2f2bb37a6f7b80595483a56cbd85a77f Author: Chris Egerton AuthorDate: Tue Jun 11 21:13:35 2024 +0200 MINOR: Fix flaky test ConnectWorkerIntegrationTest::testReconfigureConnectorWithFailingTaskConfigs (#16273) Reviewers: Greg Harris --- .../kafka/connect/integration/ConnectWorkerIntegrationTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 83fce9231f7..c540016f104 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -1309,7 +1309,6 @@ public class ConnectWorkerIntegrationTest { // since failure to reconfigure the tasks (which may occur if the bug this test was written // to help catch resurfaces) will not cause existing tasks to fail or stop running StartAndStopLatch restarts = connectorHandle.expectedStarts(1); -connectorHandle.expectedCommits(NUM_TASKS * 2); final String secondConnectorTopic = "connector-topic-2"; connect.kafka().createTopic(secondConnectorTopic, 1); @@ -1323,6 +1322,9 @@ public class ConnectWorkerIntegrationTest { "Connector tasks were not restarted in time", restarts.await(10, TimeUnit.SECONDS) ); + +// Wait for at least one task to commit offsets after being restarted +connectorHandle.expectedCommits(1); connectorHandle.awaitCommits(offsetCommitIntervalMs * 3); final long endOffset = connect.kafka().endOffset(new TopicPartition(secondConnectorTopic, 0));
(kafka) branch trunk updated (8b6013f851f -> 2fa2c72581d)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 8b6013f851f KAFKA-15045: (KIP-924 pt. 21) UUID to ProcessId migration (#16269) add 2fa2c72581d MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286) No new revisions were added by this update. Summary of changes: .../kafka/connect/integration/OffsetsApiIntegrationTest.java | 9 + 1 file changed, 9 insertions(+)
(kafka) branch 3.8 updated: MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286)
This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 520fbb4116b MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286) 520fbb4116b is described below commit 520fbb4116b92bbb362e2a67f0b20ffc644f2903 Author: Chris Egerton AuthorDate: Tue Jun 11 23:15:07 2024 +0200 MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286) Reviewers: Greg Harris --- .../kafka/connect/integration/OffsetsApiIntegrationTest.java | 9 + 1 file changed, 9 insertions(+) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index dc507b68df7..2da52cf9abd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -143,6 +143,15 @@ public class OffsetsApiIntegrationTest { result.start(); +try { +result.assertions().assertExactlyNumWorkersAreUp( +NUM_WORKERS, +"Workers did not complete startup in time" +); +} catch (InterruptedException e) { +throw new RuntimeException("Interrupted while awaiting cluster startup", e); +} + return result; }); }
(kafka) branch trunk updated (2fa2c72581d -> 23fe71d579f)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 2fa2c72581d MINOR: Wait for embedded clusters to start before using them in Connect OffsetsApiIntegrationTest (#16286) add 23fe71d579f KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820) No new revisions were added by this update. Summary of changes: checkstyle/import-control-core.xml | 1 + .../java/kafka/log/remote/RemoteLogManager.java| 29 +++ .../kafka/log/remote/RemoteLogManagerTest.java | 210 + 3 files changed, 240 insertions(+)
(kafka) branch 3.8 updated: KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (#15993)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 7c30eed66c4 KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (#15993) 7c30eed66c4 is described below commit 7c30eed66c478623da8d0ee22a7379d077ef73e7 Author: Okada Haruki AuthorDate: Thu Jun 6 15:10:13 2024 +0900 KAFKA-16541 Fix potential leader-epoch checkpoint file corruption (#15993) A patch for KAFKA-15046 got rid of fsync on LeaderEpochFileCache#truncateFromStart/End for performance reason, but it turned out this could cause corrupted leader-epoch checkpoint file on ungraceful OS shutdown, i.e. OS shuts down in the middle when kernel is writing dirty pages back to the device. To address this problem, this PR makes below changes: (1) Revert LeaderEpochCheckpoint#write to always fsync (2) truncateFromStart/End now call LeaderEpochCheckpoint#write asynchronously on scheduler thread (3) UnifiedLog#maybeCreateLeaderEpochCache now loads epoch entries from checkpoint file only when current cache is absent Reviewers: Jun Rao --- .../java/kafka/log/remote/RemoteLogManager.java| 46 -- core/src/main/scala/kafka/log/LogLoader.scala | 4 +- core/src/main/scala/kafka/log/UnifiedLog.scala | 43 +++--- .../server/checkpoints/OffsetCheckpointFile.scala | 2 +- .../kafka/log/remote/RemoteLogManagerTest.java | 120 --- .../unit/kafka/cluster/PartitionLockTest.scala | 3 +- .../scala/unit/kafka/cluster/PartitionTest.scala | 3 +- .../unit/kafka/log/LogCleanerManagerTest.scala | 3 +- .../test/scala/unit/kafka/log/LogCleanerTest.scala | 5 +- .../test/scala/unit/kafka/log/LogLoaderTest.scala | 23 ++- .../test/scala/unit/kafka/log/LogSegmentTest.scala | 16 +- .../unit/kafka/server/ReplicaManagerTest.scala | 5 +- .../InMemoryLeaderEpochCheckpointTest.scala| 58 --- ...ffsetCheckpointFileWithFailureHandlerTest.scala | 18 ++- .../server/epoch/LeaderEpochFileCacheTest.scala| 64 .../scala/unit/kafka/utils/SchedulerTest.scala | 3 +- .../apache/kafka/server/common/CheckpointFile.java | 8 +- .../CheckpointFileWithFailureHandler.java | 22 ++- .../checkpoint/InMemoryLeaderEpochCheckpoint.java | 63 .../checkpoint/LeaderEpochCheckpoint.java | 34 - .../checkpoint/LeaderEpochCheckpointFile.java | 11 +- .../internals/epoch/LeaderEpochFileCache.java | 168 - 22 files changed, 364 insertions(+), 358 deletions(-) diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 137985d00b5..43c03190767 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -47,6 +47,7 @@ import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; import org.apache.kafka.server.common.OffsetAndEpoch; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager; @@ -62,7 +63,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState; import org.apache.kafka.server.log.remote.storage.RemoteStorageException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; import org.apache.kafka.server.metrics.KafkaMetricsGroup; -import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint; +import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile; import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache; import org.apache.kafka.storage.internals.log.AbortedTxn; import org.apache.kafka.storage.internals.log.EpochEntry; @@ -84,12 +85,16 @@ import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.JavaConverters; +import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStreamWriter; import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -618,25 +623,23 @@ public class RemoteLogManager implements Closeable { } /** - * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset + * Returns the leader epoch entries within the range of the given start[exclusive] and end
(kafka) branch 3.8 updated: KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820)
This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 0b4fcbb16d0 KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820) 0b4fcbb16d0 is described below commit 0b4fcbb16d0fbab67df906bdfe0bb7b880503e22 Author: Abhijeet Kumar AuthorDate: Wed Jun 12 06:27:02 2024 +0530 KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820) - Added the integration of the quota manager to throttle copy requests to the remote storage. Reference KIP-956 - Added unit-tests for the copy throttling logic. Reviewers: Satish Duggana , Luke Chen , Kamal Chandraprakash --- checkstyle/import-control-core.xml | 1 + .../java/kafka/log/remote/RemoteLogManager.java| 29 +++ .../kafka/log/remote/RemoteLogManagerTest.java | 210 + 3 files changed, 240 insertions(+) diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index ed6c53a322b..a30de55e415 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -38,6 +38,7 @@ + diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java index 43c03190767..b920a962afc 100644 --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java @@ -97,6 +97,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.security.PrivilegedAction; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -123,6 +124,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -160,6 +163,8 @@ public class RemoteLogManager implements Closeable { private final RemoteLogMetadataManager remoteLogMetadataManager; +private final ReentrantLock copyQuotaManagerLock = new ReentrantLock(true); +private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition(); private final RLMQuotaManager rlmCopyQuotaManager; private final RLMQuotaManager rlmFetchQuotaManager; @@ -250,6 +255,13 @@ public class RemoteLogManager implements Closeable { remoteStorageReaderThreadPool.removeMetrics(); } +/** + * Returns the timeout for the RLM Tasks to wait for the quota to be available + */ +Duration quotaTimeout() { +return Duration.ofSeconds(1); +} + RLMQuotaManager createRLMCopyQuotaManager() { return new RLMQuotaManager(copyQuotaManagerConfig(rlmConfig), metrics, QuotaType.RLMCopy$.MODULE$, "Tracking copy byte-rate for Remote Log Manager", time); @@ -763,6 +775,23 @@ public class RemoteLogManager implements Closeable { isCancelled(), isLeader()); return; } + +copyQuotaManagerLock.lock(); +try { +while (rlmCopyQuotaManager.isQuotaExceeded()) { +logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available."); +// If the thread gets interrupted while waiting, the InterruptedException is thrown +// back to the caller. It's important to note that the task being executed is already +// cancelled before the executing thread is interrupted. The caller is responsible +// for handling the exception gracefully by checking if the task is already cancelled. +boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS); +} + rlmCopyQuotaManager.record(candidateLogSegment.logSegment.log().sizeInBytes()); +// Signal waiting threads to check the quota again +copyQuotaManagerLockCondition.signalAll(); +} finally { +copyQuotaManagerLock.unlock(); +} copyLogSegment(log, candidateLogSegment.logSegment,
(kafka) branch trunk updated (23fe71d579f -> 226ac5e8fca)
This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 23fe71d579f KAFKA-15265: Integrate RLMQuotaManager for throttling copies to remote storage (#15820) add 226ac5e8fca KAFKA-16922 Adding unit tests for NewTopic (#16255) No new revisions were added by this update. Summary of changes: .../apache/kafka/clients/admin/NewTopicTest.java | 185 + 1 file changed, 185 insertions(+) create mode 100644 clients/src/test/java/org/apache/kafka/clients/admin/NewTopicTest.java
(kafka) branch trunk updated: KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)
This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0782232fbeb KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294) 0782232fbeb is described below commit 0782232fbeb6313a316b930d12508d1d6148f3c9 Author: Antoine Pourchet AuthorDate: Tue Jun 11 22:31:43 2024 -0600 KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294) We now provide a way to more easily customize the rack aware optimizations that we provide by way of a configuration class called RackAwareOptimizationParams. We also simplified the APIs for the optimizeXYZ utility functions since they were mutating the inputs anyway. Reviewers: Anna Sophie Blee-Goldman --- .../processor/assignment/TaskAssignmentUtils.java | 207 - .../assignment/assignors/StickyTaskAssignor.java | 38 ++-- .../assignment/TaskAssignmentUtilsTest.java| 7 +- 3 files changed, 185 insertions(+), 67 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java index f67e54b2e1c..39a698adfa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -55,7 +56,103 @@ public final class TaskAssignmentUtils { private TaskAssignmentUtils() {} /** - * Return an {@code AssignmentError} for a task assignment created for an application. + * A simple config container for necessary parameters and optional overrides to apply when + * running the active or standby task rack-aware optimizations. + */ +public static class RackAwareOptimizationParams { +private final ApplicationState applicationState; +private final Optional trafficCostOverride; +private final Optional nonOverlapCostOverride; +private final Optional> tasksToOptimize; + +private RackAwareOptimizationParams(final ApplicationState applicationState, + final Optional trafficCostOverride, + final Optional nonOverlapCostOverride, + final Optional> tasksToOptimize) { +this.applicationState = applicationState; +this.trafficCostOverride = trafficCostOverride; +this.nonOverlapCostOverride = nonOverlapCostOverride; +this.tasksToOptimize = tasksToOptimize; +} + +/** + * Return a new config object with no overrides and the tasksToOptimize initialized to the set of all tasks in the given ApplicationState + */ +public static RackAwareOptimizationParams of(final ApplicationState applicationState) { +return new RackAwareOptimizationParams(applicationState, Optional.empty(), Optional.empty(), Optional.empty()); +} + +/** + * Return a new config object with the tasksToOptimize set to all stateful tasks in the given ApplicationState + */ +public RackAwareOptimizationParams forStatefulTasks() { +final SortedSet tasks = applicationState.allTasks().values() +.stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toCollection(TreeSet::new)); +return forTasks(tasks); +} + +/** + * Return a new config object with the tasksToOptimize set to all stateless tasks in the given ApplicationState + */ +public RackAwareOptimizationParams forStatelessTasks() { +final SortedSet tasks = applicationState.allTasks().values() +.stream() +.filter(taskInfo -> !taskInfo.isStateful()) +.map(TaskInfo::id) +.collect(Collectors.toCollection(TreeSet::new)); +return forTasks(tasks); +} + +/** + * Return a new config object with the provided tasksToOptimize + */ +public RackAwareOptimizationParams forTasks(final SortedSet tasksToOptimize) { +return new RackAwareOptimizationParams( +applicationState, +trafficCostOverride, +nonOverlapCostOver
(kafka) branch 3.8 updated: KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294)
This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 77a6fe9c2a5 KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294) 77a6fe9c2a5 is described below commit 77a6fe9c2a5d7d918536162c7db37b67c772d494 Author: Antoine Pourchet AuthorDate: Tue Jun 11 22:31:43 2024 -0600 KAFKA-15045: (KIP-924 pt. 22) Add RackAwareOptimizationParams and other minor TaskAssignmentUtils changes (#16294) We now provide a way to more easily customize the rack aware optimizations that we provide by way of a configuration class called RackAwareOptimizationParams. We also simplified the APIs for the optimizeXYZ utility functions since they were mutating the inputs anyway. Reviewers: Anna Sophie Blee-Goldman --- .../processor/assignment/TaskAssignmentUtils.java | 207 - .../assignment/assignors/StickyTaskAssignor.java | 38 ++-- .../assignment/TaskAssignmentUtilsTest.java| 7 +- 3 files changed, 185 insertions(+), 67 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java index f67e54b2e1c..39a698adfa5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -55,7 +56,103 @@ public final class TaskAssignmentUtils { private TaskAssignmentUtils() {} /** - * Return an {@code AssignmentError} for a task assignment created for an application. + * A simple config container for necessary parameters and optional overrides to apply when + * running the active or standby task rack-aware optimizations. + */ +public static class RackAwareOptimizationParams { +private final ApplicationState applicationState; +private final Optional trafficCostOverride; +private final Optional nonOverlapCostOverride; +private final Optional> tasksToOptimize; + +private RackAwareOptimizationParams(final ApplicationState applicationState, + final Optional trafficCostOverride, + final Optional nonOverlapCostOverride, + final Optional> tasksToOptimize) { +this.applicationState = applicationState; +this.trafficCostOverride = trafficCostOverride; +this.nonOverlapCostOverride = nonOverlapCostOverride; +this.tasksToOptimize = tasksToOptimize; +} + +/** + * Return a new config object with no overrides and the tasksToOptimize initialized to the set of all tasks in the given ApplicationState + */ +public static RackAwareOptimizationParams of(final ApplicationState applicationState) { +return new RackAwareOptimizationParams(applicationState, Optional.empty(), Optional.empty(), Optional.empty()); +} + +/** + * Return a new config object with the tasksToOptimize set to all stateful tasks in the given ApplicationState + */ +public RackAwareOptimizationParams forStatefulTasks() { +final SortedSet tasks = applicationState.allTasks().values() +.stream() +.filter(TaskInfo::isStateful) +.map(TaskInfo::id) +.collect(Collectors.toCollection(TreeSet::new)); +return forTasks(tasks); +} + +/** + * Return a new config object with the tasksToOptimize set to all stateless tasks in the given ApplicationState + */ +public RackAwareOptimizationParams forStatelessTasks() { +final SortedSet tasks = applicationState.allTasks().values() +.stream() +.filter(taskInfo -> !taskInfo.isStateful()) +.map(TaskInfo::id) +.collect(Collectors.toCollection(TreeSet::new)); +return forTasks(tasks); +} + +/** + * Return a new config object with the provided tasksToOptimize + */ +public RackAwareOptimizationParams forTasks(final SortedSet tasksToOptimize) { +return new RackAwareOptimizationParams( +applicationState, +trafficCostOverride, +nonOverlapCostOverride
(kafka) branch trunk updated: KAFKA-10199: Enable state updater by default (#16107)
This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 39ffdea6d32 KAFKA-10199: Enable state updater by default (#16107) 39ffdea6d32 is described below commit 39ffdea6d321ef3dd5e787aef1b1102c33448c0f Author: Bruno Cadonna AuthorDate: Wed Jun 12 07:51:38 2024 +0200 KAFKA-10199: Enable state updater by default (#16107) We have already enabled the state updater by default once. However, we ran into issues that forced us to disable it again. We think that we fixed those issues. So we want to enable the state updater again by default. Reviewers: Lucas Brutschy , Matthias J. Sax --- streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +- .../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index bfea3d43680..e77e4ca795d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1273,7 +1273,7 @@ public class StreamsConfig extends AbstractConfig { public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static boolean getStateUpdaterEnabled(final Map configs) { -return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false); +return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } // Private API to enable processing threads (i.e. polling is decoupled from processing) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 763394611b9..457508cd20e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -468,10 +468,10 @@ public class StoreChangelogReaderTest { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); } else { if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) -|| !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) { -assertEquals(Duration.ZERO, consumer.lastPollTimeout()); -} else { +|| (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); +} else { +assertEquals(Duration.ZERO, consumer.lastPollTimeout()); } } }
(kafka) branch 3.8 updated: KAFKA-10199: Enable state updater by default (#16107)
This is an automated email from the ASF dual-hosted git repository. cadonna pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 8de153ebd6c KAFKA-10199: Enable state updater by default (#16107) 8de153ebd6c is described below commit 8de153ebd6c44b12940b156a0d3ba4aa2795f6f8 Author: Bruno Cadonna AuthorDate: Wed Jun 12 07:51:38 2024 +0200 KAFKA-10199: Enable state updater by default (#16107) We have already enabled the state updater by default once. However, we ran into issues that forced us to disable it again. We think that we fixed those issues. So we want to enable the state updater again by default. Reviewers: Lucas Brutschy , Matthias J. Sax --- streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | 2 +- .../kafka/streams/processor/internals/StoreChangelogReaderTest.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index ffeb4105cf1..502eab8eb87 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1261,7 +1261,7 @@ public class StreamsConfig extends AbstractConfig { public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__"; public static boolean getStateUpdaterEnabled(final Map configs) { -return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, false); +return InternalConfig.getBoolean(configs, InternalConfig.STATE_UPDATER_ENABLED, true); } // Private API to enable processing threads (i.e. polling is decoupled from processing) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index 763394611b9..457508cd20e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -468,10 +468,10 @@ public class StoreChangelogReaderTest { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); } else { if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED) -|| !((boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED))) { -assertEquals(Duration.ZERO, consumer.lastPollTimeout()); -} else { +|| (boolean) properties.get(InternalConfig.STATE_UPDATER_ENABLED)) { assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), consumer.lastPollTimeout()); +} else { +assertEquals(Duration.ZERO, consumer.lastPollTimeout()); } } }
(kafka) branch trunk updated: KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 638844f833b KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215) 638844f833b is described below commit 638844f833b165d6f9ca52c173858d26b7254fac Author: David Jacot AuthorDate: Wed Jun 12 08:29:50 2024 +0200 KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215) This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces the records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to chose the linger time or disable it with zero. The new configuration defaults to 10ms. Reviewers: Jeff Kim , Justine Olshan --- .../kafka/common/record/MemoryRecordsBuilder.java | 12 + .../src/main/scala/kafka/server/BrokerServer.scala | 1 + core/src/main/scala/kafka/server/KafkaConfig.scala | 2 + .../coordinator/group/GroupCoordinatorConfig.java | 12 + .../coordinator/group/GroupCoordinatorService.java | 2 +- .../group/runtime/CoordinatorRuntime.java | 672 ++- .../group/GroupCoordinatorConfigTest.java | 3 + .../group/GroupCoordinatorServiceTest.java | 1 + .../group/runtime/CoordinatorRuntimeTest.java | 736 - .../apache/kafka/server/util/timer/TimerTask.java | 4 + 10 files changed, 1263 insertions(+), 182 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 70f279a6c29..b37b1f1ca68 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable { return this.writeLimit >= estimatedBytesWritten() + recordSize; } +/** + * Check if we have room for a given number of bytes. + */ +public boolean hasRoomFor(int estimatedRecordsSize) { +if (isFull()) return false; +return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize; +} + +public int maxAllowedBytes() { +return this.writeLimit - this.batchHeaderSizeInBytes; +} + public boolean isClosed() { return builtRecords != null; } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 31db58c0778..7e225440abf 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -570,6 +570,7 @@ class BrokerServer( val serde = new CoordinatorRecordSerde val groupCoordinatorConfig = new GroupCoordinatorConfig( config.groupCoordinatorNumThreads, +config.groupCoordinatorAppendLingerMs, config.consumerGroupSessionTimeoutMs, config.consumerGroupHeartbeatIntervalMs, config.consumerGroupMaxSize, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ad8635f7ce7..db96bcb6762 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -278,6 +278,7 @@ object KafkaConfig { .define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC) + .define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) // Internal configuration used by integration and system tests. .defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC) @@ -965,6 +966,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) || groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER) val groupCoordinatorNumThreads = getInt(GroupC
(kafka) branch 3.8 updated: KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215)
This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 6016b15beab KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215) 6016b15beab is described below commit 6016b15beabd774de6a358f5fdb62a336b7de43e Author: David Jacot AuthorDate: Wed Jun 12 08:29:50 2024 +0200 KAFKA-16770; [2/2] Coalesce records into bigger batches (#16215) This patch is the continuation of https://github.com/apache/kafka/pull/15964. It introduces the records coalescing to the CoordinatorRuntime. It also introduces a new configuration `group.coordinator.append.linger.ms` which allows administrators to chose the linger time or disable it with zero. The new configuration defaults to 10ms. Reviewers: Jeff Kim , Justine Olshan --- .../kafka/common/record/MemoryRecordsBuilder.java | 12 + .../src/main/scala/kafka/server/BrokerServer.scala | 1 + core/src/main/scala/kafka/server/KafkaConfig.scala | 2 + .../coordinator/group/GroupCoordinatorConfig.java | 12 + .../coordinator/group/GroupCoordinatorService.java | 2 +- .../group/runtime/CoordinatorRuntime.java | 672 ++- .../group/GroupCoordinatorConfigTest.java | 3 + .../group/GroupCoordinatorServiceTest.java | 1 + .../group/runtime/CoordinatorRuntimeTest.java | 736 - .../apache/kafka/server/util/timer/TimerTask.java | 4 + 10 files changed, 1263 insertions(+), 182 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index a5985103ec0..bbcd99070cb 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -870,6 +870,18 @@ public class MemoryRecordsBuilder implements AutoCloseable { return this.writeLimit >= estimatedBytesWritten() + recordSize; } +/** + * Check if we have room for a given number of bytes. + */ +public boolean hasRoomFor(int estimatedRecordsSize) { +if (isFull()) return false; +return this.writeLimit >= estimatedBytesWritten() + estimatedRecordsSize; +} + +public int maxAllowedBytes() { +return this.writeLimit - this.batchHeaderSizeInBytes; +} + public boolean isClosed() { return builtRecords != null; } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index e143dd26668..cb4799afdfb 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -572,6 +572,7 @@ class BrokerServer( val serde = new CoordinatorRecordSerde val groupCoordinatorConfig = new GroupCoordinatorConfig( config.groupCoordinatorNumThreads, +config.groupCoordinatorAppendLingerMs, config.consumerGroupSessionTimeoutMs, config.consumerGroupHeartbeatIntervalMs, config.consumerGroupMaxSize, diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 745c8648e38..6c9ef51fb0a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -278,6 +278,7 @@ object KafkaConfig { .define(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, ValidList.in(Utils.enumOptions(classOf[GroupType]):_*), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) .define(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_DOC) + .define(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) // Internal configuration used by integration and system tests. .defineInternal(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG, BOOLEAN, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DEFAULT, null, MEDIUM, GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_DOC) @@ -948,6 +949,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val isNewGroupCoordinatorEnabled = getBoolean(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG) || groupCoordinatorRebalanceProtocols.contains(GroupType.CONSUMER) val groupCoordinatorNumThreads = getInt(GroupCoord