This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28ff2a47e6e KAFKA-19426 TopicBasedRemoteLogMetadataManager's initial
should happen after the broker ready (KIP-1197) (#20203)
28ff2a47e6e is described below
commit 28ff2a47e6ed17e3569831a2d6d2e1c644cb2875
Author: Jian <[email protected]>
AuthorDate: Sun Nov 2 22:21:40 2025 +0800
KAFKA-19426 TopicBasedRemoteLogMetadataManager's initial should happen
after the broker ready (KIP-1197) (#20203)
Refer to https://github.com/apache/kafka/pull/20008. I am submitting another
proposal to solve the issue, which can also help to solve the following
issue:
DEFAULT_REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS is hard
to define because it relies on the startup process.
If the value is not well defined, the initialization will fail and cause
data on the local disk to never get deleted. (I proposed
https://github.com/apache/kafka/pull/20007 to prevent that.)
Kafka processes in different environments need different values, for the
following reason: when a broker restarts, the connection used to query/create
the topic in "TopicBasedRemoteLogMetadataManager#initializeResources" will
fail until the broker itself is ready. For the detailed reason, refer to:
https://github.com/apache/kafka/pull/20007#issuecomment-3027094794
So I propose this change to improve it. After this improvement, the default
value is easy to define, and I don't need to set a very big value for all
our Kafka clusters.
Reviewers: Yung <[email protected]>, Kamal Chandraprakash
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 14 +++++++++++
.../metadata/storage/BrokerReadyCallback.java | 29 ++++++++++++++++++++++
.../TopicBasedRemoteLogMetadataManager.java | 27 ++++++++++++++++----
.../TopicBasedRemoteLogMetadataManagerConfig.java | 6 +++--
.../log/remote/storage/RemoteLogManager.java | 2 +-
.../storage/RemoteLogMetadataManagerTestUtils.java | 1 +
.../TopicBasedRemoteLogMetadataManagerTest.java | 1 +
7 files changed, 72 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8165ddde8d1..4060c9c8d40 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -47,6 +47,7 @@ import org.apache.kafka.security.{CredentialProvider,
DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
+import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager,
RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin,
KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
@@ -596,6 +597,19 @@ class BrokerServer(
"all of the SocketServer Acceptors to be started",
enableRequestProcessingFuture, startupDeadline, time)
+ remoteLogManagerOpt.foreach(rlm =>
+ rlm.remoteLogMetadataManager() match {
+ case callback: BrokerReadyCallback =>
+ try {
+ callback.onBrokerReady()
+ } catch {
+ case e: Exception =>
+ error(s"Error executing broker ready callback:
${callback.getClass.getSimpleName}", e)
+ }
+ case _ => // Skip
+ }
+ )
+
maybeChangeStatus(STARTING, STARTED)
} catch {
case e: Throwable =>
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/BrokerReadyCallback.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/BrokerReadyCallback.java
new file mode 100644
index 00000000000..43a55348bd7
--- /dev/null
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/BrokerReadyCallback.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+/**
+ * Callback interface for broker ready notification.
+ */
+public interface BrokerReadyCallback {
+ /**
+ * This method will be called during broker startup for the implementation,
+ * which needs delayed initialization until the broker can process
requests.
+ */
+ void onBrokerReady();
+
+}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 7d43db1d8e7..1b2b459cb8d 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -68,7 +68,7 @@ import java.util.function.Supplier;
* {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an
instance of this class, and it subscribes
* to metadata updates for the registered user topic partitions.
*/
-public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataManager {
+public class TopicBasedRemoteLogMetadataManager implements
BrokerReadyCallback, RemoteLogMetadataManager {
private static final Logger log =
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
private final Time time = Time.SYSTEM;
@@ -290,12 +290,10 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
try {
if (configured.compareAndSet(false, true)) {
TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(configs);
- // Scheduling the initialization producer/consumer managers in
a separate thread. Required resources may
- // not yet be available now. This thread makes sure that it is
retried at regular intervals until it is
- // successful.
+ // Creates initialization thread for producer/consumer
managers. It will be started when
+ // the broker is ready via onBrokerReady(). The thread retries
until resources are available.
initializationThread = KafkaThread.nonDaemon(
"RLMMInitializationThread", () ->
initializeResources(rlmmConfig));
- initializationThread.start();
log.info("Successfully configured topic-based RLMM with
config: {}", rlmmConfig);
} else {
log.info("Skipping configure as it is already configured.");
@@ -376,6 +374,25 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
}
}
+ /**
+ * Invoked when the broker is ready to handle requests. This triggers the
initialization of
+ * resources including Kafka clients that operate on the remote log
metadata topic.
+ * <p>
+ * The target cluster for the topic is determined by configuration and can
be either:
+ * <ol>
+ * <li>The local cluster (most common) - the initialization is deferred
until the broker is ready
+ * to handle requests. Early initialization would lead to connection
failures.</li>
+ * <li>A remote cluster - the delay is not necessary but causes no
harm.</li>
+ * </ol>
+ * <p>
+ * By using the broker ready state as the initialization trigger, the
implementation optimally handles
+ * the typical case while remaining correct for alternative configurations.
+ */
+ @Override
+ public void onBrokerReady() {
+ initializationThread.start();
+ }
+
boolean doesTopicExist(Admin admin, String topic) throws
ExecutionException, InterruptedException {
try {
TopicDescription description = admin.describeTopics(Set.of(topic))
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index e3e75b9f473..721d58e34d8 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -69,8 +69,10 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
"retrying RemoteLogMetadataManager resources initialization
again.";
public static final String
REMOTE_LOG_METADATA_INITIALIZATION_RETRY_MAX_TIMEOUT_MS_DOC = "The maximum
amount of time in milliseconds " +
- "for retrying RemoteLogMetadataManager resources initialization.
When total retry intervals reach this timeout, initialization " +
- "is considered as failed and broker starts shutting down.";
+ "for retrying RemoteLogMetadataManager resources initialization. "
+
+ "For TopicBasedRemoteLogMetadataManager's initialization, the
timer starts after this local broker is ready to process requests " +
+ "(primarily for ensuring the local cluster is ready when metadata
is stored locally as an internal topic). " +
+ "If initialization fails within this timeout, this broker process
will terminate.";
public static final String REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX =
"remote.log.metadata.common.client.";
public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX =
"remote.log.metadata.producer.";
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index f6480b8668c..a9848633eff 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -426,7 +426,7 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
return Plugin.wrapInstance(rlmm, metrics,
RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP);
}
- RemoteLogMetadataManager remoteLogMetadataManager() {
+ public RemoteLogMetadataManager remoteLogMetadataManager() {
return remoteLogMetadataManagerPlugin.get();
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
index 149ebb177d9..8a1f44fcea2 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataManagerTestUtils.java
@@ -89,6 +89,7 @@ public class RemoteLogMetadataManagerTestUtils {
configs.putAll(overrideRemoteLogMetadataManagerProps);
topicBasedRemoteLogMetadataManager.configure(configs);
+ topicBasedRemoteLogMetadataManager.onBrokerReady();
assertDoesNotThrow(() ->
TestUtils.waitForCondition(topicBasedRemoteLogMetadataManager::isInitialized,
60_000L,
"Time out reached before it is initialized successfully"));
return topicBasedRemoteLogMetadataManager;
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 723e756036c..f564028ccaf 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -357,6 +357,7 @@ public class TopicBasedRemoteLogMetadataManagerTest {
TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID, 0
);
rlmm.configure(configs);
+ rlmm.onBrokerReady();
// Wait for initialization failure and exit procedure to be called
TestUtils.waitForCondition(() -> exitCalled.get(),