This is an automated email from the ASF dual-hosted git repository.

clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 21645ebf0bf KAFKA-18705: Move ConfigRepository to metadata module (#18784)
21645ebf0bf is described below

commit 21645ebf0bf7ea5902fb3ceb7737d688847ed318
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Feb 5 18:13:36 2025 +0800

    KAFKA-18705: Move ConfigRepository to metadata module (#18784)
    
    Reviewers: TengYao Chi <[email protected]>, Christo Lolov <[email protected]>
---
 build.gradle                                       |  1 +
 .../kafka/server/builders/KafkaApisBuilder.java    |  2 +-
 .../kafka/server/builders/LogManagerBuilder.java   |  2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |  2 +-
 .../scala/kafka/server/ConfigAdminManager.scala    |  2 +-
 .../src/main/scala/kafka/server/ConfigHelper.scala |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../main/scala/kafka/server/MetadataCache.scala    |  4 +-
 .../kafka/server/metadata/ConfigRepository.scala   | 62 ---------------------
 .../server/metadata/MockConfigRepository.scala     | 62 ---------------------
 .../unit/kafka/cluster/AbstractPartitionTest.scala |  2 +-
 .../unit/kafka/cluster/PartitionLockTest.scala     |  3 +-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  |  2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  3 +-
 .../unit/kafka/server/ConfigAdminManagerTest.scala |  2 +-
 .../server/HighwatermarkPersistenceTest.scala      |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  3 +-
 .../server/ReplicaManagerConcurrencyTest.scala     |  3 +-
 .../server/metadata/MockConfigRepositoryTest.scala | 55 -------------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  3 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |  2 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |  2 +-
 .../partition/PartitionMakeFollowerBenchmark.java  |  2 +-
 .../UpdateFollowerFetchStateBenchmark.java         |  2 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |  2 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |  4 +-
 .../apache/kafka/metadata/ConfigRepository.java    | 63 ++++++++++++++++++++++
 .../kafka/metadata/MockConfigRepository.java       | 63 ++++++++++++++++++++++
 .../kafka/metadata/MockConfigRepositoryTest.java   | 56 +++++++++++++++++++
 29 files changed, 208 insertions(+), 207 deletions(-)

diff --git a/build.gradle b/build.gradle
index 9a6c4f2714d..ad68058c183 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3297,6 +3297,7 @@ project(':jmh-benchmarks') {
     implementation project(':clients').sourceSets.test.output
     implementation project(':core').sourceSets.test.output
     implementation project(':server-common').sourceSets.test.output
+    implementation project(':metadata').sourceSets.test.output
 
     implementation libs.jmhCore
     annotationProcessor libs.jmhGeneratorAnnProcess
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index b4764f8d284..a3dc9856f7d 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -29,13 +29,13 @@ import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
 import kafka.server.ReplicaManager;
-import kafka.server.metadata.ConfigRepository;
 import kafka.server.share.SharePartitionManager;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.coordinator.share.ShareCoordinator;
+import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.server.ClientMetricsManager;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java 
b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index 53f6c0dd305..23b91f1d4a4 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -18,9 +18,9 @@
 package kafka.server.builders;
 
 import kafka.log.LogManager;
-import kafka.server.metadata.ConfigRepository;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.Scheduler;
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 572cd5d7b8b..8bf6fa0292f 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -22,7 +22,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.server.metadata.ConfigRepository
 import kafka.server.{KafkaConfig, KafkaRaftServer}
 import kafka.server.metadata.BrokerMetadataPublisher.info
 import kafka.utils.threadsafe
@@ -36,6 +35,7 @@ import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
 import org.apache.kafka.image.TopicsImage
+import org.apache.kafka.metadata.ConfigRepository
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, PropertiesUtils}
 
 import java.util.{Collections, OptionalLong, Properties}
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala 
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index b0b84fc8c60..45a68d2d036 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -20,7 +20,6 @@ import kafka.server.logger.RuntimeLoggerManager
 
 import java.util
 import java.util.Properties
-import kafka.server.metadata.ConfigRepository
 import kafka.utils._
 import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -36,6 +35,7 @@ import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
 import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, 
UNKNOWN_SERVER_ERROR}
 import org.apache.kafka.common.requests.ApiError
 import org.apache.kafka.common.resource.{Resource, ResourceType}
+import org.apache.kafka.metadata.ConfigRepository
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.{Map, Seq}
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala 
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 095a474441a..d44a92dc237 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -20,7 +20,6 @@ package kafka.server
 import kafka.network.RequestChannel
 
 import java.util.{Collections, Properties}
-import kafka.server.metadata.ConfigRepository
 import kafka.utils.{Log4jController, Logging}
 import org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigResource}
