This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5c54c3b5de4 [fix][broker] Fix the broker registering might be blocked for long time (#23371) (#23507)
5c54c3b5de4 is described below

commit 5c54c3b5de491ee51764ce36180056cb8b203839
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Oct 23 11:06:14 2024 -0700

    [fix][broker] Fix the broker registering might be blocked for long time (#23371) (#23507)
---
 .../loadbalance/extensions/BrokerRegistryImpl.java | 14 ++++++------
 .../extensions/BrokerRegistryIntegrationTest.java  |  7 +++++-
 .../apache/pulsar/client/impl/TableViewTest.java   |  3 +++
 .../apache/pulsar/client/impl/TableViewImpl.java   | 25 ++++++++++++++++------
 4 files changed, 33 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 296d9a77fd7..c3cb8413b01 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -83,13 +83,6 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
         this.scheduler = pulsar.getLoadManagerExecutor();
         this.listeners = new ArrayList<>();
-        // The registered node is an ephemeral node that could be deleted when the metadata store client's session
-        // is expired. In this case, we should register again.
-        this.listeners.add((broker, notificationType) -> {
-            if (notificationType == NotificationType.Deleted && getBrokerId().equals(broker)) {
-                registerAsync();
-            }
-        });
         this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
         this.brokerLookupData = new BrokerLookupData(
                 pulsar.getWebServiceAddress(),
@@ -222,11 +215,16 @@ public class BrokerRegistryImpl implements BrokerRegistry {
             if (log.isDebugEnabled()) {
                 log.debug("Handle notification: [{}]", t);
             }
+            // The registered node is an ephemeral node that could be deleted when the metadata store client's session
+            // is expired. In this case, we should register again.
+            final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
+            if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
+                registerAsync();
+            }
             if (listeners.isEmpty()) {
                 return;
             }
             this.scheduler.submit(() -> {
-                String brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
                 for (BiConsumer<String, NotificationType> listener : listeners) {
                     listener.accept(brokerId, t.getType());
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
index 162ea50829d..232088afb94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
@@ -63,10 +63,15 @@ public class BrokerRegistryIntegrationTest {
 
     @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
+        final var startMs = System.currentTimeMillis();
         if (pulsar != null) {
             pulsar.close();
         }
+        final var elapsedMs = System.currentTimeMillis() - startMs;
         bk.stop();
+        if (elapsedMs > 5000) {
+            throw new RuntimeException("Broker took " + elapsedMs + "ms to close");
+        }
     }
 
     @Test
@@ -105,7 +110,7 @@ public class BrokerRegistryIntegrationTest {
         });
     }
 
-    private ServiceConfiguration brokerConfig() {
+    protected ServiceConfiguration brokerConfig() {
         final var config = new ServiceConfiguration();
         config.setClusterName(clusterName);
         config.setAdvertisedAddress("localhost");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 61ab4de8a32..5448751160a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -173,6 +173,9 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
                 .topic(topic)
                 .create();
+        // Verify refresh can handle the case when the topic is empty
+        tv.refreshAsync().get(3, TimeUnit.SECONDS);
+
         // 2. Add a listen action to provide the test environment.
         // The listen action will be triggered when there are incoming messages every time.
         // This is a sync operation, so sleep in the listen action can slow down the reading rate of messages.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 4f520604978..17b49828eec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
@@ -259,7 +260,11 @@ public class TableViewImpl<T> implements TableView<T> {
     @Override
     public CompletableFuture<Void> refreshAsync() {
         CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        reader.thenCompose(reader -> getLastMessageIds(reader).thenAccept(lastMessageIds -> {
+        reader.thenCompose(reader -> getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> {
+            if (lastMessageIds.isEmpty()) {
+                completableFuture.complete(null);
+                return;
+            }
             // After get the response of lastMessageIds, put the future and result into `refreshMap`
             // and then filter out partitions that has been read to the lastMessageID.
             pendingRefreshRequests.put(completableFuture, lastMessageIds);
@@ -291,8 +296,12 @@ public class TableViewImpl<T> implements TableView<T> {
         AtomicLong messagesRead = new AtomicLong();
 
         CompletableFuture<Void> future = new CompletableFuture<>();
-        getLastMessageIds(reader).thenAccept(maxMessageIds -> {
-            readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
+        getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> {
+            if (lastMessageIds.isEmpty()) {
+                future.complete(null);
+                return;
+            }
+            readAllExistingMessages(reader, future, startTime, messagesRead, lastMessageIds);
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
             return null;
@@ -300,13 +309,15 @@ public class TableViewImpl<T> implements TableView<T> {
         return future;
     }
 
-    private CompletableFuture<Map<String, TopicMessageId>> getLastMessageIds(Reader<T> reader) {
+    private CompletableFuture<Map<String, TopicMessageId>> getLastMessageIdOfNonEmptyTopics(Reader<T> reader) {
         return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> {
-            Map<String, TopicMessageId> maxMessageIds = new ConcurrentHashMap<>();
+            Map<String, TopicMessageId> lastMessageIdMap = new ConcurrentHashMap<>();
             lastMessageIds.forEach(topicMessageId -> {
-                maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
+                if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) {
+                    lastMessageIdMap.put(topicMessageId.getOwnerTopic(), topicMessageId);
+                } // else: a negative entry id represents an empty topic so that we don't have to read messages from it
             });
-            return maxMessageIds;
+            return lastMessageIdMap;
         });
     }
 

Reply via email to