This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdbc9a8d88c KAFKA-15083: add config with "remote.log.metadata" prefix 
(#14151)
cdbc9a8d88c is described below

commit cdbc9a8d88c1ddc9dd088a33d047783a5b13c282
Author: Luke Chen <[email protected]>
AuthorDate: Fri Aug 11 10:42:14 2023 +0800

    KAFKA-15083: add config with "remote.log.metadata" prefix (#14151)
    
    When configuring RLMM, the configs passed into configure method is the 
RemoteLogManagerConfig. But in RemoteLogManagerConfig, there's no configs 
related to remote.log.metadata.*, ex: 
remote.log.metadata.topic.replication.factor. So, even if users have set the 
config in broker, it'll never be applied.
    
    This PR fixed the issue to allow users setting RLMM prefix: 
remote.log.metadata.manager.impl.prefix (default is rlmm.config.), and then, 
appending the desired remote.log.metadata.* configs, it'll pass into RLMM, 
including remote.log.metadata.common.client./remote.log.metadata.producer./ 
remote.log.metadata.consumer. prefixes.
    
    Ex:
    
    # default value
    # remote.log.storage.manager.impl.prefix=rsm.config.
    # remote.log.metadata.manager.impl.prefix=rlmm.config.
    
    rlmm.config.remote.log.metadata.topic.num.partitions=50
    rlmm.config.remote.log.metadata.topic.replication.factor=4
    
    rsm.config.test=value
    
    Reviewers: Christo Lolov <[email protected]>, Kamal Chandraprakash 
<[email protected]>, Divij Vaidya <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 12 ++--
 .../kafka/log/remote/RemoteLogManagerTest.java     | 73 +++++++++++++++++++++-
 .../log/remote/storage/RemoteLogManagerConfig.java | 11 ++--
 3 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index d07d60ce3a8..82c0da8b584 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -256,15 +256,17 @@ public class RemoteLogManager implements Closeable {
     }
 
     private void configureRLMM() {
-        final Map<String, Object> rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
-
-        rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
-        rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
-        rlmmProps.put("cluster.id", clusterId);
+        final Map<String, Object> rlmmProps = new HashMap<>();
         endpoint.ifPresent(e -> {
             rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"bootstrap.servers", e.host() + ":" + e.port());
             rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"security.protocol", e.securityProtocol().name);
         });
+        // update the remoteLogMetadataProps here to override endpoint config 
if any
+        rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
+
+        rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
+        rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        rlmmProps.put("cluster.id", clusterId);
 
         remoteLogMetadataManager.configure(rlmmProps);
     }
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java 
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index dedcea0e38d..e22caf6f593 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -93,6 +93,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 
 import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
+import static 
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 import static 
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -122,6 +127,17 @@ public class RemoteLogManagerTest {
     int brokerId = 0;
     String logDir = TestUtils.tempDirectory("kafka-").toString();
     String clusterId = "dummyId";
+    String remoteLogStorageTestProp = "remote.log.storage.test";
+    String remoteLogStorageTestVal = "storage.test";
+    String remoteLogMetadataTestProp = "remote.log.metadata.test";
+    String remoteLogMetadataTestVal = "metadata.test";
+    String remoteLogMetadataCommonClientTestProp = 
REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
+    String remoteLogMetadataCommonClientTestVal = "common.test";
+    String remoteLogMetadataProducerTestProp = 
REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
+    String remoteLogMetadataProducerTestVal = "producer.test";
+    String remoteLogMetadataConsumerTestProp = 
REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
+    String remoteLogMetadataConsumerTestVal = "consumer.test";
+    String remoteLogMetadataTopicPartitionsNum = "1";
 
     RemoteStorageManager remoteStorageManager = 
mock(RemoteStorageManager.class);
     RemoteLogMetadataManager remoteLogMetadataManager = 
mock(RemoteLogMetadataManager.class);
@@ -248,16 +264,59 @@ public class RemoteLogManagerTest {
         assertEquals(brokerId, 
capture.getValue().get(KafkaConfig.BrokerIdProp()));
     }
 
+    @Test
+    void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws 
IOException {
+        Properties props = new Properties();
+        // override common security.protocol by adding "RLMM prefix" and 
"remote log metadata common client prefix"
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
+        try (RemoteLogManager remoteLogManager = new 
RemoteLogManager(createRLMConfig(props), brokerId, logDir, clusterId, time, tp 
-> Optional.of(mockLog), brokerTopicStats) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+        }) {
+
+            String host = "localhost";
+            String port = "1234";
+            String securityProtocol = "PLAINTEXT";
+            EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new 
ListenerName(securityProtocol),
+                    SecurityProtocol.PLAINTEXT);
+            remoteLogManager.onEndPointCreated(endPoint);
+            remoteLogManager.startup();
+
+            ArgumentCaptor<Map<String, Object>> capture = 
ArgumentCaptor.forClass(Map.class);
+            verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
+            assertEquals(host + ":" + port, 
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"bootstrap.servers"));
+            // should be overridden as SSL
+            assertEquals("SSL", 
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + 
"security.protocol"));
+            assertEquals(clusterId, capture.getValue().get("cluster.id"));
+            assertEquals(brokerId, 
capture.getValue().get(KafkaConfig.BrokerIdProp()));
+        }
+    }
+
+
+
     @Test
     void testStartup() {
         remoteLogManager.startup();
         ArgumentCaptor<Map<String, Object>> capture = 
ArgumentCaptor.forClass(Map.class);
         verify(remoteStorageManager, times(1)).configure(capture.capture());
         assertEquals(brokerId, capture.getValue().get("broker.id"));
+        assertEquals(remoteLogStorageTestVal, 
capture.getValue().get(remoteLogStorageTestProp));
 
         verify(remoteLogMetadataManager, 
times(1)).configure(capture.capture());
         assertEquals(brokerId, capture.getValue().get("broker.id"));
         assertEquals(logDir, capture.getValue().get("log.dir"));
+
+        // verify the configs starting with "remote.log.metadata", 
"remote.log.metadata.common.client."
+        // "remote.log.metadata.producer.", and 
"remote.log.metadata.consumer." are correctly passed in
+        assertEquals(remoteLogMetadataTopicPartitionsNum, 
capture.getValue().get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP));
+        assertEquals(remoteLogMetadataTestVal, 
capture.getValue().get(remoteLogMetadataTestProp));
+        assertEquals(remoteLogMetadataConsumerTestVal, 
capture.getValue().get(remoteLogMetadataConsumerTestProp));
+        assertEquals(remoteLogMetadataProducerTestVal, 
capture.getValue().get(remoteLogMetadataProducerTestProp));
+        assertEquals(remoteLogMetadataCommonClientTestVal, 
capture.getValue().get(remoteLogMetadataCommonClientTestProp));
     }
 
     // This test creates 2 log segments, 1st one has start offset of 0, 2nd 