@@ -34,6 +33,7 @@ import 
org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
 import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC}
 import org.apache.kafka.coordinator.group.GroupConfig
+import org.apache.kafka.metadata.ConfigRepository
 import org.apache.kafka.server.config.ServerTopicConfigSynonyms
 import org.apache.kafka.storage.internals.log.LogConfig
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18795a7e0f3..d6be221b77d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -21,7 +21,6 @@ import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinat
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
 import kafka.server.handlers.DescribeTopicPartitionsRequestHandler
-import kafka.server.metadata.ConfigRepository
 import kafka.server.share.SharePartitionManager
 import kafka.utils.Logging
 import org.apache.kafka.admin.AdminUtils
@@ -59,6 +58,7 @@ import org.apache.kafka.common.utils.{ProducerIdAndEpoch, 
Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
+import org.apache.kafka.metadata.ConfigRepository
 import org.apache.kafka.server.ClientMetricsManager
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, RequestLocal, 
TransactionVersion}
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index 703f2d8e320..45b8ba1e383 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,12 +17,12 @@
 
 package kafka.server
 
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import org.apache.kafka.admin.BrokerMetadata
 import org.apache.kafka.common.message.{DescribeClientQuotasRequestData, 
DescribeClientQuotasResponseData, DescribeTopicPartitionsResponseData, 
DescribeUserScramCredentialsRequestData, 
DescribeUserScramCredentialsResponseData, MetadataResponseData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.{Cluster, Node, TopicPartition, Uuid}
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr}
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
 
 import java.util
