This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 5c54c3b5de4 [fix][broker] Fix the broker registering might be blocked
for long time (#23371) (#23507)
5c54c3b5de4 is described below
commit 5c54c3b5de491ee51764ce36180056cb8b203839
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Oct 23 11:06:14 2024 -0700
[fix][broker] Fix the broker registering might be blocked for long time
(#23371) (#23507)
---
.../loadbalance/extensions/BrokerRegistryImpl.java | 14 ++++++------
.../extensions/BrokerRegistryIntegrationTest.java | 7 +++++-
.../apache/pulsar/client/impl/TableViewTest.java | 3 +++
.../apache/pulsar/client/impl/TableViewImpl.java | 25 ++++++++++++++++------
4 files changed, 33 insertions(+), 16 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 296d9a77fd7..c3cb8413b01 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -83,13 +83,6 @@ public class BrokerRegistryImpl implements BrokerRegistry {
this.brokerLookupDataMetadataCache =
pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
- // The registered node is an ephemeral node that could be deleted when
the metadata store client's session
- // is expired. In this case, we should register again.
- this.listeners.add((broker, notificationType) -> {
- if (notificationType == NotificationType.Deleted &&
getBrokerId().equals(broker)) {
- registerAsync();
- }
- });
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
this.brokerLookupData = new BrokerLookupData(
pulsar.getWebServiceAddress(),
@@ -222,11 +215,16 @@ public class BrokerRegistryImpl implements BrokerRegistry
{
if (log.isDebugEnabled()) {
log.debug("Handle notification: [{}]", t);
}
+ // The registered node is an ephemeral node that could be deleted
when the metadata store client's session
+ // is expired. In this case, we should register again.
+ final var brokerId =
t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
+ if (t.getType() == NotificationType.Deleted &&
getBrokerId().equals(brokerId)) {
+ registerAsync();
+ }
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
- String brokerId =
t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);
for (BiConsumer<String, NotificationType> listener :
listeners) {
listener.accept(brokerId, t.getType());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
index 162ea50829d..232088afb94 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryIntegrationTest.java
@@ -63,10 +63,15 @@ public class BrokerRegistryIntegrationTest {
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
+ final var startMs = System.currentTimeMillis();
if (pulsar != null) {
pulsar.close();
}
+ final var elapsedMs = System.currentTimeMillis() - startMs;
bk.stop();
+ if (elapsedMs > 5000) {
+ throw new RuntimeException("Broker took " + elapsedMs + "ms to
close");
+ }
}
@Test
@@ -105,7 +110,7 @@ public class BrokerRegistryIntegrationTest {
});
}
- private ServiceConfiguration brokerConfig() {
+ protected ServiceConfiguration brokerConfig() {
final var config = new ServiceConfiguration();
config.setClusterName(clusterName);
config.setAdvertisedAddress("localhost");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 61ab4de8a32..5448751160a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -173,6 +173,9 @@ public class TableViewTest extends
MockedPulsarServiceBaseTest {
TableView<byte[]> tv = pulsarClient.newTableView(Schema.BYTES)
.topic(topic)
.create();
+ // Verify refresh can handle the case when the topic is empty
+ tv.refreshAsync().get(3, TimeUnit.SECONDS);
+
// 2. Add a listen action to provide the test environment.
// The listen action will be triggered when there are incoming
messages every time.
// This is a sync operation, so sleep in the listen action can slow
down the reading rate of messages.
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 4f520604978..17b49828eec 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
@@ -259,7 +260,11 @@ public class TableViewImpl<T> implements TableView<T> {
@Override
public CompletableFuture<Void> refreshAsync() {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- reader.thenCompose(reader ->
getLastMessageIds(reader).thenAccept(lastMessageIds -> {
+ reader.thenCompose(reader ->
getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> {
+ if (lastMessageIds.isEmpty()) {
+ completableFuture.complete(null);
+ return;
+ }
// After get the response of lastMessageIds, put the future and
result into `refreshMap`
// and then filter out partitions that has been read to the
lastMessageID.
pendingRefreshRequests.put(completableFuture, lastMessageIds);
@@ -291,8 +296,12 @@ public class TableViewImpl<T> implements TableView<T> {
AtomicLong messagesRead = new AtomicLong();
CompletableFuture<Void> future = new CompletableFuture<>();
- getLastMessageIds(reader).thenAccept(maxMessageIds -> {
- readAllExistingMessages(reader, future, startTime, messagesRead,
maxMessageIds);
+ getLastMessageIdOfNonEmptyTopics(reader).thenAccept(lastMessageIds -> {
+ if (lastMessageIds.isEmpty()) {
+ future.complete(null);
+ return;
+ }
+ readAllExistingMessages(reader, future, startTime, messagesRead,
lastMessageIds);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
@@ -300,13 +309,15 @@ public class TableViewImpl<T> implements TableView<T> {
return future;
}
- private CompletableFuture<Map<String, TopicMessageId>>
getLastMessageIds(Reader<T> reader) {
+ private CompletableFuture<Map<String, TopicMessageId>>
getLastMessageIdOfNonEmptyTopics(Reader<T> reader) {
return reader.getLastMessageIdsAsync().thenApply(lastMessageIds -> {
- Map<String, TopicMessageId> maxMessageIds = new
ConcurrentHashMap<>();
+ Map<String, TopicMessageId> lastMessageIdMap = new
ConcurrentHashMap<>();
lastMessageIds.forEach(topicMessageId -> {
- maxMessageIds.put(topicMessageId.getOwnerTopic(),
topicMessageId);
+ if (((MessageIdAdv) topicMessageId).getEntryId() >= 0) {
+ lastMessageIdMap.put(topicMessageId.getOwnerTopic(),
topicMessageId);
+ } // else: a negative entry id represents an empty topic so
that we don't have to read messages from it
});
- return maxMessageIds;
+ return lastMessageIdMap;
});
}