This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 21645ebf0bf KAFKA-18705: Move ConfigRepository to metadata module (#18784)
21645ebf0bf is described below
commit 21645ebf0bf7ea5902fb3ceb7737d688847ed318
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Feb 5 18:13:36 2025 +0800
KAFKA-18705: Move ConfigRepository to metadata module (#18784)
Reviewers: TengYao Chi <[email protected]>, Christo Lolov <[email protected]>
---
build.gradle | 1 +
.../kafka/server/builders/KafkaApisBuilder.java | 2 +-
.../kafka/server/builders/LogManagerBuilder.java | 2 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
.../scala/kafka/server/ConfigAdminManager.scala | 2 +-
.../src/main/scala/kafka/server/ConfigHelper.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../main/scala/kafka/server/MetadataCache.scala | 4 +-
.../kafka/server/metadata/ConfigRepository.scala | 62 ---------------------
.../server/metadata/MockConfigRepository.scala | 62 ---------------------
.../unit/kafka/cluster/AbstractPartitionTest.scala | 2 +-
.../unit/kafka/cluster/PartitionLockTest.scala | 3 +-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 3 +-
.../unit/kafka/server/ConfigAdminManagerTest.scala | 2 +-
.../server/HighwatermarkPersistenceTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../server/ReplicaManagerConcurrencyTest.scala | 3 +-
.../server/metadata/MockConfigRepositoryTest.scala | 55 -------------------
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
.../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 2 +-
.../metadata/KRaftMetadataRequestBenchmark.java | 2 +-
.../partition/PartitionMakeFollowerBenchmark.java | 2 +-
.../UpdateFollowerFetchStateBenchmark.java | 2 +-
.../apache/kafka/jmh/server/CheckpointBench.java | 2 +-
.../kafka/jmh/server/PartitionCreationBench.java | 4 +-
.../apache/kafka/metadata/ConfigRepository.java | 63 ++++++++++++++++++++++
.../kafka/metadata/MockConfigRepository.java | 63 ++++++++++++++++++++++
.../kafka/metadata/MockConfigRepositoryTest.java | 56 +++++++++++++++++++
29 files changed, 208 insertions(+), 207 deletions(-)
diff --git a/build.gradle b/build.gradle
index 9a6c4f2714d..ad68058c183 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3297,6 +3297,7 @@ project(':jmh-benchmarks') {
implementation project(':clients').sourceSets.test.output
implementation project(':core').sourceSets.test.output
implementation project(':server-common').sourceSets.test.output
+ implementation project(':metadata').sourceSets.test.output
implementation libs.jmhCore
annotationProcessor libs.jmhGeneratorAnnProcess
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index b4764f8d284..a3dc9856f7d 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -29,13 +29,13 @@ import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.QuotaFactory.QuotaManagers;
import kafka.server.ReplicaManager;
-import kafka.server.metadata.ConfigRepository;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator;
+import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.server.ClientMetricsManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 53f6c0dd305..23b91f1d4a4 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -18,9 +18,9 @@
package kafka.server.builders;
import kafka.log.LogManager;
-import kafka.server.metadata.ConfigRepository;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.Scheduler;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 572cd5d7b8b..8bf6fa0292f 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,7 +22,6 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
-import kafka.server.metadata.ConfigRepository
import kafka.server.{KafkaConfig, KafkaRaftServer}
import kafka.server.metadata.BrokerMetadataPublisher.info
import kafka.utils.threadsafe
@@ -36,6 +35,7 @@ import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import org.apache.kafka.image.TopicsImage
+import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, PropertiesUtils}
import java.util.{Collections, OptionalLong, Properties}
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index b0b84fc8c60..45a68d2d036 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -20,7 +20,6 @@ import kafka.server.logger.RuntimeLoggerManager
import java.util
import java.util.Properties
-import kafka.server.metadata.ConfigRepository
import kafka.utils._
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -36,6 +35,7 @@ import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST,
UNKNOWN_SERVER_ERROR}
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.apache.kafka.metadata.ConfigRepository
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.{Map, Seq}
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 095a474441a..d44a92dc237 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -20,7 +20,6 @@ package kafka.server
import kafka.network.RequestChannel
import java.util.{Collections, Properties}
-import kafka.server.metadata.ConfigRepository
import kafka.utils.{Log4jController, Logging}
import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigResource}
@@ -34,6 +33,7 @@ import
org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
import org.apache.kafka.coordinator.group.GroupConfig
+import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.storage.internals.log.LogConfig
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18795a7e0f3..d6be221b77d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,6 @@ import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinat
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.metadata.ConfigRepository
import kafka.server.share.SharePartitionManager
import kafka.utils.Logging
import org.apache.kafka.admin.AdminUtils
@@ -59,6 +58,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch,
Time}
import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
TransactionVersion}
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 703f2d8e320..45b8ba1e383 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,12 +17,12 @@
package kafka.server
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.admin.BrokerMetadata
import org.apache.kafka.common.message.{DescribeClientQuotasRequestData,
DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData,
DescribeUserScramCredentialsRequestData,
DescribeUserScramCredentialsResponseData, MetadataResponseData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion,
MetadataVersion}
import java.util
diff --git a/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala
b/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala
deleted file mode 100644
index 9f59a07ff57..00000000000
--- a/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util.Properties
-
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.config.ConfigResource.Type
-
-trait ConfigRepository {
- /**
- * Return a copy of the topic configuration for the given topic. Future changes will not be reflected.
- *
- * @param topicName the name of the topic for which the configuration will be returned
- * @return a copy of the topic configuration for the given topic
- */
- def topicConfig(topicName: String): Properties = {
- config(new ConfigResource(Type.TOPIC, topicName))
- }
-
- /**
- * Return a copy of the broker configuration for the given broker. Future changes will not be reflected.
- *
- * @param brokerId the id of the broker for which configuration will be returned
- * @return a copy of the broker configuration for the given broker
- */
- def brokerConfig(brokerId: Int): Properties = {
- config(new ConfigResource(Type.BROKER, brokerId.toString))
- }
-
- /**
- * Return a copy of the group configuration for the given group. Future changes will not be reflected.
- *
- * @param groupName the name of the group for which configuration will be returned
- * @return a copy of the group configuration for the given group
- */
- def groupConfig(groupName: String): Properties = {
- config(new ConfigResource(Type.GROUP, groupName))
- }
-
- /**
- * Return a copy of the configuration for the given resource. Future changes will not be reflected.
- * @param configResource the resource for which the configuration will be returned
- * @return a copy of the configuration for the given resource
- */
- def config(configResource: ConfigResource): Properties
-}
diff --git
a/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
b/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
deleted file mode 100644
index 27e4c1d8869..00000000000
--- a/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util
-import java.util.Properties
-
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
-
-object MockConfigRepository {
- def forTopic(topic: String, key: String, value: String):
MockConfigRepository = {
- val properties = new Properties()
- properties.put(key, value)
- forTopic(topic, properties)
- }
-
- def forTopic(topic: String, properties: Properties): MockConfigRepository = {
- val repository = new MockConfigRepository()
- repository.configs.put(new ConfigResource(TOPIC, topic), properties)
- repository
- }
-}
-
-class MockConfigRepository extends ConfigRepository {
- val configs = new util.HashMap[ConfigResource, Properties]()
-
- override def config(configResource: ConfigResource): Properties =
configs.synchronized {
- configs.getOrDefault(configResource, new Properties())
- }
-
- def setConfig(configResource: ConfigResource, key: String, value: String):
Unit = configs.synchronized {
- val properties = configs.getOrDefault(configResource, new Properties())
- val newProperties = new Properties()
- newProperties.putAll(properties)
- if (value == null) {
- newProperties.remove(key)
- } else {
- newProperties.put(key, value)
- }
- configs.put(configResource, newProperties)
- }
-
- def setTopicConfig(topicName: String, key: String, value: String): Unit =
configs.synchronized {
- setConfig(new ConfigResource(TOPIC, topicName), key, value)
- }
-}
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 2cd271e1fe4..fb81da5bbb3 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -18,13 +18,13 @@ package kafka.cluster
import kafka.log.LogManager
import kafka.server.MetadataCache
-import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils
import kafka.utils.TestUtils.MockAlterPartitionManager
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.MockTime
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index f3a949c22ec..e496f88449d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -23,7 +23,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import kafka.log._
import kafka.server._
-import kafka.server.metadata.MockConfigRepository
import kafka.utils._
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.config.TopicConfig
@@ -33,7 +32,7 @@ import org.apache.kafka.common.requests.{FetchRequest,
LeaderAndIsrRequest}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{LeaderAndIsr, MockConfigRepository}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index a2b49685b43..91e6c44a27c 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -18,7 +18,6 @@
package kafka.log
import kafka.server.KafkaConfig
-import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
@@ -27,6 +26,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch,
MemoryRecords, RecordBatch, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.util.{MockTime, Scheduler}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig,
EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader,
LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason,
OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index f8595ede761..d5f9336a0f8 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,6 @@
package kafka.log
import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.utils._
import org.apache.directory.api.util.FileUtils
import org.apache.kafka.common.config.TopicConfig
@@ -27,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{TopicImage, TopicsImage}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.metadata.{ConfigRepository, LeaderRecoveryState,
MockConfigRepository, PartitionRegistration}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.MetadataVersion
import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
index a4494c5f1e7..6d865219322 100644
--- a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
@@ -20,7 +20,6 @@ package kafka.server
import java.util
import java.util.Collections
-import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER,
BROKER_LOGGER, TOPIC, UNKNOWN}
@@ -39,6 +38,7 @@ import
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE}
import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.metadata.MockConfigRepository
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Assertions, Test}
import org.slf4j.LoggerFactory
diff --git
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index f625afa1fa7..5465c67e5e1 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,9 +24,9 @@ import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._
import kafka.utils.TestUtils
import kafka.cluster.Partition
-import kafka.server.metadata.MockConfigRepository
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{CleanerConfig,
LogDirFailureChannel}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 11ebee134b8..8169705fc11 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -22,7 +22,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinat
import kafka.log.UnifiedLog
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache,
MockConfigRepository}
+import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.SharePartitionManager
import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -76,6 +76,7 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator,
GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.metadata.{ConfigRepository, MockConfigRepository}
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 09c9a82446a..3ba92d7c1ae 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{CompletableFuture, Executors,
LinkedBlockingQueue,
import java.util.{Optional, Properties}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
-import kafka.server.metadata.MockConfigRepository
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.metadata.RegisterBrokerRecord
@@ -36,7 +35,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
+import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState,
MockConfigRepository}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.raft.QuorumConfig
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
deleted file mode 100644
index fbeccc1e646..00000000000
---
a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util.Properties
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-class MockConfigRepositoryTest {
- @Test
- def testEmptyRepository(): Unit = {
- val repository = new MockConfigRepository()
- assertEquals(new Properties(), repository.brokerConfig(0))
- assertEquals(new Properties(), repository.topicConfig("foo"))
- assertEquals(new Properties(), repository.groupConfig("group"))
- }
-
- @Test
- def testSetTopicConfig(): Unit = {
- val repository = new MockConfigRepository()
- val topic0 = "topic0"
- repository.setTopicConfig(topic0, "foo", null)
-
- val topic1 = "topic1"
- repository.setTopicConfig(topic1, "foo", "bar")
- val topicProperties = new Properties()
- topicProperties.put("foo", "bar")
- assertEquals(topicProperties, repository.topicConfig(topic1))
-
- val topicProperties2 = new Properties()
- topicProperties2.put("foo", "bar")
- topicProperties2.put("foo2", "baz")
- repository.setTopicConfig(topic1, "foo2", "baz") // add another prop
- assertEquals(topicProperties2, repository.topicConfig(topic1)) // should get both props
-
- repository.setTopicConfig(topic1, "foo2", null)
- assertEquals(topicProperties, repository.topicConfig(topic1))
- }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c9331a70c53..83e994d5f88 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,7 +21,6 @@ import kafka.log._
import kafka.network.RequestChannel
import kafka.security.JaasTestUtils
import kafka.server._
-import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
@@ -47,7 +46,7 @@ import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Utils.formatAddress
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr,
MockConfigRepository}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.QuorumConfig
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 5c08768aefd..54b9764d3fa 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -32,7 +32,6 @@ import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
import kafka.utils.Pool;
import kafka.utils.TestUtils;
@@ -54,6 +53,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index d58b35942fe..2018828ce51 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -34,7 +34,6 @@ import kafka.server.ReplicationQuotaManager;
import kafka.server.SimpleApiVersionManager;
import kafka.server.builders.KafkaApisBuilder;
import kafka.server.metadata.KRaftMetadataCache;
-import kafka.server.metadata.MockConfigRepository;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.common.Uuid;
@@ -56,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.network.RequestConvertToJson;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
import org.apache.kafka.raft.QuorumConfig;
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 4194927504a..32ff36c3c06 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -24,7 +24,6 @@ import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -34,6 +33,7 @@ import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 47fefcbaad0..364cd6da399 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -25,12 +25,12 @@ import kafka.log.LogManager;
import kafka.server.AlterPartitionManager;
import kafka.server.MetadataCache;
import kafka.server.builders.LogManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index ddea968215e..e8f4c2c686d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -24,13 +24,13 @@ import kafka.server.MetadataCache;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.KafkaScheduler;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 0f5ac6d36b9..3c5aef892ae 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -25,8 +25,6 @@ import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.builders.LogManagerBuilder;
import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.ConfigRepository;
-import kafka.server.metadata.MockConfigRepository;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
@@ -35,6 +33,8 @@ import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.MockConfigRepository;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.Scheduler;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ConfigRepository.java b/metadata/src/main/java/org/apache/kafka/metadata/ConfigRepository.java
new file mode 100644
index 00000000000..ddbdf9ddbc7
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ConfigRepository.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
+
+import java.util.Properties;
+
+public interface ConfigRepository {
+
+ /**
+ * Return a copy of the topic configuration for the given topic. Future
changes will not be reflected.
+ *
+ * @param topicName the name of the topic for which the configuration will
be returned
+ * @return a copy of the topic configuration for the given topic
+ */
+ default Properties topicConfig(String topicName) {
+ return config(new ConfigResource(Type.TOPIC, topicName));
+ }
+
+ /**
+ * Return a copy of the broker configuration for the given broker. Future
changes will not be reflected.
+ *
+ * @param brokerId the id of the broker for which configuration will be
returned
+ * @return a copy of the broker configuration for the given broker
+ */
+ default Properties brokerConfig(int brokerId) {
+ return config(new ConfigResource(Type.BROKER,
Integer.toString(brokerId)));
+ }
+
+ /**
+ * Return a copy of the group configuration for the given group. Future
changes will not be reflected.
+ *
+ * @param groupName the name of the group for which configuration will be
returned
+ * @return a copy of the group configuration for the given group
+ */
+ default Properties groupConfig(String groupName) {
+ return config(new ConfigResource(Type.GROUP, groupName));
+ }
+
+ /**
+ * Return a copy of the configuration for the given resource. Future
changes will not be reflected.
+ *
+ * @param configResource the resource for which the configuration will be
returned
+ * @return a copy of the configuration for the given resource
+ */
+ Properties config(ConfigResource configResource);
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepository.java b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepository.java
new file mode 100644
index 00000000000..a8ab01366cc
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepository.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * In-memory {@link ConfigRepository} backed by a map from resource to
+ * {@link Properties}. Reads and writes of the map are performed while
+ * synchronizing on it.
+ */
+public class MockConfigRepository implements ConfigRepository {
+    private final Map<ConfigResource, Properties> configs = new HashMap<>();
+
+    /**
+     * Create a repository that holds a single {@code key=value} entry for one topic.
+     *
+     * @param topic the topic name
+     * @param key   the configuration key
+     * @param value the configuration value
+     * @return a new repository containing only that topic configuration
+     */
+    public static MockConfigRepository forTopic(String topic, String key, String value) {
+        Properties properties = new Properties();
+        properties.put(key, value);
+        return forTopic(topic, properties);
+    }
+
+    /**
+     * Create a repository that holds the given properties for one topic.
+     * The supplied {@code properties} instance is stored as-is (not copied).
+     *
+     * @param topic      the topic name
+     * @param properties the configuration to register for the topic
+     * @return a new repository containing only that topic configuration
+     */
+    public static MockConfigRepository forTopic(String topic, Properties properties) {
+        MockConfigRepository repository = new MockConfigRepository();
+        repository.configs.put(new ConfigResource(Type.TOPIC, topic), properties);
+        return repository;
+    }
+
+    /**
+     * Return a copy of the configuration stored for the resource, or an empty
+     * {@link Properties} when nothing is stored for it.
+     *
+     * @param configResource the resource whose configuration is requested
+     * @return a fresh {@link Properties} instance; mutating it does not affect this repository
+     */
+    @Override
+    public Properties config(ConfigResource configResource) {
+        synchronized (configs) {
+            Properties stored = configs.get(configResource);
+            if (stored == null) {
+                return new Properties();
+            }
+            // Hand out a copy rather than the stored instance: the ConfigRepository
+            // contract promises a copy, and returning the live object would let a
+            // caller modify the repository by mutating the result.
+            return (Properties) stored.clone();
+        }
+    }
+
+    /**
+     * Set or remove a single configuration entry of a resource.
+     *
+     * @param configResource the resource to update
+     * @param key            the configuration key
+     * @param value          the new value, or {@code null} to remove {@code key}
+     */
+    public void setConfig(ConfigResource configResource, String key, String value) {
+        synchronized (configs) {
+            // Build a replacement instead of editing the stored instance in place, so a
+            // Properties object supplied through forTopic is never modified by this class.
+            Properties updated = new Properties();
+            Properties current = configs.get(configResource);
+            if (current != null) {
+                updated.putAll(current);
+            }
+            if (value == null) {
+                updated.remove(key);
+            } else {
+                updated.put(key, value);
+            }
+            configs.put(configResource, updated);
+        }
+    }
+
+    /**
+     * Set or remove a single configuration entry of a topic.
+     *
+     * @param topicName the topic to update
+     * @param key       the configuration key
+     * @param value     the new value, or {@code null} to remove {@code key}
+     */
+    public void setTopicConfig(String topicName, String key, String value) {
+        // setConfig takes the lock itself, so no extra synchronization is needed here.
+        setConfig(new ConfigResource(Type.TOPIC, topicName), key, value);
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepositoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepositoryTest.java
new file mode 100644
index 00000000000..a90b2bb3340
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepositoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link MockConfigRepository}.
+ */
+public class MockConfigRepositoryTest {
+
+    /** A repository with nothing stored returns empty properties for every resource kind. */
+    @Test
+    public void testEmptyRepository() {
+        MockConfigRepository repository = new MockConfigRepository();
+        assertEquals(new Properties(), repository.brokerConfig(0));
+        assertEquals(new Properties(), repository.topicConfig("foo"));
+        assertEquals(new Properties(), repository.groupConfig("group"));
+    }
+
+    /** Setting, adding and removing (null value) topic entries is reflected by topicConfig. */
+    @Test
+    public void testSetTopicConfig() {
+        MockConfigRepository repository = new MockConfigRepository();
+        String topic0 = "topic0";
+        repository.setTopicConfig(topic0, "foo", null);
+        // A null value on a topic with no configuration must leave it empty.
+        assertEquals(new Properties(), repository.topicConfig(topic0));
+
+        String topic1 = "topic1";
+        repository.setTopicConfig(topic1, "foo", "bar");
+        Properties topicProperties = new Properties();
+        topicProperties.put("foo", "bar");
+        assertEquals(topicProperties, repository.topicConfig(topic1));
+
+        Properties topicProperties2 = new Properties();
+        topicProperties2.put("foo", "bar");
+        topicProperties2.put("foo2", "baz");
+        repository.setTopicConfig(topic1, "foo2", "baz"); // add another prop
+        assertEquals(topicProperties2, repository.topicConfig(topic1)); // should get both props
+
+        // A null value removes the key again, leaving only the first prop.
+        repository.setTopicConfig(topic1, "foo2", null);
+        assertEquals(topicProperties, repository.topicConfig(topic1));
+    }
+}