diff --git a/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala 
b/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala
deleted file mode 100644
index 9f59a07ff57..00000000000
--- a/core/src/main/scala/kafka/server/metadata/ConfigRepository.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util.Properties
-
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.config.ConfigResource.Type
-
-trait ConfigRepository {
-  /**
-   * Return a copy of the topic configuration for the given topic.  Future changes will not be reflected.
-   *
-   * @param topicName the name of the topic for which the configuration will be returned
-   * @return a copy of the topic configuration for the given topic
-   */
-  def topicConfig(topicName: String): Properties = {
-    config(new ConfigResource(Type.TOPIC, topicName))
-  }
-
-  /**
-   * Return a copy of the broker configuration for the given broker.  Future changes will not be reflected.
-   *
-   * @param brokerId the id of the broker for which configuration will be returned
-   * @return a copy of the broker configuration for the given broker
-   */
-  def brokerConfig(brokerId: Int): Properties = {
-    config(new ConfigResource(Type.BROKER, brokerId.toString))
-  }
-
-  /**
-   * Return a copy of the group configuration for the given group.  Future changes will not be reflected.
-   *
-   * @param groupName the name of the group for which configuration will be returned
-   * @return a copy of the group configuration for the given group
-   */
-  def groupConfig(groupName: String): Properties = {
-    config(new ConfigResource(Type.GROUP, groupName))
-  }
-
-  /**
-   * Return a copy of the configuration for the given resource.  Future changes will not be reflected.
-   * @param configResource the resource for which the configuration will be returned
-   * @return a copy of the configuration for the given resource
-   */
-  def config(configResource: ConfigResource): Properties
-}
diff --git 
a/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala 
b/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
deleted file mode 100644
index 27e4c1d8869..00000000000
--- a/core/src/test/scala/kafka/server/metadata/MockConfigRepository.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util
-import java.util.Properties
-
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
-
-object MockConfigRepository {
-  def forTopic(topic: String, key: String, value: String): MockConfigRepository = {
-    val properties = new Properties()
-    properties.put(key, value)
-    forTopic(topic, properties)
-  }
-
-  def forTopic(topic: String, properties: Properties): MockConfigRepository = {
-    val repository = new MockConfigRepository()
-    repository.configs.put(new ConfigResource(TOPIC, topic), properties)
-    repository
-  }
-}
-
-class MockConfigRepository extends ConfigRepository {
-  val configs = new util.HashMap[ConfigResource, Properties]()
-
-  override def config(configResource: ConfigResource): Properties = configs.synchronized {
-    configs.getOrDefault(configResource, new Properties())
-  }
-
-  def setConfig(configResource: ConfigResource, key: String, value: String): Unit = configs.synchronized {
-    val properties = configs.getOrDefault(configResource, new Properties())
-    val newProperties = new Properties()
-    newProperties.putAll(properties)
-    if (value == null) {
-      newProperties.remove(key)
-    } else {
-      newProperties.put(key, value)
-    }
-    configs.put(configResource, newProperties)
-  }
-
-  def setTopicConfig(topicName: String, key: String, value: String): Unit = configs.synchronized {
-    setConfig(new ConfigResource(TOPIC, topicName), key, value)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala 
b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
index 2cd271e1fe4..fb81da5bbb3 100644
--- a/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala
@@ -18,13 +18,13 @@ package kafka.cluster
 
 import kafka.log.LogManager
 import kafka.server.MetadataCache
-import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.MockAlterPartitionManager
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.requests.LeaderAndIsrRequest
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.metadata.MockConfigRepository
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.util.MockTime
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala 
b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
index f3a949c22ec..e496f88449d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -23,7 +23,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.log._
 import kafka.server._
-import kafka.server.metadata.MockConfigRepository
 import kafka.utils._
 import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.config.TopicConfig
@@ -33,7 +32,7 @@ import org.apache.kafka.common.requests.{FetchRequest, 
LeaderAndIsrRequest}
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{LeaderAndIsr, MockConfigRepository}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.config.ReplicationConfigs
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala 
b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index a2b49685b43..91e6c44a27c 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -18,7 +18,6 @@
 package kafka.log
 
 import kafka.server.KafkaConfig
-import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
@@ -27,6 +26,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{ControlRecordType, DefaultRecordBatch, 
MemoryRecords, RecordBatch, SimpleRecord, TimestampType}
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.metadata.MockConfigRepository
 import org.apache.kafka.server.util.{MockTime, Scheduler}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
 import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, 
EpochEntry, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, 
LogOffsetMetadata, LogSegment, LogSegments, LogStartOffsetIncrementReason, 
OffsetIndex, ProducerStateManager, ProducerStateManagerConfig, SnapshotFile}
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index f8595ede761..d5f9336a0f8 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,7 +18,6 @@
 package kafka.log
 
 import com.yammer.metrics.core.{Gauge, MetricName}
-import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.utils._
 import org.apache.directory.api.util.FileUtils
 import org.apache.kafka.common.config.TopicConfig
@@ -27,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{DirectoryId, KafkaException, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.image.{TopicImage, TopicsImage}
-import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
+import org.apache.kafka.metadata.{ConfigRepository, LeaderRecoveryState, MockConfigRepository, PartitionRegistration}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
 import org.apache.kafka.server.common.MetadataVersion
 import org.junit.jupiter.api.Assertions._
diff --git a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
index a4494c5f1e7..6d865219322 100644
--- a/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConfigAdminManagerTest.scala
@@ -20,7 +20,6 @@ package kafka.server
 import java.util
 import java.util.Collections
 
-import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
BROKER_LOGGER, TOPIC, UNKNOWN}
@@ -39,6 +38,7 @@ import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.{Alte
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, NONE}
 import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.metadata.MockConfigRepository
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.{Assertions, Test}
 import org.slf4j.LoggerFactory
diff --git 
a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index f625afa1fa7..5465c67e5e1 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -24,9 +24,9 @@ import org.junit.jupiter.api._
 import org.junit.jupiter.api.Assertions._
 import kafka.utils.TestUtils
 import kafka.cluster.Partition
-import kafka.server.metadata.MockConfigRepository
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.SimpleRecord
+import org.apache.kafka.metadata.MockConfigRepository
 import org.apache.kafka.server.common.KRaftVersion
 import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
 import org.apache.kafka.storage.internals.log.{CleanerConfig, 
LogDirFailureChannel}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 11ebee134b8..8169705fc11 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -22,7 +22,7 @@ import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinat
 import kafka.log.UnifiedLog
 import kafka.network.RequestChannel
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ConfigRepository, KRaftMetadataCache, MockConfigRepository}
+import kafka.server.metadata.KRaftMetadataCache
 import kafka.server.share.SharePartitionManager
 import kafka.utils.{CoreUtils, Log4jController, Logging, TestUtils}
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -76,6 +76,7 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorTestConfig}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.metadata.{ConfigRepository, MockConfigRepository}
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
index 09c9a82446a..3ba92d7c1ae 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerConcurrencyTest.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.{CompletableFuture, Executors, 
LinkedBlockingQueue,
 import java.util.{Optional, Properties}
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.metadata.KRaftMetadataCache
-import kafka.server.metadata.MockConfigRepository
 import kafka.utils.TestUtils.waitUntilTrue
 import kafka.utils.{CoreUtils, Logging, TestUtils}
 import org.apache.kafka.common.metadata.RegisterBrokerRecord
@@ -36,7 +35,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
+import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MockConfigRepository}
 import org.apache.kafka.metadata.PartitionRegistration
 import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.raft.QuorumConfig
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala 
b/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
deleted file mode 100644
index fbeccc1e646..00000000000
--- 
a/core/src/test/scala/unit/kafka/server/metadata/MockConfigRepositoryTest.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import java.util.Properties
-
-import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
-
-class MockConfigRepositoryTest {
-  @Test
-  def testEmptyRepository(): Unit = {
-    val repository = new MockConfigRepository()
-    assertEquals(new Properties(), repository.brokerConfig(0))
-    assertEquals(new Properties(), repository.topicConfig("foo"))
-    assertEquals(new Properties(), repository.groupConfig("group"))
-  }
-
-  @Test
-  def testSetTopicConfig(): Unit = {
-    val repository = new MockConfigRepository()
-    val topic0 = "topic0"
-    repository.setTopicConfig(topic0, "foo", null)
-
-    val topic1 = "topic1"
-    repository.setTopicConfig(topic1, "foo", "bar")
-    val topicProperties = new Properties()
-    topicProperties.put("foo", "bar")
-    assertEquals(topicProperties, repository.topicConfig(topic1))
-
-    val topicProperties2 = new Properties()
-    topicProperties2.put("foo", "bar")
-    topicProperties2.put("foo2", "baz")
-    repository.setTopicConfig(topic1, "foo2", "baz") // add another prop
-    assertEquals(topicProperties2, repository.topicConfig(topic1)) // should get both props
-
-    repository.setTopicConfig(topic1, "foo2", null)
-    assertEquals(topicProperties, repository.topicConfig(topic1))
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c9331a70c53..83e994d5f88 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -21,7 +21,6 @@ import kafka.log._
 import kafka.network.RequestChannel
 import kafka.security.JaasTestUtils
 import kafka.server._
-import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
@@ -47,7 +46,7 @@ import org.apache.kafka.common.serialization._
 import org.apache.kafka.common.utils.Utils.formatAddress
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.metadata.LeaderAndIsr
+import org.apache.kafka.metadata.{ConfigRepository, LeaderAndIsr, MockConfigRepository}
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.raft.QuorumConfig
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 5c08768aefd..54b9764d3fa 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -32,7 +32,6 @@ import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.Pool;
 import kafka.utils.TestUtils;
 
@@ -54,6 +53,7 @@ import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.OffsetAndEpoch;
 import org.apache.kafka.server.network.BrokerEndPoint;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index d58b35942fe..2018828ce51 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -34,7 +34,6 @@ import kafka.server.ReplicationQuotaManager;
 import kafka.server.SimpleApiVersionManager;
 import kafka.server.builders.KafkaApisBuilder;
 import kafka.server.metadata.KRaftMetadataCache;
-import kafka.server.metadata.MockConfigRepository;
 import kafka.server.share.SharePartitionManager;
 
 import org.apache.kafka.common.Uuid;
@@ -56,6 +55,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
 import org.apache.kafka.raft.QuorumConfig;
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
index 4194927504a..32ff36c3c06 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java
@@ -24,7 +24,6 @@ import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.MetadataCache;
 import kafka.server.builders.LogManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -34,6 +33,7 @@ import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
index 47fefcbaad0..364cd6da399 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java
@@ -25,12 +25,12 @@ import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
 import kafka.server.MetadataCache;
 import kafka.server.builders.LogManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index ddea968215e..e8f4c2c686d 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -24,13 +24,13 @@ import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.KafkaScheduler;
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index 0f5ac6d36b9..3c5aef892ae 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -25,8 +25,6 @@ import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.builders.LogManagerBuilder;
 import kafka.server.builders.ReplicaManagerBuilder;
-import kafka.server.metadata.ConfigRepository;
-import kafka.server.metadata.MockConfigRepository;
 import kafka.utils.TestUtils;
 
 import org.apache.kafka.common.TopicPartition;
@@ -35,6 +33,8 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.LeaderAndIsrRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.ConfigRepository;
+import org.apache.kafka.metadata.MockConfigRepository;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.util.KafkaScheduler;
 import org.apache.kafka.server.util.Scheduler;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ConfigRepository.java b/metadata/src/main/java/org/apache/kafka/metadata/ConfigRepository.java
new file mode 100644
index 00000000000..ddbdf9ddbc7
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ConfigRepository.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
+
+import java.util.Properties;
+
+public interface ConfigRepository {
+
+    /**
+     * Return a copy of the topic configuration for the given topic. Future changes will not be reflected.
+     *
+     * @param topicName the name of the topic for which the configuration will be returned
+     * @return a copy of the topic configuration for the given topic
+     */
+    default Properties topicConfig(String topicName) {
+        return config(new ConfigResource(Type.TOPIC, topicName));
+    }
+
+    /**
+     * Return a copy of the broker configuration for the given broker. Future changes will not be reflected.
+     *
+     * @param brokerId the id of the broker for which configuration will be returned
+     * @return a copy of the broker configuration for the given broker
+     */
+    default Properties brokerConfig(int brokerId) {
+        return config(new ConfigResource(Type.BROKER, Integer.toString(brokerId)));
+    }
+
+    /**
+     * Return a copy of the group configuration for the given group. Future changes will not be reflected.
+     *
+     * @param groupName the name of the group for which configuration will be returned
+     * @return a copy of the group configuration for the given group
+     */
+    default Properties groupConfig(String groupName) {
+        return config(new ConfigResource(Type.GROUP, groupName));
+    }
+
+    /**
+     * Return a copy of the configuration for the given resource. Future changes will not be reflected.
+     *
+     * @param configResource the resource for which the configuration will be returned
+     * @return a copy of the configuration for the given resource
+     */
+    Properties config(ConfigResource configResource);
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepository.java b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepository.java
new file mode 100644
index 00000000000..a8ab01366cc
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepository.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class MockConfigRepository implements ConfigRepository {
+    private final Map<ConfigResource, Properties> configs = new HashMap<>();
+
+    public static MockConfigRepository forTopic(String topic, String key, String value) {
+        Properties properties = new Properties();
+        properties.put(key, value);
+        return forTopic(topic, properties);
+    }
+
+    public static MockConfigRepository forTopic(String topic, Properties properties) {
+        MockConfigRepository repository = new MockConfigRepository();
+        repository.configs.put(new ConfigResource(Type.TOPIC, topic), properties);
+        return repository;
+    }
+
+    @Override
+    public Properties config(ConfigResource configResource) {
+        synchronized (configs) {
+            return configs.getOrDefault(configResource, new Properties());
+        }
+    }
+
+    public void setConfig(ConfigResource configResource, String key, String value) {
+        synchronized (configs) {
+            Properties properties = configs.getOrDefault(configResource, new Properties());
+            Properties newProperties = new Properties();
+            newProperties.putAll(properties);
+            newProperties.compute(key, (k, v) -> value);
+            configs.put(configResource, newProperties);
+        }
+    }
+
+    public void setTopicConfig(String topicName, String key, String value) {
+        synchronized (configs) {
+            setConfig(new ConfigResource(Type.TOPIC, topicName), key, value);
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepositoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepositoryTest.java
new file mode 100644
index 00000000000..a90b2bb3340
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/MockConfigRepositoryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MockConfigRepositoryTest {
+
+    @Test
+    public void testEmptyRepository() {
+        MockConfigRepository repository = new MockConfigRepository();
+        assertEquals(new Properties(), repository.brokerConfig(0));
+        assertEquals(new Properties(), repository.topicConfig("foo"));
+        assertEquals(new Properties(), repository.groupConfig("group"));
+    }
+
+    @Test
+    public void testSetTopicConfig() {
+        MockConfigRepository repository = new MockConfigRepository();
+        String topic0 = "topic0";
+        repository.setTopicConfig(topic0, "foo", null);
+
+        String topic1 = "topic1";
+        repository.setTopicConfig(topic1, "foo", "bar");
+        Properties topicProperties = new Properties();
+        topicProperties.put("foo", "bar");
+        assertEquals(topicProperties, repository.topicConfig(topic1));
+
+        Properties topicProperties2 = new Properties();
+        topicProperties2.put("foo", "bar");
+        topicProperties2.put("foo2", "baz");
+        repository.setTopicConfig(topic1, "foo2", "baz"); // add another prop
+        assertEquals(topicProperties2, repository.topicConfig(topic1)); // should get both props
+
+        repository.setTopicConfig(topic1, "foo2", null);
+        assertEquals(topicProperties, repository.topicConfig(topic1));
+    }
+}


Reply via email to