kamalcph commented on code in PR #20204: URL: https://github.com/apache/kafka/pull/20204#discussion_r2218645144
########## storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java: ########## @@ -392,93 +316,74 @@ public boolean isReady(TopicIdPartition topicIdPartition) { return remotePartitionMetadataStore.isInitialized(topicIdPartition); } - private void initializeResources() { + private void handleRetry(long retryIntervalMs) { + log.info("Sleep for {} ms before retrying.", retryIntervalMs); + Utils.sleep(retryIntervalMs); + } + + private void initializeResources(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig) { log.info("Initializing topic-based RLMM resources"); - final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest(); - boolean topicCreated = false; + int metadataTopicPartitionCount = rlmmConfig.metadataTopicPartitionsCount(); + long retryIntervalMs = rlmmConfig.initializationRetryIntervalMs(); + long retryMaxTimeoutMs = rlmmConfig.initializationRetryMaxTimeoutMs(); + RemoteLogMetadataTopicPartitioner partitioner = partitionerFunction.apply(metadataTopicPartitionCount); + NewTopic newTopic = newRemoteLogMetadataTopic(rlmmConfig); + boolean isTopicCreated = false; long startTimeMs = time.milliseconds(); - Admin adminClient = null; - try { - adminClient = Admin.create(rlmmConfig.commonProperties()); - // Stop if it is already initialized or closing. - while (!(initialized.get() || closing.get())) { - - // If it is timed out then raise an error to exit. - if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) { - log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.", - rlmmConfig.initializationRetryMaxTimeoutMs()); + try (Admin admin = Admin.create(rlmmConfig.commonProperties())) { + while (!(initialized.get() || closing.get() || initializationFailed)) { + if (time.milliseconds() - startTimeMs > retryMaxTimeoutMs) { + log.error("Timed out to initialize the resources within {} ms.", retryMaxTimeoutMs); initializationFailed = true; - return; - } - - if (!topicCreated) { - topicCreated = createTopic(adminClient, remoteLogMetadataTopicRequest); + break; } - - if (!topicCreated) { - // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again. - log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); - Utils.sleep(rlmmConfig.initializationRetryIntervalMs()); + isTopicCreated = isTopicCreated || createTopic(admin, newTopic); + if (!isTopicCreated) { + handleRetry(retryIntervalMs); continue; - } else { - // If topic is already created, validate the existing topic partitions. - try { - String topicName = remoteLogMetadataTopicRequest.name(); - // If the existing topic partition size is not same as configured, mark initialization as failed and exit. - if (!isPartitionsCountSameAsConfigured(adminClient, topicName)) { - initializationFailed = true; - } - } catch (Exception e) { - log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs()); - Utils.sleep(rlmmConfig.initializationRetryIntervalMs()); - continue; + } + try { + if (!isPartitionsCountSameAsConfigured(admin, newTopic.name(), metadataTopicPartitionCount)) { + initializationFailed = true; + break; } + } catch (Exception e) { + handleRetry(retryIntervalMs); + continue; } Review Comment: `createTopic` is already enclosed in try..catch block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org