divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224064865

##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -136,7 +137,10 @@ public class RemoteLogManager implements Closeable {
     // topic ids that are received on leadership changes, this map is cleared on stop partitions
     private final ConcurrentMap<TopicPartition, Uuid> topicPartitionIds = new ConcurrentHashMap<>();
+    private final String clusterId;
+    // The endpoint for remote log metadata manager to connect to
+    private Optional<EndPoint> endpoint = Optional.empty();

Review Comment:
   This shouldn't be optional as discussed in prior comments. Our code logic enforces that this is always present (else we throw an error) at
   ```
   endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
     .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
       " should be set as a listener name within valid broker listener name list."))
   ```

##########
core/src/main/scala/kafka/server/KafkaServer.scala:
##########
@@ -505,7 +507,14 @@ class KafkaServer(
           KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
         // Start RemoteLogManager before broker start serving the requests.
-        remoteLogManager.foreach(_.startup())
+        remoteLogManagerOpt.foreach(rlm => {
+          val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+          val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+            .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
+              " should be set as a listener name within valid broker listener name list."))
+          rlm.endPoint(Optional.of(endpoint))

Review Comment:
   endpoint is guaranteed to be present (else we throw an error). We can remove the optional here.

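   As a rough illustration of the non-Optional shape, here is a minimal sketch. `SketchEndPoint` and `SketchRemoteLogManager` are hypothetical stand-ins, not the actual Kafka classes, and the null check is an assumption about how a missing value might be rejected:

   ```java
   import java.util.Objects;

   // Hypothetical stand-in for the broker EndPoint type; only the fields needed here.
   final class SketchEndPoint {
       final String host;
       final int port;
       final String listenerName;

       SketchEndPoint(String host, int port, String listenerName) {
           this.host = host;
           this.port = port;
           this.listenerName = listenerName;
       }
   }

   // Hypothetical stand-in for RemoteLogManager, reduced to the endpoint handling.
   final class SketchRemoteLogManager {
       // Plain reference instead of Optional: the caller validates the endpoint before passing it in.
       private SketchEndPoint endpoint;

       // Rejects null so that a missing endpoint fails at the call site rather than later.
       void endPoint(SketchEndPoint endpoint) {
           this.endpoint = Objects.requireNonNull(endpoint, "endpoint must not be null");
       }

       SketchEndPoint endPoint() {
           return endpoint;
       }
   }
   ```

   With this shape the caller would pass the endpoint directly, without wrapping it in `Optional.of(...)`.
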
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -189,6 +192,36 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
         assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
     }
+    @Test

Review Comment:
   Perhaps we can add a test to verify that an exception is thrown when the endpoint is not defined?
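
   A minimal, dependency-free sketch of what such a check could look like. Everything here is hypothetical: `EndpointLookupSketch`, `MissingListenerException` (standing in for `ConfigException`), and the listener names are illustrative only, and a real test would more likely use JUnit's `assertThrows` against the actual startup path:

   ```java
   import java.util.List;

   final class EndpointLookupSketch {
       // Hypothetical stand-in for the ConfigException thrown during broker startup.
       static final class MissingListenerException extends RuntimeException {
           MissingListenerException(String message) {
               super(message);
           }
       }

       // Returns the listener name equal to the requested one, or throws if none matches.
       static String findListener(List<String> brokerListeners, String requested) {
           return brokerListeners.stream()
               .filter(name -> name.equals(requested))
               .findFirst()
               .orElseThrow(() -> new MissingListenerException(
                   "listener '" + requested + "' is not in the broker listener list"));
       }

       // Small replacement for an assertThrows-style helper: reports whether the action threw.
       static boolean throwsMissingListener(Runnable action) {
           try {
               action.run();
               return false;
           } catch (MissingListenerException e) {
               return true;
           }
       }
   }
   ```

   The test would then assert that the lookup throws for a listener name that the broker does not expose, and that it does not throw for one that it does.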