This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a84c0b0a9f2d5df660f009bef00e4a7264b9e509
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Sep 30 11:02:23 2025 +0800

    [fix][broker] Fix incorrect topic loading latency metric and timeout might not be respected (#24785)
    
    (cherry picked from commit 5b2562dc86df5e46410d9ebfce7b30a03a60baeb)
---
 .../pulsar/broker/service/BrokerService.java       | 141 ++++++++++++---------
 .../pulsar/broker/service/TopicLoadingContext.java |  65 ++++++++++
 .../pulsar/broker/service/BrokerServiceTest.java   |   7 +-
 .../buffer/TopicTransactionBufferTest.java         |   5 +-
 4 files changed, 150 insertions(+), 68 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 044f44a644a..5d9cad30ae9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -80,7 +80,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
@@ -96,6 +95,7 @@ import 
org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import 
org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
 import org.apache.pulsar.broker.PulsarServerException;
@@ -195,6 +195,7 @@ import 
org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.jspecify.annotations.NonNull;
+import org.jspecify.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1042,7 +1043,7 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Optional<Topic>> getTopic(final String topic, 
boolean createIfMissing,
-                                                       Map<String, String> 
properties) {
+                                                       @Nullable Map<String, 
String> properties) {
         return getTopic(TopicName.get(topic), createIfMissing, properties);
     }
 
@@ -1126,7 +1127,7 @@ public class BrokerService implements Closeable {
      * @return CompletableFuture with an Optional of the topic if found or 
created, otherwise empty.
      */
     public CompletableFuture<Optional<Topic>> getTopic(final TopicName 
topicName, boolean createIfMissing,
-                                                       Map<String, String> 
properties) {
+                                                       @Nullable Map<String, 
String> properties) {
         try {
             // If topic future exists in the cache returned directly 
regardless of whether it fails or timeout.
             CompletableFuture<Optional<Topic>> tp = 
topics.get(topicName.toString());
@@ -1142,13 +1143,31 @@ public class BrokerService implements Closeable {
                     return FutureUtil.failedFuture(new NotAllowedException(
                             "Broker is unable to load persistent topic"));
                 }
-                return 
checkNonPartitionedTopicExists(topicName).thenCompose(exists -> {
+                final CompletableFuture<Optional<Topic>> topicFuture = 
FutureUtil.createFutureWithTimeout(
+                        
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), 
executor(),
+                        () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
+                final var context = new TopicLoadingContext(topicName, 
createIfMissing, topicFuture);
+                if (properties != null) {
+                    context.setProperties(properties);
+                }
+                topicFuture.exceptionally(t -> {
+                    final var now = System.nanoTime();
+                    if (FutureUtil.unwrapCompletionException(t) instanceof 
TimeoutException) {
+                        log.warn("Failed to load {} after {} ms", topicName, 
context.latencyMs(now));
+                    } else {
+                        log.warn("Failed to load {} after {} ms", topicName, 
context.latencyString(now), t);
+                    }
+                    pulsarStats.recordTopicLoadFailed();
+                    return Optional.empty();
+                });
+                checkNonPartitionedTopicExists(topicName).thenAccept(exists -> 
{
                     if (!exists && !createIfMissing) {
-                        return 
CompletableFuture.completedFuture(Optional.empty());
+                        topicFuture.complete(Optional.empty());
+                        return;
                     }
                     // The topic level policies are not needed now, but the 
meaning of calling
                     // "getTopicPoliciesBypassSystemTopic" will wait for 
system topic policies initialization.
-                    return getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY)
+                    getTopicPoliciesBypassSystemTopic(topicName, 
TopicPoliciesService.GetType.LOCAL_ONLY)
                             .exceptionally(ex -> {
                         final Throwable rc = 
FutureUtil.unwrapCompletionException(ex);
                         final String errorInfo = String.format("Topic creation 
encountered an exception by initialize"
@@ -1156,11 +1175,29 @@ public class BrokerService implements Closeable {
                                 rc.getMessage());
                         log.error(errorInfo, rc);
                         throw FutureUtil.wrapToCompletionException(new 
ServiceUnitNotReadyException(errorInfo));
-                    }).thenCompose(optionalTopicPolicies -> {
-                        return topics.computeIfAbsent(topicName.toString(),
-                                (tpName) -> 
loadOrCreatePersistentTopic(tpName, createIfMissing, properties));
+                    }).thenRun(() -> {
+                        final var inserted = new MutableBoolean(false);
+                        final var cachedFuture = 
topics.computeIfAbsent(topicName.toString(), ___ -> {
+                            inserted.setTrue();
+                            return loadOrCreatePersistentTopic(context);
+                        });
+                        if (inserted.isFalse()) {
+                            // This case should happen rarely when the same 
topic is loaded concurrently because we
+                            // checked if the `topics` cache includes this 
topic before, so the latency is not the
+                            // actual loading latency that should not be 
recorded in metrics.
+                            log.info("[{}] Finished loading from other 
concurrent loading task (latency: {})",
+                                    topicName, 
context.latencyString(System.nanoTime()));
+                            cachedFuture.whenComplete((optTopic, e) -> {
+                                if (e == null) {
+                                    topicFuture.complete(optTopic);
+                                } else {
+                                    topicFuture.completeExceptionally(e);
+                                }
+                            });
+                        }
                     });
                 });
+                return topicFuture;
             } else {
                 if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
                     if (log.isDebugEnabled()) {
@@ -1631,29 +1668,16 @@ public class BrokerService implements Closeable {
     /**
      * It creates a topic async and returns CompletableFuture. It also 
throttles down configured max-concurrent topic
      * loading and puts them into queue once in-process topics are created.
-     *
-     * @param topic persistent-topic name
-     * @return CompletableFuture<Topic>
-     * @throws RuntimeException
      */
-    protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(final String topic,
-            boolean createIfMissing, Map<String, String> properties) {
-        final CompletableFuture<Optional<Topic>> topicFuture = 
FutureUtil.createFutureWithTimeout(
-                
Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), 
executor(),
-                () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
-
-        topicFuture.exceptionally(t -> {
-            pulsarStats.recordTopicLoadFailed();
-            return null;
-        });
-
+    protected CompletableFuture<Optional<Topic>> 
loadOrCreatePersistentTopic(TopicLoadingContext context) {
+        final var topic = context.getTopicName().toString();
+        final var topicFuture = context.getTopicFuture();
         checkTopicNsOwnership(topic)
                 .thenRun(() -> {
                     final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
 
                     if (topicLoadSemaphore.tryAcquire()) {
-                        checkOwnershipAndCreatePersistentTopic(topic, 
createIfMissing, topicFuture,
-                                properties);
+                        checkOwnershipAndCreatePersistentTopic(context);
                         topicFuture.handle((persistentTopic, ex) -> {
                             // release permit and process pending topic
                             topicLoadSemaphore.release();
@@ -1668,8 +1692,7 @@ public class BrokerService implements Closeable {
                             return null;
                         });
                     } else {
-                        pendingTopicLoadingQueue.add(new 
TopicLoadingContext(topic,
-                                createIfMissing, topicFuture, properties));
+                        pendingTopicLoadingQueue.add(context);
                         if (log.isDebugEnabled()) {
                             log.debug("topic-loading for {} added into pending 
queue", topic);
                         }
@@ -1712,23 +1735,23 @@ public class BrokerService implements Closeable {
         }
     }
 
-    private void checkOwnershipAndCreatePersistentTopic(final String topic, 
boolean createIfMissing,
-                                       CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties) {
-        TopicName topicName = TopicName.get(topic);
+    private void checkOwnershipAndCreatePersistentTopic(TopicLoadingContext 
context) {
+        TopicName topicName = context.getTopicName();
+        final var topic = topicName.toString();
+        final var topicFuture = context.getTopicFuture();
         checkTopicNsOwnership(topic).thenRun(() -> {
             CompletableFuture<Map<String, String>> propertiesFuture;
-            if (properties == null) {
+            if (context.getProperties() == null) {
                 //Read properties from storage when loading topic.
                 propertiesFuture = fetchTopicPropertiesAsync(topicName);
             } else {
-                propertiesFuture = 
CompletableFuture.completedFuture(properties);
+                propertiesFuture = 
CompletableFuture.completedFuture(context.getProperties());
             }
-            propertiesFuture.thenAccept(finalProperties ->
-                    //TODO add topicName in properties?
-                    createPersistentTopic0(topic, createIfMissing, topicFuture,
-                            finalProperties)
-            ).exceptionally(throwable -> {
+            propertiesFuture.thenAccept(finalProperties -> {
+                context.setProperties(finalProperties);
+                //TODO add topicName in properties?
+                createPersistentTopic0(context);
+            }).exceptionally(throwable -> {
                 log.warn("[{}] Read topic property failed", topic, throwable);
                 pulsar.getExecutor().execute(() -> topics.remove(topic, 
topicFuture));
                 topicFuture.completeExceptionally(throwable);
@@ -1742,11 +1765,11 @@ public class BrokerService implements Closeable {
     }
 
     @VisibleForTesting
-    public void createPersistentTopic0(final String topic, boolean 
createIfMissing,
-                                       CompletableFuture<Optional<Topic>> 
topicFuture,
-                                       Map<String, String> properties) {
-        TopicName topicName = TopicName.get(topic);
-        final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+    public void createPersistentTopic0(TopicLoadingContext context) {
+        TopicName topicName = context.getTopicName();
+        final var topic = topicName.toString();
+        final var topicFuture = context.getTopicFuture();
+        final var createIfMissing = context.isCreateIfMissing();
 
         if (isTransactionInternalName(topicName)) {
             String msg = String.format("Can not create transaction system 
topic %s", topic);
@@ -1780,7 +1803,9 @@ public class BrokerService implements Closeable {
                         new ManagedLedgerInterceptorImpl(interceptors, 
brokerEntryPayloadProcessors));
             }
             managedLedgerConfig.setCreateIfMissing(createIfMissing);
-            managedLedgerConfig.setProperties(properties);
+            if (context.getProperties() != null) {
+                managedLedgerConfig.setProperties(context.getProperties());
+            }
             String shadowSource = managedLedgerConfig.getShadowSource();
             if (shadowSource != null) {
                 
managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding());
@@ -1825,10 +1850,11 @@ public class BrokerService implements Closeable {
                                             return 
persistentTopic.checkDeduplicationStatus();
                                         })
                                         .thenRun(() -> {
-                                            log.info("Created topic {} - dedup 
is {}", topic,
-                                            
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled");
-                                            long topicLoadLatencyMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
-                                                                        - 
topicCreateTimeMs;
+                                            long nowInNanos = 
System.nanoTime();
+                                            long topicLoadLatencyMs = 
context.latencyMs(nowInNanos);
+                                            log.info("Created topic {} - dedup 
is {} (latency: {})", topic,
+                                                    
persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled",
+                                                    
context.latencyString(nowInNanos));
                                             
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                             if 
(!topicFuture.complete(Optional.of(persistentTopic))) {
                                                 // Check create persistent 
topic timeout.
@@ -3244,15 +3270,13 @@ public class BrokerService implements Closeable {
             return;
         }
 
-        final String topic = pendingTopic.getTopic();
+        pendingTopic.polledFromQueue();
+        final String topic = pendingTopic.getTopicName().toString();
         checkTopicNsOwnership(topic).thenRun(() -> {
             CompletableFuture<Optional<Topic>> pendingFuture = 
pendingTopic.getTopicFuture();
             final Semaphore topicLoadSemaphore = 
topicLoadRequestSemaphore.get();
             final boolean acquiredPermit = topicLoadSemaphore.tryAcquire();
-            checkOwnershipAndCreatePersistentTopic(topic,
-                    pendingTopic.isCreateIfMissing(),
-                    pendingFuture,
-                    pendingTopic.getProperties());
+            checkOwnershipAndCreatePersistentTopic(pendingTopic);
             pendingFuture.handle((persistentTopic, ex) -> {
                 // release permit and process next pending topic
                 if (acquiredPermit) {
@@ -3827,13 +3851,4 @@ public class BrokerService implements Closeable {
     public void 
setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
         this.pulsarChannelInitFactory = factory;
     }
-
-    @AllArgsConstructor
-    @Getter
-    private static class TopicLoadingContext {
-        private final String topic;
-        private final boolean createIfMissing;
-        private final CompletableFuture<Optional<Topic>> topicFuture;
-        private final Map<String, String> properties;
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java
new file mode 100644
index 00000000000..9e3ed230cd2
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicLoadingContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import org.apache.pulsar.common.naming.TopicName;
+import org.jspecify.annotations.Nullable;
+
+@RequiredArgsConstructor
+public class TopicLoadingContext {
+
+    private static final String EXAMPLE_LATENCY_OUTPUTS = "1234 ms (queued: 
567)";
+
+    private final long startNs = System.nanoTime();
+    @Getter
+    private final TopicName topicName;
+    @Getter
+    private final boolean createIfMissing;
+    @Getter
+    private final CompletableFuture<Optional<Topic>> topicFuture;
+    @Getter
+    @Setter
+    @Nullable private Map<String, String> properties;
+    private long polledFromQueueNs = -1L;
+
+    public void polledFromQueue() {
+        polledFromQueueNs = System.nanoTime();
+    }
+
+    public long latencyMs(long nowInNanos) {
+        return TimeUnit.NANOSECONDS.toMillis(nowInNanos - startNs);
+    }
+
+    public String latencyString(long nowInNanos) {
+        final var builder = new 
StringBuilder(EXAMPLE_LATENCY_OUTPUTS.length());
+        builder.append(latencyMs(nowInNanos));
+        builder.append(" ms");
+        if (polledFromQueueNs >= 0) {
+            builder.append(" (queued: 
").append(latencyMs(polledFromQueueNs)).append(")");
+        }
+        return builder.toString();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
index 49f5d7c5c36..62dcc37f38e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java
@@ -1184,7 +1184,8 @@ public class BrokerServiceTest extends BrokerTestBase {
 
         // try to create topic which should fail as bundle is disable
         CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
-                .loadOrCreatePersistentTopic(topicName, true, null);
+                .loadOrCreatePersistentTopic(new TopicLoadingContext(topic, 
true,
+                        new CompletableFuture<>()));
 
         try {
             futureResult.get();
@@ -1227,8 +1228,8 @@ public class BrokerServiceTest extends BrokerTestBase {
             ArrayList<CompletableFuture<Optional<Topic>>> loadFutures = new 
ArrayList<>();
             for (int i = 0; i < 10; i++) {
                 // try to create topic which should fail as bundle is disable
-                CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService()
-                        .loadOrCreatePersistentTopic(topicName + "_" + i, 
false, null);
+                CompletableFuture<Optional<Topic>> futureResult = 
pulsar.getBrokerService().loadOrCreatePersistentTopic(
+                        new TopicLoadingContext(TopicName.get(topicName + "_" 
+ i), false, new CompletableFuture<>()));
                 loadFutures.add(futureResult);
             }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 93654db2c99..5a54b37a637 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.opentelemetry.api.common.Attributes;
 import java.time.Duration;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -47,6 +46,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.TopicLoadingContext;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
@@ -178,7 +178,8 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
                 .newTopic(Mockito.eq(topic), Mockito.any(), 
Mockito.eq(brokerService),
                         Mockito.eq(PersistentTopic.class));
 
-        brokerService.createPersistentTopic0(topic, true, new 
CompletableFuture<>(), Collections.emptyMap());
+        brokerService.createPersistentTopic0(new 
TopicLoadingContext(TopicName.get(topic), true,
+                new CompletableFuture<>()));
 
         Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() 
!= null);
         PersistentTopic persistentTopic = reference.get();

Reply via email to