divijvaidya commented on code in PR #13828: URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() { }); } + public void endPoint(Optional<EndPoint> endpoint) { + this.endpoint = endpoint; + } + private void configureRLMM() { final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId); rlmmProps.put(KafkaConfig.LogDirProp(), logDir); + endpoint.ifPresent(e -> { + rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port()); Review Comment: please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG` ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() { }); } + public void endPoint(Optional<EndPoint> endpoint) { + this.endpoint = endpoint; + } + private void configureRLMM() { final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId); rlmmProps.put(KafkaConfig.LogDirProp(), logDir); + endpoint.ifPresent(e -> { Review Comment: missing property `cluster.id` as per https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49 ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -280,7 +281,8 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) - remoteLogManager = createRemoteLogManager(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) + remoteLogManager = createRemoteLogManager(remoteLogManagerConfig) Review Comment: s/remoteLogManager/remoteLogManagerOpt ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -504,6 +506,13 @@ class KafkaServer( new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) + remoteLogManager.foreach(rlm => { + val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName()) + val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName)) + .orElse(Some(brokerInfo.broker.endPoints.head)) Review Comment: this means that endpoint will never be optional (since we are picking up the first broker endpoint when it's not configrued). Right? In that case, can we make it mandatory please in RemoteLogManager? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() { }); } + public void endPoint(Optional<EndPoint> endpoint) { + this.endpoint = endpoint; + } + private void configureRLMM() { final Map<String, Object> rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps()); rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId); rlmmProps.put(KafkaConfig.LogDirProp(), logDir); + endpoint.ifPresent(e -> { + rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port()); + rlmmProps.put("security.protocol", e.securityProtocol().name); Review Comment: please use the constant CommonClientConfigs.SECURITY_PROTOCOL_CONFIG ########## core/src/main/scala/kafka/server/KafkaServer.scala: ########## @@ -280,7 +281,8 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) - remoteLogManager = createRemoteLogManager(config) + val remoteLogManagerConfig = new RemoteLogManagerConfig(config) Review Comment: should we create this only when `remoteLogManagerConfig.enableRemoteStorageSystem()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org