This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new feee616f738 MINOR: Query before creating the internal remote log 
metadata topic (#14755)
feee616f738 is described below

commit feee616f738a059d6de3e5ec32fba6e9c2b23fb9
Author: Kamal Chandraprakash <kchandraprak...@uber.com>
AuthorDate: Mon Nov 20 14:50:11 2023 +0530

    MINOR: Query before creating the internal remote log metadata topic (#14755)
    
    When a node starts (or) restarts, then we send a CREATE_TOPICS request to 
the controller to create the internal __remote_log_metadata topic.
    
    Topic creation event is costly and handled by the controller. During 
re-balance, the controller can have pending requests in its queue and can lead 
to CREATE_TOPICS timeout. Instead of firing the CREATE_TOPICS request when a 
node restarts, send a METADATA request (topic describe) which is handled by the 
least loaded node before sending a request to create the topic.
    
    Reviewers: Satish Duggana <sati...@apache.org>, Christo Lolov 
<lol...@amazon.com>
---
 .../TopicBasedRemoteLogMetadataManager.java        | 53 ++++++++++++++++++----
 .../TopicBasedRemoteLogMetadataManagerTest.java    | 25 ++++++++++
 2 files changed, 68 insertions(+), 10 deletions(-)

diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 9c6a2089db4..938238fae73 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -45,6 +46,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -54,6 +56,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 /**
  * This is the {@link RemoteLogMetadataManager} implementation with storage as 
an internal topic with name {@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
@@ -446,6 +449,20 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         }
     }
 
+    boolean doesTopicExist(Admin adminClient, String topic) {
+        try {
+            TopicDescription description = 
adminClient.describeTopics(Collections.singleton(topic))
+                    .topicNameValues()
+                    .get(topic)
+                    .get();
+            log.info("Topic {} exists. Description: {}", topic, description);
+            return description != null;
+        } catch (ExecutionException | InterruptedException ex) {
+            log.info("Topic {} does not exist. Error: {}", topic, 
ex.getCause().getMessage());
+            return false;
+        }
+    }
+
     private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
                                                       String topicName) throws 
InterruptedException, ExecutionException {
         log.debug("Getting topic details to check for partition count and 
replication factor.");
@@ -467,30 +484,46 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
         Map<String, String> topicConfigs = new HashMap<>();
         topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, 
Long.toString(rlmmConfig.metadataTopicRetentionMs()));
         topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, 
TopicConfig.CLEANUP_POLICY_DELETE);
+        topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
"false");
         return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
                             rlmmConfig.metadataTopicPartitionsCount(),
                             
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);
     }
 
     /**
-     * @param topic topic to be created.
+     * @param newTopic topic to be created.
      * @return Returns true if the topic already exists, or it is created 
successfully.
      */
-    private boolean createTopic(Admin adminClient, NewTopic topic) {
-        boolean topicCreated = false;
+    private boolean createTopic(Admin adminClient, NewTopic newTopic) {
+        boolean doesTopicExist = false;
+        String topic = newTopic.name();
         try {
-            adminClient.createTopics(Collections.singleton(topic)).all().get();
-            topicCreated = true;
+            doesTopicExist = doesTopicExist(adminClient, topic);
+            if (!doesTopicExist) {
+                CreateTopicsResult result = 
adminClient.createTopics(Collections.singleton(newTopic));
+                result.all().get();
+                List<String> overriddenConfigs = result.config(topic).get()
+                        .entries()
+                        .stream()
+                        .filter(entry -> !entry.isDefault())
+                        .map(entry -> entry.name() + "=" + entry.value())
+                        .collect(Collectors.toList());
+                log.info("Topic {} created. TopicId: {}, numPartitions: {}, 
replicationFactor: {}, config: {}",
+                        topic, result.topicId(topic).get(), 
result.numPartitions(topic).get(),
+                        result.replicationFactor(topic).get(), 
overriddenConfigs);
+                doesTopicExist = true;
+            }
         } catch (Exception e) {
+            // This exception can still occur as multiple brokers may call 
create topics and one of them may become
+            // successful and other would throw TopicExistsException
             if (e.getCause() instanceof TopicExistsException) {
-                log.info("Topic [{}] already exists", topic.name());
-                topicCreated = true;
+                log.info("Topic [{}] already exists", topic);
+                doesTopicExist = true;
             } else {
-                log.error("Encountered error while creating remote log 
metadata topic.", e);
+                log.error("Encountered error while creating {} topic.", topic, 
e);
             }
         }
-
-        return topicCreated;
+        return doesTopicExist;
     }
 
     public boolean isInitialized() {
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 96e48de8a73..1f2bebbd296 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
 @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for 
usages of JavaConverters
@@ -66,6 +69,28 @@ public class TopicBasedRemoteLogMetadataManagerTest {
         return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
     }
 
+    @Test
+    public void testInternalTopicExists() {
+        Properties adminConfig = 
remoteLogMetadataManagerHarness.adminClientConfig();
+        ListenerName listenerName = 
remoteLogMetadataManagerHarness.listenerName();
+        try (Admin admin = 
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+            String topic = 
topicBasedRlmm().config().remoteLogMetadataTopicName();
+            boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, 
topic);
+            Assertions.assertTrue(doesTopicExist);
+        }
+    }
+
+    @Test
+    public void testTopicDoesNotExists() {
+        Properties adminConfig = 
remoteLogMetadataManagerHarness.adminClientConfig();
+        ListenerName listenerName = 
remoteLogMetadataManagerHarness.listenerName();
+        try (Admin admin = 
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+            String topic = "dummy-test-topic";
+            boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, 
topic);
+            Assertions.assertFalse(doesTopicExist);
+        }
+    }
+
     @Test
     public void testWithNoAssignedPartitions() throws Exception {
         // This test checks simple lifecycle of 
TopicBasedRemoteLogMetadataManager with out assigning any leader/follower 
partitions.

Reply via email to