satishd commented on code in PR #14755:
URL: https://github.com/apache/kafka/pull/14755#discussion_r1395121310


##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -477,20 +492,22 @@ private NewTopic createRemoteLogMetadataTopicRequest() {
      * @return Returns true if the topic already exists, or it is created 
successfully.
      */
     private boolean createTopic(Admin adminClient, NewTopic topic) {
-        boolean topicCreated = false;
+        boolean topicExists = false;
         try {
-            adminClient.createTopics(Collections.singleton(topic)).all().get();
-            topicCreated = true;
+            topicExists = isTopicExists(adminClient, topic.name());
+            if (!topicExists) {
+                
adminClient.createTopics(Collections.singleton(topic)).all().get();
+                topicExists = true;
+            }
         } catch (Exception e) {
             if (e.getCause() instanceof TopicExistsException) {

Review Comment:
   Note: This exception can still occur as multiple brokers may call create 
topics and one of them may become successful and other would throw 
`TopicExistsException`. Good to add a comment here for readers. 



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java:
##########
@@ -477,20 +492,22 @@ private NewTopic createRemoteLogMetadataTopicRequest() {
      * @return Returns true if the topic already exists, or it is created 
successfully.
      */
     private boolean createTopic(Admin adminClient, NewTopic topic) {
-        boolean topicCreated = false;
+        boolean topicExists = false;
         try {
-            adminClient.createTopics(Collections.singleton(topic)).all().get();
-            topicCreated = true;
+            topicExists = isTopicExists(adminClient, topic.name());
+            if (!topicExists) {
+                
adminClient.createTopics(Collections.singleton(topic)).all().get();

Review Comment:
   Good to add a log here with the result of topic creation containing the 
topic details.



##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -66,6 +70,17 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
         return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
     }
 
+    @Test
+    public void testInternalTopicExists() {
+        Properties adminConfig = 
remoteLogMetadataManagerHarness.adminClientConfig();
+        ListenerName listenerName = 
remoteLogMetadataManagerHarness.listenerName();
+        try (Admin admin = 
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+            NewTopic newTopic = 
topicBasedRlmm().createRemoteLogMetadataTopicRequest();

Review Comment:
   Can not we use `topicBasedRlmm().config().remoteLogMetadataTopicName()` 
instead of createRemoteLogMetadataTopicRequest() method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to