divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224064865


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -136,7 +137,10 @@ public class RemoteLogManager implements Closeable {
 
     // topic ids that are received on leadership changes, this map is cleared on stop partitions
     private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new ConcurrentHashMap<>();
+    private final String clusterId;
 
+    // The endpoint for remote log metadata manager to connect to
+    private Optional<EndPoint> endpoint = Optional.empty();

Review Comment:
   This shouldn't be an `Optional`, as discussed in prior comments. The lookup in 
`KafkaServer` already enforces that the endpoint is always present (otherwise it 
throws a `ConfigException`):
   ```
   endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
     .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
       " should be set as a listener name within valid broker listener name list."))
   ```
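
To illustrate the suggestion, here is a minimal sketch of holding the endpoint as a plain non-null field instead of an `Optional`. `EndPointSketch` and `RemoteLogManagerSketch` are hypothetical stand-ins, not the actual Kafka classes, and the sketch assumes the caller has already validated the listener name before handing the endpoint over.

```java
import java.util.Objects;

// Hypothetical stand-in for kafka.cluster.EndPoint, reduced to what the sketch needs.
final class EndPointSketch {
    final String host;
    final int port;
    final String listenerName;

    EndPointSketch(String host, int port, String listenerName) {
        this.host = host;
        this.port = port;
        this.listenerName = listenerName;
    }
}

// Hypothetical stand-in for RemoteLogManager: the endpoint is stored directly,
// and a null value is rejected at the setter rather than wrapped in Optional.
final class RemoteLogManagerSketch {
    private EndPointSketch endpoint;

    void endPoint(EndPointSketch endpoint) {
        this.endpoint = Objects.requireNonNull(endpoint, "endpoint must not be null");
    }

    EndPointSketch endPoint() {
        return endpoint;
    }
}
```

With this shape, the call site can pass the resolved endpoint directly, without an `Optional.of(...)` wrapper.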



##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -505,7 +507,14 @@ class KafkaServer(
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         // Start RemoteLogManager before broker start serving the requests.
-        remoteLogManager.foreach(_.startup())
+        remoteLogManagerOpt.foreach(rlm => {
+          val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+          val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+            .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
+              " should be set as a listener name within valid broker listener name list."))
+          rlm.endPoint(Optional.of(endpoint))

Review Comment:
   `endpoint` is guaranteed to be present here (otherwise we throw a 
`ConfigException`), so we can remove the `Optional` wrapper.



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -189,6 +192,36 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
         assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
     }
 
+    @Test

Review Comment:
   Perhaps we can add a test to verify that an exception is thrown when the 
endpoint is not defined?
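
A rough sketch of what such a test could check, written in plain Java so it stands alone. `EndpointLookupSketch`, `findEndpoint`, and the use of `IllegalArgumentException` in place of `ConfigException` are assumptions made for illustration; in the actual PR this would more naturally be a JUnit `assertThrows` call against the real lookup.

```java
import java.util.Arrays;
import java.util.List;

final class EndpointLookupSketch {
    // Hypothetical lookup: returns the broker listener matching the configured
    // name, or throws if the broker does not expose such a listener.
    static String findEndpoint(List<String> brokerListeners, String configuredListener) {
        return brokerListeners.stream()
            .filter(name -> name.equals(configuredListener))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException(
                configuredListener
                    + " should be set as a listener name within valid broker listener name list."));
    }

    // Returns true if the lookup rejects a listener name the broker does not expose.
    static boolean throwsWhenListenerMissing() {
        try {
            findEndpoint(Arrays.asList("PLAINTEXT", "SSL"), "REPLICATION");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }
}
```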



