[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-12 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1226373376


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -474,7 +474,14 @@ class BrokerServer(
   new KafkaConfig(config.originals(), true)
 
   // Start RemoteLogManager before broker start serving the requests.
-  remoteLogManager.foreach(_.startup())
+  remoteLogManagerOpt.foreach(rlm => {
+val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+endpoints.stream.filter(e => e.listenerName.equals(listenerName))
+  .findAny()

Review Comment:
   I would prefer `findFirst` here so that endpoint selection stays
deterministic. For example, when multiple endpoints share the same listener
name, `findAny` may return any one of them (depending on the stream
implementation), so if only one of those endpoints is misconfigured, startup
will fail only some of the time.
   Making the choice deterministic gives us consistent behavior.
   
   (same for KafkaServer)
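
To illustrate the point, here is a minimal, self-contained Java sketch. `DemoEndpoint` and `EndpointSelection` are hypothetical stand-ins for illustration only and are not Kafka classes. On an ordered stream such as one created from a `List`, `findFirst()` returns the first match in encounter order, while `findAny()` is allowed to return any match.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a broker endpoint; not Kafka's own endpoint class. */
final class DemoEndpoint {
    final String listenerName;
    final String host;
    final int port;

    DemoEndpoint(String listenerName, String host, int port) {
        this.listenerName = listenerName;
        this.host = host;
        this.port = port;
    }
}

final class EndpointSelection {
    private EndpointSelection() { }

    /**
     * Returns the first endpoint, in list order, whose listener name matches.
     * findFirst() honours encounter order on an ordered stream, so repeated
     * calls on the same list always select the same endpoint.
     */
    static Optional<DemoEndpoint> firstMatching(List<DemoEndpoint> endpoints, String listenerName) {
        return endpoints.stream()
                .filter(e -> e.listenerName.equals(listenerName))
                .findFirst();
    }
}
```

With two endpoints registered under the same listener name, `firstMatching` always picks the one that appears earlier in the list, so a misconfigured duplicate fails consistently or never, rather than intermittently.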



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-09 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224171314


##
clients/src/main/java/org/apache/kafka/common/network/ListenerName.java:
##
@@ -36,6 +36,7 @@ public static ListenerName forSecurityProtocol(SecurityProtocol securityProtocol
  * Create an instance with the provided value converted to uppercase.
  */
 public static ListenerName normalised(String value) {
+Objects.requireNonNull(value);

Review Comment:
   nit: do we also want to reject empty and whitespace-only values, using
`Utils.isBlank()`?
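
A rough sketch of what such a check could look like, in plain Java. `ListenerNames` and its local `isBlank` helper are hypothetical and only stand in for `ListenerName.normalised` and `Utils.isBlank()`; the choice of `IllegalArgumentException` for blank input is likewise an assumption, not something the PR specifies.

```java
import java.util.Locale;
import java.util.Objects;

final class ListenerNames {
    private ListenerNames() { }

    /** Hypothetical local helper: true if the value is null, empty, or whitespace only. */
    static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** Returns the value converted to uppercase, rejecting null and blank input. */
    static String normalised(String value) {
        Objects.requireNonNull(value, "listener name must not be null");
        if (isBlank(value)) {
            // Assumption: blank names are treated as a caller error.
            throw new IllegalArgumentException("listener name must not be blank");
        }
        return value.toUpperCase(Locale.ROOT);
    }
}
```

The null check is kept separate from the blank check so that a missing value and an empty value produce distinct errors.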






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-09 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224064865


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -136,7 +137,10 @@ public class RemoteLogManager implements Closeable {
 
 // topic ids that are received on leadership changes, this map is cleared on stop partitions
 private final ConcurrentMap topicPartitionIds = new ConcurrentHashMap<>();
+private final String clusterId;
 
+// The endpoint for remote log metadata manager to connect to
+private Optional endpoint = Optional.empty();

Review Comment:
   This shouldn't be an `Optional`, as discussed in prior comments. Our code
logic enforces that the endpoint is always present (otherwise we throw an
error) at
   ```
   endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
     .getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
       " should be set as a listener name within valid broker listener name list."))
   ```



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -505,7 +507,14 @@ class KafkaServer(
 KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
 // Start RemoteLogManager before broker start serving the requests.
-remoteLogManager.foreach(_.startup())
+remoteLogManagerOpt.foreach(rlm => {
+  val listenerName = ListenerName.normalised(config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+  val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+.getOrElse(throw new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP +
+  " should be set as a listener name within valid broker listener name list."))
+  rlm.endPoint(Optional.of(endpoint))

Review Comment:
   `endpoint` is guaranteed to be present (otherwise we throw an error), so we
can remove the `Optional` here.
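
A minimal Java sketch of the non-optional variant. `BrokerEndpoint` and `RlmmEndpointResolver` are hypothetical names used for illustration, and `IllegalArgumentException` stands in for Kafka's `ConfigException` so the example stays self-contained. The lookup either returns a concrete endpoint or fails fast, so callers never need to handle an empty `Optional`.

```java
import java.util.List;

/** Hypothetical stand-in for a broker endpoint; not Kafka's own endpoint class. */
final class BrokerEndpoint {
    final String listenerName;
    final String host;
    final int port;

    BrokerEndpoint(String listenerName, String host, int port) {
        this.listenerName = listenerName;
        this.host = host;
        this.port = port;
    }
}

final class RlmmEndpointResolver {
    private RlmmEndpointResolver() { }

    /**
     * Returns the first endpoint matching the listener name, or throws if none
     * matches. IllegalArgumentException is a stand-in for ConfigException.
     */
    static BrokerEndpoint resolve(List<BrokerEndpoint> endpoints, String listenerName) {
        return endpoints.stream()
                .filter(e -> e.listenerName.equals(listenerName))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(
                        "remote.log.metadata.manager.listener.name should be set as a "
                                + "listener name within valid broker listener name list."));
    }
}
```

Because `resolve` returns a plain `BrokerEndpoint`, the receiving field and setter can take the endpoint directly rather than wrapping it in `Optional.of(...)`.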



##
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##
@@ -189,6 +192,36 @@ void testRemoteLogMetadataManagerWithUserDefinedConfigs() {
 assertFalse(metadataMangerConfig.containsKey("remote.log.metadata.y"));
 }
 
+@Test

Review Comment:
   Perhaps we can add a test verifying that an exception is thrown when the
endpoint is not defined?






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-09 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1224063364


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {
+rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment:
   ok






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223142144


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -197,7 +199,8 @@ class BrokerServer(
   logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
 brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
 
-  remoteLogManager = createRemoteLogManager(config)
+  val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment:
   Correct me if I am wrong, but we already have a `RemoteLogManagerConfig`
instance. It is used here:
   
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/core/src/main/scala/kafka/log/LogManager.scala#L1405
   
   Hence, we can simply call
`createRemoteLogManager(config.remoteLogManagerConfig)` here.






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13828: KAFKA-15066: add "remote.log.metadata.manager.listener.name" config to rlmm

2023-06-08 Thread via GitHub


divijvaidya commented on code in PR #13828:
URL: https://github.com/apache/kafka/pull/13828#discussion_r1223088072


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {
+rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());

Review Comment:
   Please use the constant `CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG`
instead of the string literal.
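
A small sketch of the idea, kept self-contained. `ClientConfigKeys` is a hypothetical local class standing in for `CommonClientConfigs`, with values that mirror the literals in the diff; `RlmmProps.withEndpoint` is likewise an illustrative helper, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the client config key constants. */
final class ClientConfigKeys {
    static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
    static final String SECURITY_PROTOCOL_CONFIG = "security.protocol";

    private ClientConfigKeys() { }
}

final class RlmmProps {
    private RlmmProps() { }

    /** Copies the base properties and adds the endpoint settings under the shared keys. */
    static Map<String, Object> withEndpoint(Map<String, Object> base,
                                            String host, int port, String securityProtocol) {
        Map<String, Object> props = new HashMap<>(base);
        props.put(ClientConfigKeys.BOOTSTRAP_SERVERS_CONFIG, host + ":" + port);
        props.put(ClientConfigKeys.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        return props;
    }
}
```

Referring to the keys through constants keeps the spelling in one place, so a typo becomes a compile error rather than a silently ignored property.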



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {

Review Comment:
   The `cluster.id` property is missing here, as per
https://github.com/apache/kafka/blob/513e1c641d63c5e15144f9fcdafa1b56c5e5ba09/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java#L49



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -280,7 +281,8 @@ class KafkaServer(
 _brokerState = BrokerState.RECOVERY
 logManager.startup(zkClient.getAllTopicsInCluster())
 
-remoteLogManager = createRemoteLogManager(config)
+val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+remoteLogManager = createRemoteLogManager(remoteLogManagerConfig)

Review Comment:
   s/remoteLogManager/remoteLogManagerOpt



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -504,6 +506,13 @@ class KafkaServer(
   new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
 KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
+remoteLogManager.foreach(rlm => {
+  val listenerName = ListenerName.normalised(remoteLogManagerConfig.remoteLogMetadataManagerListenerName())
+  val endpoint = brokerInfo.broker.endPoints.find(e => e.listenerName.equals(listenerName))
+.orElse(Some(brokerInfo.broker.endPoints.head))

Review Comment:
   This means that the endpoint will never be absent (since we pick the first
broker endpoint when the listener is not configured), right? In that case,
can we please make it mandatory in `RemoteLogManager`?



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -220,11 +223,20 @@ public RemoteLogMetadataManager run() {
 });
 }
 
+public void endPoint(Optional endpoint) {
+this.endpoint = endpoint;
+}
+
 private void configureRLMM() {
 final Map rlmmProps = new HashMap<>(rlmConfig.remoteLogMetadataManagerProps());
 
 rlmmProps.put(KafkaConfig.BrokerIdProp(), brokerId);
 rlmmProps.put(KafkaConfig.LogDirProp(), logDir);
+endpoint.ifPresent(e -> {
+rlmmProps.put("bootstrap.servers", e.host() + ":" + e.port());
+rlmmProps.put("security.protocol", e.securityProtocol().name);

Review Comment:
   Please use the constant `CommonClientConfigs.SECURITY_PROTOCOL_CONFIG`.



##
core/src/main/scala/kafka/server/KafkaServer.scala:
##
@@ -280,7 +281,8 @@ class KafkaServer(
 _brokerState = BrokerState.RECOVERY
 logManager.startup(zkClient.getAllTopicsInCluster())
 
-remoteLogManager = createRemoteLogManager(config)
+val remoteLogManagerConfig = new RemoteLogManagerConfig(config)

Review Comment:
   Should we create this only when
`remoteLogManagerConfig.enableRemoteStorageSystem()` returns true?


