[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm
divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
+    public void endPoint(Optional endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment: please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
+    public void endPoint(Optional endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {

Review Comment: missing property `cluster.id` as per https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -280,7 +281,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
-        remoteLogManager = createRemoteLogManager(config)
+        val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+        remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

Review Comment: s/remoteLogManager/remoteLogManagerOpt

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -504,6 +506,13 @@ class KafkaServer(
           new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+        remoteLogManager.foreach(rlm => {
+          val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+          val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+            .orElse(Some(brokerInfo.broker.endPoints.head))

Review Comment: This means that the endpoint will never be absent (since we fall back to the first broker endpoint when the listener name is not configured), right? In that case, can we please make it mandatory in RemoteLogManager?

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
+    public void endPoint(Optional endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
+            rlmmProps.put("security.protocol", e.securityProtocol().name);

Review Comment: please use the constant `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -280,7 +281,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
-        remoteLogManager = createRemoteLogManager(config)
+        val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment: should we create this only when `remoteLogManagerConfig.enableRemoteStorageSystem()`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
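Taken together, the comments above ask for the bootstrap and security-protocol keys to come from the `CommonClientConfigs` constants and for `cluster.id` to be passed to the RLMM as well. A minimal sketch of the resulting property map, assuming a simplified, hypothetical `SimpleEndPoint` in place of the broker's endpoint type and a hypothetical helper `buildRlmmProps`. The constant values (`"bootstrap.servers"`, `"security.protocol"`) are copied locally only so the snippet compiles without kafka-clients on the classpath; real code would reference `CommonClientConfigs` directly.

```java
import java.util.HashMap;
import java.util.Map;

final class RlmmPropsSketch {
    // Same values as CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG and
    // CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; copied here only so the sketch
    // compiles standalone. Real code should use the constants directly.
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";

    // Values of KafkaConfig.BrokerIdProp() and KafkaConfig.LogDirProp(),
    // plus the cluster.id key requested in the review.
    static final String BROKER_ID = "broker.id";
    static final String LOG_DIR = "log.dir";
    static final String CLUSTER_ID = "cluster.id";

    // Hypothetical, simplified stand-in for the broker's endpoint type.
    static final class SimpleEndPoint {
        final String host;
        final int port;
        final String securityProtocol;

        SimpleEndPoint(String host, int port, String securityProtocol) {
            this.host = host;
            this.port = port;
            this.securityProtocol = securityProtocol;
        }
    }

    // Hypothetical helper: starts from the user-supplied RLMM props and adds the
    // broker-derived values. The endpoint is a required argument, not an Optional.
    static Map<String, Object> buildRlmmProps(Map<String, ?> userProps, int brokerId, String logDir,
                                              String clusterId, SimpleEndPoint endpoint) {
        Map<String, Object> props = new HashMap<>(userProps);
        props.put(BROKER_ID, brokerId);
        props.put(LOG_DIR, logDir);
        props.put(CLUSTER_ID, clusterId);
        props.put(BOOTSTRAP_SERVERS_CONFIG, endpoint.host + ":" + endpoint.port);
        props.put(SECURITY_PROTOCOL_CONFIG, endpoint.securityProtocol);
        return props;
    }
}
```

Using the shared constants means a typo in a key becomes a compile error rather than a silently ignored property.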
divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223142144

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
@@ -197,7 +199,8 @@ class BrokerServer(
       logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
-      remoteLogManager = createRemoteLogManager(config)
+      val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment: Correct me if I am wrong here, but we already have a `RemoteLogManagerConfig`. It is used here: https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/log/LogManager.scala#L1405 Hence, we can simply call `createRemoteLogManager(config.remoteLogManagerConfig)` here.
divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224063364

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
+    public void endPoint(Optional endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment: ok
divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224064865

## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ##
@@ -136,7 +137,10 @@ public class RemoteLogManager implements Closeable {
     // topic ids that are received on leadership changes, this map is cleared on stop partitions
     private final ConcurrentMap topicPartitionIds = new ConcurrentHashMap<>();
+    private final String clusterId;
+    // The endpoint for remote log metadata manager to connect to
+    private Optional endpoint = Optional.empty();

Review Comment: This shouldn't be optional, as discussed in prior comments. Our code logic enforces that this is always present (else we throw an error) at
```
endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
  .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
    " should be set as a listener name within valid broker listener name list."))
```

## core/src/main/scala/kafka/server/KafkaServer.scala: ##
@@ -505,7 +507,14 @@ class KafkaServer(
           KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
       // Start RemoteLogManager before broker start serving the requests.
-      remoteLogManager.foreach(_.startup())
+      remoteLogManagerOpt.foreach(rlm => {
+        val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+        val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+          .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
+            " should be set as a listener name within valid broker listener name list."))
+        rlm.endPoint(Optional.of(endpoint))

Review Comment: The endpoint is guaranteed to be present (else we throw an error). We can remove the `Optional` here.

## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ##
@@ -189,6 +192,36 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
         assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
     }
+    @Test

Review Comment: Perhaps we can add a test verifying that an exception is thrown when the endpoint is not defined?
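The comments above argue that the endpoint can be a plain, required value, since a missing listener already fails with a `ConfigException` in `KafkaServer`, and they ask for a test of the failure case. A hedged sketch of one way that could look; `RemoteLogManagerLike`, `SimpleEndPoint` and `bootstrapServers()` are hypothetical names, and `IllegalStateException` stands in for whatever exception the real code would raise.

```java
import java.util.Objects;

final class RequiredEndpointSketch {
    // Hypothetical, simplified stand-in for the broker's endpoint type.
    static final class SimpleEndPoint {
        final String host;
        final int port;

        SimpleEndPoint(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Hypothetical class illustrating the review suggestion: the endpoint is a plain
    // field rather than an Optional, and using it before it is set fails loudly.
    static final class RemoteLogManagerLike {
        private SimpleEndPoint endpoint; // stays null until the broker supplies it

        // Rejects null so callers cannot pass an "absent" endpoint.
        void endPoint(SimpleEndPoint endpoint) {
            this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
        }

        // Stand-in for the part of configureRLMM() that reads the endpoint.
        String bootstrapServers() {
            if (endpoint == null) {
                throw new IllegalStateException("endpoint must be set before configuring the RLMM");
            }
            return endpoint.host + ":" + endpoint.port;
        }
    }
}
```

In `RemoteLogManagerTest` itself, the failure check would more naturally be written with JUnit 5's `assertThrows`.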
divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224171314

## clients/src/main/java/org/apache/kafka/common/network/ListenerName.java: ##
@@ -36,6 +36,7 @@ public static ListenerName forSecurityProtocol(SecurityProtocol securityProtocol
      * Create an instance with the provided value converted to uppercase.
      */
     public static ListenerName normalised(String value) {
+        Objects.requireNonNull(value);

Review Comment: nit: do we want to also check for empty and whitespace-only values, using `Utils.isBlank()`?
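A sketch of the stricter check suggested in the nit, assuming a hypothetical `normalisedChecked` variant that returns a plain `String` instead of a `ListenerName`, and a local `isBlank` helper that treats `null`, empty and whitespace-only strings as blank (in the spirit of `Utils.isBlank()`, whose exact semantics are not restated here). The uppercasing follows the Javadoc quoted in the diff.

```java
import java.util.Locale;

final class ListenerNameSketch {
    // Local helper: true for null, empty, or whitespace-only strings.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Hypothetical variant of ListenerName.normalised(): rejects blank input,
    // then converts the value to uppercase as the quoted Javadoc describes.
    static String normalisedChecked(String value) {
        if (isBlank(value)) {
            throw new IllegalArgumentException("listener name must not be null, empty or blank: '" + value + "'");
        }
        return value.toUpperCase(Locale.ROOT);
    }
}
```

Compared with `Objects.requireNonNull`, rejecting blank values up front gives an immediate, descriptive error for an empty listener name, rather than an uppercased empty name that could only fail later, at endpoint lookup.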
divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1226373376

## core/src/main/scala/kafka/server/BrokerServer.scala: ##
@@ -474,7 +474,14 @@ class BrokerServer(
         new KafkaConfig(config.originals(), true)
       // Start RemoteLogManager before broker start serving the requests.
-      remoteLogManager.foreach(_.startup())
+      remoteLogManagerOpt.foreach(rlm => {
+        val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+        endpoints.stream.filter(e => e.listenerName.equals(listenerName))
+          .findAny()

Review Comment: I would prefer to have `findFirst` here to keep the logic for choosing endpoints deterministic. For example, when there are multiple endpoints for the same listener name, this code might choose any one of them (depending on the streams implementation), and if only one of the endpoints is incorrect, it will "sometimes" fail and sometimes it won't. Making it deterministic provides a consistent experience. (Same for KafkaServer.)
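The determinism argument rests on the `java.util.stream.Stream` contract: `findFirst()` returns the first matching element in encounter order, while the Javadoc for `findAny()` describes it as explicitly nondeterministic. A small sketch of the `findFirst` variant, using a hypothetical `SimpleEndPoint` class and a hypothetical `select` helper rather than the broker's real types:

```java
import java.util.List;
import java.util.Optional;

final class EndpointSelectionSketch {
    // Hypothetical, simplified stand-in for the broker's endpoint type.
    static final class SimpleEndPoint {
        final String listenerName;
        final String host;
        final int port;

        SimpleEndPoint(String listenerName, String host, int port) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
        }
    }

    // Returns the first endpoint, in list order, whose listener name matches.
    // findFirst() honours encounter order (even for a parallel stream), so the same
    // input always yields the same endpoint; findAny() is free to pick any match.
    static Optional<SimpleEndPoint> select(List<SimpleEndPoint> endpoints, String listenerName) {
        return endpoints.stream()
                .filter(ep -> ep.listenerName.equals(listenerName))
                .findFirst();
    }
}
```

On a sequential stream `findAny()` will often return the first match in practice, which is why the difference is easy to miss; only `findFirst()` guarantees it. An empty result would still need to be turned into the configuration error discussed in the earlier comments.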