one (and active one) has start offset of 150.
@@ -726,13 +785,15 @@ public class RemoteLogManagerTest {
     @Test
     void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
         ClassLoaderAwareRemoteStorageManager rsmManager = 
mock(ClassLoaderAwareRemoteStorageManager.class);
-        RemoteLogManager remoteLogManager =
+        try (RemoteLogManager remoteLogManager =
             new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, 
clusterId, time, t -> Optional.empty(), brokerTopicStats) {
                 public RemoteStorageManager createRemoteStorageManager() {
                     return rsmManager;
                 }
             };
-        assertEquals(rsmManager, remoteLogManager.storageManager());
+        ) {
+            assertEquals(rsmManager, remoteLogManager.storageManager());
+        }
     }
 
     private void verifyInCache(TopicIdPartition... topicIdPartitions) {
@@ -1007,6 +1068,14 @@ public class RemoteLogManagerTest {
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
         
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, 
NoOpRemoteStorageManager.class.getName());
         
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, 
NoOpRemoteLogMetadataManager.class.getName());
+        props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX + 
remoteLogStorageTestProp, remoteLogStorageTestVal);
+        // adding configs with "remote log metadata manager config prefix"
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, remoteLogMetadataTopicPartitionsNum);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataTestProp, remoteLogMetadataTestVal);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
+        props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + 
remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
+
         AbstractConfig config = new 
AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
         return new RemoteLogManagerConfig(config);
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1167ee73c66..2d368daef19 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -42,7 +42,8 @@ public final class RemoteLogManagerConfig {
      */
     public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = 
"remote.log.storage.manager.impl.prefix";
     public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteStorageManager " +
-            "implementation. For example this value can be `rsm.s3.`.";
+            "implementation. For example this value can be `rsm.config.`.";
+    public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX = 
"rsm.config.";
 
     /**
      * Prefix used for properties to be passed to {@link 
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the 
properties having
@@ -50,7 +51,9 @@ public final class RemoteLogManagerConfig {
      */
     public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP 
= "remote.log.metadata.manager.impl.prefix";
     public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC = 
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
-            "implementation. For example this value can be `rlmm.s3.`.";
+            "implementation. For example this value can be `rlmm.config.`.";
+    public static final String 
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
+
 
     public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP = 
"remote.log.storage.system.enable";
     public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether 
to enable tier storage functionality in a broker or not. Valid values " +
@@ -152,13 +155,13 @@ public final class RemoteLogManagerConfig {
                                   REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
                   .defineInternal(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
                                   STRING,
-                                  null,
+                                  DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
                                   new ConfigDef.NonEmptyString(),
                                   MEDIUM,
                                   REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
                   
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
                                   STRING,
-                                  null,
+                                  
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
                                   new ConfigDef.NonEmptyString(),
                                   MEDIUM,
                                   
REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)

Reply via email to