This is an automated email from the ASF dual-hosted git repository. satishd pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new feee616f738 MINOR: Query before creating the internal remote log metadata topic (#14755) feee616f738 is described below commit feee616f738a059d6de3e5ec32fba6e9c2b23fb9 Author: Kamal Chandraprakash <kchandraprak...@uber.com> AuthorDate: Mon Nov 20 14:50:11 2023 +0530 MINOR: Query before creating the internal remote log metadata topic (#14755) When a node starts or restarts, it sends a CREATE_TOPICS request to the controller to create the internal __remote_log_metadata topic. Topic creation is costly and is handled by the controller. During a rebalance, the controller can have pending requests in its queue, which can cause the CREATE_TOPICS request to time out. Instead of firing the CREATE_TOPICS request when a node restarts, first send a METADATA request (topic describe), which is handled by the least loaded node, and send the request to create the topic only if the topic does not already exist. Reviewers: Satish Duggana <sati...@apache.org>, Christo Lolov <lol...@amazon.com> --- .../TopicBasedRemoteLogMetadataManager.java | 53 ++++++++++++++++++---- .../TopicBasedRemoteLogMetadataManagerTest.java | 25 ++++++++++ 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java index 9c6a2089db4..938238fae73 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java @@ -17,6 +17,7 @@ package org.apache.kafka.server.log.remote.metadata.storage; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import 
org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.producer.RecordMetadata; @@ -45,6 +46,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -54,6 +56,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; /** * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}. @@ -446,6 +449,20 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana } } + boolean doesTopicExist(Admin adminClient, String topic) { + try { + TopicDescription description = adminClient.describeTopics(Collections.singleton(topic)) + .topicNameValues() + .get(topic) + .get(); + log.info("Topic {} exists. Description: {}", topic, description); + return description != null; + } catch (ExecutionException | InterruptedException ex) { + log.info("Topic {} does not exist. 
Error: {}", topic, ex.getCause().getMessage()); + return false; + } + } + private boolean isPartitionsCountSameAsConfigured(Admin adminClient, String topicName) throws InterruptedException, ExecutionException { log.debug("Getting topic details to check for partition count and replication factor."); @@ -467,30 +484,46 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana Map<String, String> topicConfigs = new HashMap<>(); topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, Long.toString(rlmmConfig.metadataTopicRetentionMs())); topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); + topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "false"); return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(), rlmmConfig.metadataTopicPartitionsCount(), rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs); } /** - * @param topic topic to be created. + * @param newTopic topic to be created. * @return Returns true if the topic already exists, or it is created successfully. */ - private boolean createTopic(Admin adminClient, NewTopic topic) { - boolean topicCreated = false; + private boolean createTopic(Admin adminClient, NewTopic newTopic) { + boolean doesTopicExist = false; + String topic = newTopic.name(); try { - adminClient.createTopics(Collections.singleton(topic)).all().get(); - topicCreated = true; + doesTopicExist = doesTopicExist(adminClient, topic); + if (!doesTopicExist) { + CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic)); + result.all().get(); + List<String> overriddenConfigs = result.config(topic).get() + .entries() + .stream() + .filter(entry -> !entry.isDefault()) + .map(entry -> entry.name() + "=" + entry.value()) + .collect(Collectors.toList()); + log.info("Topic {} created. 
TopicId: {}, numPartitions: {}, replicationFactor: {}, config: {}", + topic, result.topicId(topic).get(), result.numPartitions(topic).get(), + result.replicationFactor(topic).get(), overriddenConfigs); + doesTopicExist = true; + } } catch (Exception e) { + // This exception can still occur as multiple brokers may call create topics and one of them may become + // successful and other would throw TopicExistsException if (e.getCause() instanceof TopicExistsException) { - log.info("Topic [{}] already exists", topic.name()); - topicCreated = true; + log.info("Topic [{}] already exists", topic); + doesTopicExist = true; } else { - log.error("Encountered error while creating remote log metadata topic.", e); + log.error("Encountered error while creating {} topic.", topic, e); } } - - return topicCreated; + return doesTopicExist; } public boolean isInitialized() { diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java index 96e48de8a73..1f2bebbd296 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.server.log.remote.metadata.storage; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -40,6 +42,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import 
java.util.Properties; import java.util.concurrent.TimeoutException; @SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters @@ -66,6 +69,28 @@ public class TopicBasedRemoteLogMetadataManagerTest { return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); } + @Test + public void testInternalTopicExists() { + Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); + ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); + try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { + String topic = topicBasedRlmm().config().remoteLogMetadataTopicName(); + boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); + Assertions.assertTrue(doesTopicExist); + } + } + + @Test + public void testTopicDoesNotExists() { + Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); + ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); + try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { + String topic = "dummy-test-topic"; + boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin, topic); + Assertions.assertFalse(doesTopicExist); + } + } + @Test public void testWithNoAssignedPartitions() throws Exception { // This test checks simple lifecycle of TopicBasedRemoteLogMetadataManager with out assigning any leader/follower partitions.