This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdbc9a8d88c KAFKA-15083: add config with "remote.log.metadata" prefix
(#14151)
cdbc9a8d88c is described below
commit cdbc9a8d88c1ddc9dd088a33d047783a5b13c282
Author: Luke Chen <[email protected]>
AuthorDate: Fri Aug 11 10:42:14 2023 +0800
KAFKA-15083: add config with "remote.log.metadata" prefix (#14151)
When configuring RLMM, the configs passed into configure method is the
RemoteLogManagerConfig. But in RemoteLogManagerConfig, there's no configs
related to remote.log.metadata.*, ex:
remote.log.metadata.topic.replication.factor. So, even if users have set the
config in broker, it'll never be applied.
This PR fixed the issue to allow users setting RLMM prefix:
remote.log.metadata.manager.impl.prefix (default is rlmm.config.), and then,
appending the desired remote.log.metadata.* configs, it'll pass into RLMM,
including remote.log.metadata.common.client./remote.log.metadata.producer./
remote.log.metadata.consumer. prefixes.
Ex:
# default value
# remote.log.storage.manager.impl.prefix=rsm.config.
# remote.log.metadata.manager.impl.prefix=rlmm.config.
rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.replication.factor=4
rsm.config.test=value
Reviewers: Christo Lolov <[email protected]>, Kamal Chandraprakash
<[email protected]>, Divij Vaidya <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 12 ++--
.../kafka/log/remote/RemoteLogManagerTest.java | 73 +++++++++++++++++++++-
.../log/remote/storage/RemoteLogManagerConfig.java | 11 ++--
3 files changed, 85 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index d07d60ce3a8..82c0da8b584 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -256,15 +256,17 @@ public class RemoteLogManager implements Closeable {
}
private void configureRLMM() {
- final Map<String, Object> rlmmProps = new
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
-
- rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
- rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
- rlmmProps.put("cluster.id", clusterId);
+ final Map<String, Object> rlmmProps = new HashMap<>();
endpoint.ifPresent(e -> {
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"bootstrap.servers", e.host() + ":" + e.port());
rlmmProps.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"security.protocol", e.securityProtocol().name);
});
+ // update the remoteLogMetadataProps here to override endpoint config
if any
+ rlmmProps.putAll(rlmConfig.remoteLogMetadataManagerProps());
+
+ rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
+ rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+ rlmmProps.put("cluster.id", clusterId);
remoteLogMetadataManager.configure(rlmmProps);
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index dedcea0e38d..e22caf6f593 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -93,6 +93,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
+import static
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
+import static
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX;
+import static
org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
import static
org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -122,6 +127,17 @@ public class RemoteLogManagerTest {
int brokerId = 0;
String logDir = TestUtils.tempDirectory("kafka-").toString();
String clusterId = "dummyId";
+ String remoteLogStorageTestProp = "remote.log.storage.test";
+ String remoteLogStorageTestVal = "storage.test";
+ String remoteLogMetadataTestProp = "remote.log.metadata.test";
+ String remoteLogMetadataTestVal = "metadata.test";
+ String remoteLogMetadataCommonClientTestProp =
REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "common.client.test";
+ String remoteLogMetadataCommonClientTestVal = "common.test";
+ String remoteLogMetadataProducerTestProp =
REMOTE_LOG_METADATA_PRODUCER_PREFIX + "producer.test";
+ String remoteLogMetadataProducerTestVal = "producer.test";
+ String remoteLogMetadataConsumerTestProp =
REMOTE_LOG_METADATA_CONSUMER_PREFIX + "consumer.test";
+ String remoteLogMetadataConsumerTestVal = "consumer.test";
+ String remoteLogMetadataTopicPartitionsNum = "1";
RemoteStorageManager remoteStorageManager =
mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager =
mock(RemoteLogMetadataManager.class);
@@ -248,16 +264,59 @@ public class RemoteLogManagerTest {
assertEquals(brokerId,
capture.getValue().get(KafkaConfig.BrokerIdProp()));
}
+ @Test
+ void testRemoteLogMetadataManagerWithEndpointConfigOverridden() throws
IOException {
+ Properties props = new Properties();
+ // override common security.protocol by adding "RLMM prefix" and
"remote log metadata common client prefix"
+ props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX +
REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL");
+ try (RemoteLogManager remoteLogManager = new
RemoteLogManager(createRLMConfig(props), brokerId, logDir, clusterId, time, tp
-> Optional.of(mockLog), brokerTopicStats) {
+ public RemoteStorageManager createRemoteStorageManager() {
+ return remoteStorageManager;
+ }
+ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+ return remoteLogMetadataManager;
+ }
+ }) {
+
+ String host = "localhost";
+ String port = "1234";
+ String securityProtocol = "PLAINTEXT";
+ EndPoint endPoint = new EndPoint(host, Integer.parseInt(port), new
ListenerName(securityProtocol),
+ SecurityProtocol.PLAINTEXT);
+ remoteLogManager.onEndPointCreated(endPoint);
+ remoteLogManager.startup();
+
+ ArgumentCaptor<Map<String, Object>> capture =
ArgumentCaptor.forClass(Map.class);
+ verify(remoteLogMetadataManager,
times(1)).configure(capture.capture());
+ assertEquals(host + ":" + port,
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"bootstrap.servers"));
+ // should be overridden as SSL
+ assertEquals("SSL",
capture.getValue().get(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX +
"security.protocol"));
+ assertEquals(clusterId, capture.getValue().get("cluster.id"));
+ assertEquals(brokerId,
capture.getValue().get(KafkaConfig.BrokerIdProp()));
+ }
+ }
+
+
+
@Test
void testStartup() {
remoteLogManager.startup();
ArgumentCaptor<Map<String, Object>> capture =
ArgumentCaptor.forClass(Map.class);
verify(remoteStorageManager, times(1)).configure(capture.capture());
assertEquals(brokerId, capture.getValue().get("broker.id"));
+ assertEquals(remoteLogStorageTestVal,
capture.getValue().get(remoteLogStorageTestProp));
verify(remoteLogMetadataManager,
times(1)).configure(capture.capture());
assertEquals(brokerId, capture.getValue().get("broker.id"));
assertEquals(logDir, capture.getValue().get("log.dir"));
+
+ // verify the configs starting with "remote.log.metadata",
"remote.log.metadata.common.client."
+ // "remote.log.metadata.producer.", and
"remote.log.metadata.consumer." are correctly passed in
+ assertEquals(remoteLogMetadataTopicPartitionsNum,
capture.getValue().get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP));
+ assertEquals(remoteLogMetadataTestVal,
capture.getValue().get(remoteLogMetadataTestProp));
+ assertEquals(remoteLogMetadataConsumerTestVal,
capture.getValue().get(remoteLogMetadataConsumerTestProp));
+ assertEquals(remoteLogMetadataProducerTestVal,
capture.getValue().get(remoteLogMetadataProducerTestProp));
+ assertEquals(remoteLogMetadataCommonClientTestVal,
capture.getValue().get(remoteLogMetadataCommonClientTestProp));
}
// This test creates 2 log segments, 1st one has start offset of 0, 2nd
one (and active one) has start offset of 150.
@@ -726,13 +785,15 @@ public class RemoteLogManagerTest {
@Test
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager =
mock(ClassLoaderAwareRemoteStorageManager.class);
- RemoteLogManager remoteLogManager =
+ try (RemoteLogManager remoteLogManager =
new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir,
clusterId, time, t -> Optional.empty(), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
};
- assertEquals(rsmManager, remoteLogManager.storageManager());
+ ) {
+ assertEquals(rsmManager, remoteLogManager.storageManager());
+ }
}
private void verifyInCache(TopicIdPartition... topicIdPartitions) {
@@ -1007,6 +1068,14 @@ public class RemoteLogManagerTest {
props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true);
props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
NoOpRemoteStorageManager.class.getName());
props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
NoOpRemoteLogMetadataManager.class.getName());
+ props.put(DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX +
remoteLogStorageTestProp, remoteLogStorageTestVal);
+ // adding configs with "remote log metadata manager config prefix"
+ props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX +
REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, remoteLogMetadataTopicPartitionsNum);
+ props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX +
remoteLogMetadataTestProp, remoteLogMetadataTestVal);
+ props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX +
remoteLogMetadataCommonClientTestProp, remoteLogMetadataCommonClientTestVal);
+ props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX +
remoteLogMetadataConsumerTestProp, remoteLogMetadataConsumerTestVal);
+ props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX +
remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
+
AbstractConfig config = new
AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props);
return new RemoteLogManagerConfig(config);
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 1167ee73c66..2d368daef19 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -42,7 +42,8 @@ public final class RemoteLogManagerConfig {
*/
public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP =
"remote.log.storage.manager.impl.prefix";
public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC =
"Prefix used for properties to be passed to RemoteStorageManager " +
- "implementation. For example this value can be `rsm.s3.`.";
+ "implementation. For example this value can be `rsm.config.`.";
+ public static final String DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX =
"rsm.config.";
/**
* Prefix used for properties to be passed to {@link
RemoteLogMetadataManager} implementation. Remote log subsystem collects all the
properties having
@@ -50,7 +51,9 @@ public final class RemoteLogManagerConfig {
*/
public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP
= "remote.log.metadata.manager.impl.prefix";
public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC =
"Prefix used for properties to be passed to RemoteLogMetadataManager " +
- "implementation. For example this value can be `rlmm.s3.`.";
+ "implementation. For example this value can be `rlmm.config.`.";
+ public static final String
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX = "rlmm.config.";
+
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP =
"remote.log.storage.system.enable";
public static final String REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC = "Whether
to enable tier storage functionality in a broker or not. Valid values " +
@@ -152,13 +155,13 @@ public final class RemoteLogManagerConfig {
REMOTE_LOG_STORAGE_SYSTEM_ENABLE_DOC)
.defineInternal(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP,
STRING,
- null,
+ DEFAULT_REMOTE_STORAGE_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC)
.defineInternal(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP,
STRING,
- null,
+
DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX,
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_DOC)