divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
 
+    public void endPoint(Optional<EndPoint> endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map<String, Object> rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment:
   please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
 
+    public void endPoint(Optional<EndPoint> endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map<String, Object> rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {

Review Comment:
   missing property `cluster.id` as per 
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -280,7 +281,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
-        remoteLogManager = createRemoteLogManager(config)
+        val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+        remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

Review Comment:
   s/remoteLogManager/remoteLogManagerOpt



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -504,6 +506,13 @@ class KafkaServer(
           new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
+        remoteLogManager.foreach(rlm => {
+          val listenerName = 
ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+          val endpoint = brokerInfo.broker.endPoints.find(e => 
e.listenerName.equals(listenerName))
+            .orElse(Some(brokerInfo.broker.endPoints.head))

Review Comment:
   this means that endpoint will never be optional (since we are picking up the 
first broker endpoint when it's not configrued). Right? In that case, can we 
make it mandatory please in RemoteLogManager?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
         });
     }
 
+    public void endPoint(Optional<EndPoint> endpoint) {
+        this.endpoint = endpoint;
+    }
+
     private void configureRLMM() {
         final Map<String, Object> rlmmProps = new 
HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
         rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
         rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+        endpoint.ifPresent(e -> {
+            rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
+            rlmmProps.put("security.protocol", e.securityProtocol().name);

Review Comment:
   please use the constant CommonClientConfigs.SECURITY_PROTOCOL_CONFIG



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -280,7 +281,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
-        remoteLogManager = createRemoteLogManager(config)
+        val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment:
   should we create this only when 
`remoteLogManagerConfig.enableRemoteStorageSystem()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to