This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 28ff2a47e6e KAFKA-19426 TopicBasedRemoteLogMetadataManager's initial 
should happen after the broker ready (KIP-1197) (#20203)
28ff2a47e6e is described below

commit 28ff2a47e6ed17e3569831a2d6d2e1c644cb2875
Author: Jian <[email protected]>
AuthorDate: Sun Nov 2 22:21:40 2025 +0800

    KAFKA-19426 TopicBasedRemoteLogMetadataManager's initial should happen 
after the broker ready (KIP-1197) (#20203)
    
    Refer to https://github.com/apache/kafka/pull/20008. I am submitting another
    proposal to solve the issue, which also helps with the following
    problem: a good value for
    DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS is hard
    to define because it depends on the startup process.
    
    If the value is not well chosen, the initialization will fail and
    data on the local disk will never be deleted. (See the proposal in
    https://github.com/apache/kafka/pull/20007 to prevent this.)
    
    Kafka processes in different environments need different values, for the
    following reason: when a broker restarts, the connection used to query/create
    the topic in "TopicBasedRemoteLogMetadataManager#initializeResources" will
    fail until the broker itself is ready. For the detailed reason, see
    https://github.com/apache/kafka/pull/20007#issuecomment-3027094794
    
    This change is proposed to improve that. With it, the default
    value is easy to define, and I no longer need to set a very large value on
    all our Kafka clusters.
    
    Reviewers: Yung <[email protected]>, Kamal Chandraprakash
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala | 14 +++++++++++
 .../metadata/storage/BrokerReadyCallback.java      | 29 ++++++++++++++++++++++
 .../TopicBasedRemoteLogMetadataManager.java        | 27 ++++++++++++++++----
 .../TopicBasedRemoteLogMetadataManagerConfig.java  |  6 +++--
 .../log/remote/storage/RemoteLogManager.java       |  2 +-
 .../storage/RemoteLogMetadataManagerTestUtils.java |  1 +
 .../TopicBasedRemoteLogMetadataManagerTest.java    |  1 +
 7 files changed, 72 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8165ddde8d1..4060c9c8d40 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.security.{CredentialProvider, 
DelegationTokenManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
 import org.apache.kafka.server.config.{ConfigType, 
DelegationTokenManagerConfigs}
+import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
 import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, 
RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, 
KafkaYammerMetrics}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
@@ -596,6 +597,19 @@ class BrokerServer(
         "all of the SocketServer Acceptors to be started",
         enableRequestProcessingFuture, startupDeadline, time)
 
+      remoteLogManagerOpt.foreach(rlm =>
+        rlm.remoteLogMetadataManager() match {
+          case callback: BrokerReadyCallback =>
+            try {
+              callback.onBrokerReady()
+            } catch {
+              case e: Exception =>
+                error(s"Error executing broker ready callback: 
${callback.getClass.getSimpleName}", e)
+            }
+          case _ => // Skip
+        }
+      )
+
       maybeChangeStatus(STARTING, STARTED)
     } catch {
       case e: Throwable =>
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/BrokerReadyCallback.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/BrokerReadyCallback.java
new file mode 100644
index 00000000000..43a55348bd7
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/BrokerReadyCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+/**
+ * Callback interface for broker ready notification.
+ */
+public interface BrokerReadyCallback {
+    /**
+     * This method will be called during broker startup for the implementation,
+     * which needs delayed initialization until the broker can process 
requests.
+     */
+    void onBrokerReady();
+
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 7d43db1d8e7..1b2b459cb8d 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -68,7 +68,7 @@ import java.util.function.Supplier;
  * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an 
instance of this class, and it subscribes
  * to metadata updates for the registered user topic partitions.
  */
-public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+public class TopicBasedRemoteLogMetadataManager implements 
BrokerReadyCallback, RemoteLogMetadataManager {
     private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
     private final Time time = Time.SYSTEM;
 
@@ -290,12 +290,10 @@ public class TopicBasedRemoteLogMetadataManager 
implements RemoteLogMetadataMana
         try {
             if (configured.compareAndSet(false, true)) {
                 TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new 
TopicBasedRemoteLogMetadataManagerConfig(configs);
-                // Scheduling the initialization producer/consumer managers in 
a separate thread. Required resources may
-                // not yet be available now. This thread makes sure that it is 
retried at regular intervals until it is
-                // successful.
+                // Creates initialization thread for producer/consumer 
managers. It will be started when
+                // the broker is ready via onBrokerReady(). The thread retries 
until resources are available.
                 initializationThread = KafkaThread.nonDaemon(
                         "RLMMInitializationThread", () -> 
initializeResources(rlmmConfig));
-                initializationThread.start();
                 log.info("Successfully configured topic-based RLMM with 
config: {}", rlmmConfig);
             } else {
                 log.info("Skipping configure as it is already configured.");
@@ -376,6 +374,25 @@ public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataMana
         }
     }
 
+    /**
+     * Invoked when the broker is ready to handle requests. This triggers the 
initialization of
+     * resources including Kafka clients that operate on the remote log 
metadata topic.
+     * <p>
+     * The target cluster for the topic is determined by configuration and can 
be either:
+     * <ol>
+     *   <li>The local cluster (most common) - the initialization is deferred 
until the broker is ready
+     *       to handle requests. Early initialization would lead to connection 
failures.</li>
+     *   <li>A remote cluster - the delay is not necessary but causes no 
harm.</li>
+     * </ol>
+     * <p>
+     * By using the broker ready state as the initialization trigger, the 
implementation optimally handles
+     * the typical case while remaining correct for alternative configurations.
+     */
+    @Override
+    public void onBrokerReady() {
+        initializationThread.start();
+    }
+
     boolean doesTopicExist(Admin admin, String topic) throws 
ExecutionException, InterruptedException {
         try {
             TopicDescription description = admin.describeTopics(Set.of(topic))
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index e3e75b9f473..721d58e34d8 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -69,8 +69,10 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
             "retrying RemoteLogMetadataManager resources initialization 
again.";
 
     public static final String 
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum 
amount of time in milliseconds " +
-            "for retrying RemoteLogMetadataManager resources initialization. 
When total retry intervals reach this timeout, initialization " +
-            "is considered as failed and broker starts shutting down.";
+            "for retrying RemoteLogMetadataManager resources initialization. " 
+
+            "For TopicBasedRemoteLogMetadataManager's initialization, the 
timer starts after this local broker is ready to process requests " +
+            "(primarily for ensuring the local cluster is ready when metadata 
is stored locally as an internal topic). " +
+            "If initialization fails within this timeout, this broker process 
will terminate.";
 
     public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX = 
"remote.log.metadata.common.client.";
     public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = 
"remote.log.metadata.producer.";
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index f6480b8668c..a9848633eff 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -426,7 +426,7 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
         return Plugin.wrapInstance(rlmm, metrics, 
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
     }
 
-    RemoteLogMetadataManager remoteLogMetadataManager() {
+    public RemoteLogMetadataManager remoteLogMetadataManager() {
         return remoteLogMetadataManagerPlugin.get();
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index 149ebb177d9..8a1f44fcea2 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -89,6 +89,7 @@ public class RemoteLogMetadataManagerTestUtils {
             configs.putAll(overrideRemoteLogMetadataManagerProps);
 
             topicBasedRemoteLogMetadataManager.configure(configs);
+            topicBasedRemoteLogMetadataManager.onBrokerReady();
             assertDoesNotThrow(() -> 
TestUtils.waitForCondition(topicBasedRemoteLogMetadataManager::isInitialized, 
60_000L,
                     "Time out reached before it is initialized successfully"));
             return topicBasedRemoteLogMetadataManager;
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 723e756036c..f564028ccaf 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -357,6 +357,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
                 TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID, 0
             );
             rlmm.configure(configs);
+            rlmm.onBrokerReady();
             
             // Wait for initialization failure and exit procedure to be called
             TestUtils.waitForCondition(() -> exitCalled.get(), 

Reply via email to