This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d9f564076a8 [feat][broker] PIP-145: Notifications for faster topic discovery (#16062) d9f564076a8 is described below commit d9f564076a85ce108de16b1d350765605509bd1e Author: Andras Beni <andras.b...@streamnative.io> AuthorDate: Fri Jul 1 23:07:26 2022 +0200 [feat][broker] PIP-145: Notifications for faster topic discovery (#16062) * PIP-145: Notifications for faster topic discovery This commit introduces topic list watchers. By using these objects clients can observe the creation or deletion of topics closer to real-time. This reduces latency in consuming the first messages published to a topic when using a pattern-based subscription. Modifications: - New commands were added to the binary protocol to enable registering and deregistering watchers. - Pattern-based consumers create TopicListWatcher objects if the broker supports this feature. Otherwise, they fall back to polling only. - The watchers use ConnectionHandler to obtain a connection to a broker. - Once connected, watchers register and wait for updates. - ServerCnx uses the newly created TopicListService to manage watchers. - TopicListService listens to metadata notifications and sends updates. * Fix checkstyle violation * Fix cpp client compile error * Remove unused code, remove failed watchers * Rename command Unwatch, extract fields from command to avoid concurrent modification * Fix cpp client compile error * Fix cpp client compile error Co-authored-by: Matteo Merli <mme...@apache.org> --- .../pulsar/broker/resources/TopicResources.java | 43 ++++ .../broker/resources/TopicResourcesTest.java | 109 ++++++++ .../pulsar/broker/service/PulsarCommandSender.java | 7 +- .../broker/service/PulsarCommandSenderImpl.java | 24 +- .../apache/pulsar/broker/service/ServerCnx.java | 70 +++++- .../pulsar/broker/service/TopicListService.java | 251 +++++++++++++++++++ .../pulsar/broker/service/ServerCnxTest.java | 23 ++ .../broker/service/TopicListServiceTest.java | 132 ++++++++++ .../broker/service/TopicListWatcherTest.java | 102 ++++++++ .../broker/service/utils/ClientChannelHelper.java | 6 + .../apache/pulsar/client/api/ClientErrorsTest.java | 4 +- .../pulsar/client/api/MockBrokerService.java | 2 +- pulsar-client-cpp/lib/Commands.cc | 12 + .../client/impl/BinaryProtoLookupService.java | 5 + .../org/apache/pulsar/client/impl/ClientCnx.java | 71 ++++++ .../pulsar/client/impl/ConnectionHandler.java | 10 +- .../org/apache/pulsar/client/impl/HttpClient.java | 4 + .../pulsar/client/impl/HttpLookupService.java | 5 + .../apache/pulsar/client/impl/LookupService.java | 7 + .../impl/PatternMultiTopicsConsumerImpl.java | 31 ++- .../pulsar/client/impl/PulsarClientImpl.java | 10 + .../pulsar/client/impl/TopicListWatcher.java | 278 +++++++++++++++++++++ .../apache/pulsar/client/impl/ClientCnxTest.java | 109 ++++++++ .../pulsar/client/impl/PulsarClientImplTest.java | 2 +- .../pulsar/client/impl/TopicListWatcherTest.java | 111 ++++++++ .../apache/pulsar/common/protocol/Commands.java | 62 ++++- .../pulsar/common/protocol/PulsarDecoder.java | 42 ++++ .../org/apache/pulsar/common/topics/TopicList.java | 2 + pulsar-common/src/main/proto/PulsarApi.proto | 40 +++ .../pulsar/proxy/server/ProxyConnection.java | 5 +- 30 files changed, 1555 insertions(+), 24 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java index bea67943f82..ee537431280 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java @@ -19,22 +19,35 @@ package org.apache.pulsar.broker.resources; import static org.apache.pulsar.common.util.Codec.decode; +import java.util.EnumSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; public class TopicResources { private static final String MANAGED_LEDGER_PATH = "/managed-ledgers"; private final MetadataStore store; + private final Map<BiConsumer<String, NotificationType>, Pattern> topicListeners; + public TopicResources(MetadataStore store) { this.store = store; + topicListeners = new ConcurrentHashMap<>(); + store.registerListener(this::handleNotification); } public CompletableFuture<List<String>> listPersistentTopicsAsync(NamespaceName ns) { @@ -110,4 +123,34 @@ public class TopicResources { } }); } + + void handleNotification(Notification notification) { + if (notification.getPath().startsWith(MANAGED_LEDGER_PATH) + && EnumSet.of(NotificationType.Created, NotificationType.Deleted).contains(notification.getType())) { + for (Map.Entry<BiConsumer<String, NotificationType>, Pattern> entry : + new HashMap<>(topicListeners).entrySet()) { + Matcher matcher = entry.getValue().matcher(notification.getPath()); + if (matcher.matches()) { + TopicName topicName = TopicName.get( + matcher.group(2), NamespaceName.get(matcher.group(1)), matcher.group(3)); + entry.getKey().accept(topicName.toString(), notification.getType()); + } + } + } + } + + Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) { + return Pattern.compile( + MANAGED_LEDGER_PATH + "/(" + namespaceName + ")/(" + TopicDomain.persistent + ")/(" + "[^/]+)"); + } + + public void registerPersistentTopicListener( + NamespaceName namespaceName, BiConsumer<String, NotificationType> listener) { + topicListeners.put(listener, namespaceNameToTopicNamePattern(namespaceName)); + } + + public void deregisterPersistentTopicListener(BiConsumer<String, NotificationType> listener) { + topicListeners.remove(listener); + } + } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java new file mode 100644 index 00000000000..cfafc5107d2 --- /dev/null +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.resources; + +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import java.util.function.BiConsumer; + +public class TopicResourcesTest { + + private MetadataStore metadataStore; + private TopicResources topicResources; + + @BeforeMethod + public void setup() { + metadataStore = mock(MetadataStore.class); + topicResources = new TopicResources(metadataStore); + } + + @Test + public void testConstructorRegistersAsListener() { + verify(metadataStore).registerListener(any()); + } + + @Test + public void testListenerInvokedWhenTopicCreated() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Created, "/managed-ledgers/tenant/namespace/persistent/topic")); + verify(listener).accept("persistent://tenant/namespace/topic", NotificationType.Created); + } + + @Test + public void testListenerInvokedWhenTopicV1Created() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/cluster/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Created, "/managed-ledgers/tenant/cluster/namespace/persistent/topic")); + verify(listener).accept("persistent://tenant/cluster/namespace/topic", NotificationType.Created); + } + + @Test + public void testListenerInvokedWhenTopicDeleted() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Deleted, "/managed-ledgers/tenant/namespace/persistent/topic")); + verify(listener).accept("persistent://tenant/namespace/topic", NotificationType.Deleted); + } + + @Test + public void testListenerNotInvokedWhenSubscriptionCreated() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Created, "/managed-ledgers/tenant/namespace/persistent/topic/subscription")); + verifyNoInteractions(listener); + } + + @Test + public void testListenerNotInvokedWhenTopicCreatedInOtherNamespace() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Created, "/managed-ledgers/tenant/namespace2/persistent/topic")); + verifyNoInteractions(listener); + } + + @Test + public void testListenerNotInvokedWhenTopicModified() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Modified, "/managed-ledgers/tenant/namespace/persistent/topic")); + verifyNoInteractions(listener); + } + + @Test + public void testListenerNotInvokedAfterDeregistered() { + BiConsumer<String, NotificationType> listener = mock(BiConsumer.class); + topicResources.registerPersistentTopicListener(NamespaceName.get("tenant/namespace"), listener); + topicResources.handleNotification(new Notification(NotificationType.Created, "/managed-ledgers/tenant/namespace/persistent/topic")); + verify(listener).accept("persistent://tenant/namespace/topic", NotificationType.Created); + topicResources.deregisterPersistentTopicListener(listener); + topicResources.handleNotification(new Notification(NotificationType.Created, "/managed-ledgers/tenant/namespace/persistent/topic2")); + verifyNoMoreInteractions(listener); + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java index b267f725986..8934d2e6ce6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java @@ -61,7 +61,7 @@ public interface PulsarCommandSender { void sendGetOrCreateSchemaErrorResponse(long requestId, ServerError error, String errorMessage); - void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize); + void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize, boolean supportsTopicWatchers); void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative, CommandLookupTopicResponse.LookupType response, long requestId, @@ -92,4 +92,9 @@ public interface PulsarCommandSender { void sendEndTxnResponse(long requestId, TxnID txnID, int txnAction); void sendEndTxnErrorResponse(long requestId, TxnID txnID, ServerError error, String message); + + void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics); + + void sendWatchTopicListUpdate(long watcherId, + List<String> newTopics, List<String> deletedTopics, String topicsHash); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 57dcaff250d..5b5af41d1ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -162,8 +162,9 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender { } @Override - public void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize) { - BaseCommand command = Commands.newConnectedCommand(clientProtocolVersion, maxMessageSize); + public void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize, boolean supportsTopicWatchers) { + BaseCommand command = Commands.newConnectedCommand( + clientProtocolVersion, maxMessageSize, supportsTopicWatchers); safeIntercept(command, cnx); ByteBuf outBuf = Commands.serializeWithSize(command); cnx.ctx().writeAndFlush(outBuf, cnx.ctx().voidPromise()); @@ -346,6 +347,25 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender { } } + @Override + public void sendWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, List<String> topics) { + BaseCommand command = Commands.newWatchTopicListSuccess(requestId, watcherId, topicsHash, topics); + interceptAndWriteCommand(command); + } + + @Override + public void sendWatchTopicListUpdate(long watcherId, + List<String> newTopics, List<String> deletedTopics, String topicsHash) { + BaseCommand command = Commands.newWatchTopicUpdate(watcherId, newTopics, deletedTopics, topicsHash); + interceptAndWriteCommand(command); + } + + private void interceptAndWriteCommand(BaseCommand command) { + safeIntercept(command, cnx); + ByteBuf outBuf = Commands.serializeWithSize(command); + cnx.ctx().writeAndFlush(outBuf); + } + private void safeIntercept(BaseCommand command, ServerCnx cnx) { try { this.interceptor.onPulsarCommand(command, cnx); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4f8763330d3..87b6c4208ae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -50,6 +50,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; @@ -119,6 +120,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; import org.apache.pulsar.common.api.proto.CommandUnsubscribe; +import org.apache.pulsar.common.api.proto.CommandWatchTopicList; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; import org.apache.pulsar.common.api.proto.FeatureFlags; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; @@ -164,6 +167,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers; private final boolean enableSubscriptionPatternEvaluation; private final int maxSubscriptionPatternLength; + private final TopicListService topicListService; private State state; private volatile boolean isActive = true; private String authRole = null; @@ -272,6 +276,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { this.connectionController = new ConnectionController.DefaultConnectionController(conf); this.enableSubscriptionPatternEvaluation = conf.isEnableBrokerSideSubscriptionPatternEvaluation(); this.maxSubscriptionPatternLength = conf.getSubscriptionPatternMaxLength(); + this.topicListService = new TopicListService(pulsar, this, + enableSubscriptionPatternEvaluation, maxSubscriptionPatternLength); } @Override @@ -334,6 +340,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } } }); + this.topicListService.inactivate(); this.service.getPulsarStats().recordConnectionClose(); } @@ -625,8 +632,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } // complete the connect and sent newConnected command - private void completeConnect(int clientProtoVersion, String clientVersion) { - ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize)); + private void completeConnect(int clientProtoVersion, String clientVersion, boolean supportsTopicWatchers) { + ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize, supportsTopicWatchers)); state = State.Connected; service.getPulsarStats().recordConnectionCreateSuccess(); if (log.isDebugEnabled()) { @@ -685,7 +692,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (state != State.Connected) { // First time authentication is done - completeConnect(clientProtocolVersion, clientVersion); + completeConnect(clientProtocolVersion, clientVersion, enableSubscriptionPatternEvaluation); } else { // If the connection was already ready, it means we're doing a refresh if (!StringUtils.isEmpty(authRole)) { @@ -792,7 +799,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { } if (!service.isAuthenticationEnabled()) { - completeConnect(clientProtocolVersion, clientVersion); + completeConnect(clientProtocolVersion, clientVersion, enableSubscriptionPatternEvaluation); return; } @@ -820,7 +827,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { authRole = getBrokerService().getAuthenticationService().getAnonymousUserRole() .orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured")); - completeConnect(clientProtocolVersion, clientVersion); + completeConnect(clientProtocolVersion, clientVersion, enableSubscriptionPatternEvaluation); return; } @@ -2527,6 +2534,59 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { })); } + protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) { + final long requestId = commandWatchTopicList.getRequestId(); + final long watcherId = commandWatchTopicList.getWatcherId(); + final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace()); + + Pattern topicsPattern = Pattern.compile(commandWatchTopicList.hasTopicsPattern() + ? commandWatchTopicList.getTopicsPattern() : TopicList.ALL_TOPICS_PATTERN); + String topicsHash = commandWatchTopicList.hasTopicsHash() + ? commandWatchTopicList.getTopicsHash() : null; + + final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); + if (lookupSemaphore.tryAcquire()) { + if (invalidOriginalPrincipal(originalPrincipal)) { + final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest "; + log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg, + authRole, originalPrincipal, namespaceName); + commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg); + lookupSemaphore.release(); + return; + } + isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> { + if (isAuthorized) { + topicListService.handleWatchTopicList(namespaceName, watcherId, requestId, topicsPattern, + topicsHash, lookupSemaphore); + } else { + final String msg = "Proxy Client is not authorized to watchTopicList"; + log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName); + commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg); + lookupSemaphore.release(); + } + return null; + }).exceptionally(ex -> { + logNamespaceNameAuthException(remoteAddress, "watchTopicList", getPrincipal(), + Optional.of(namespaceName), ex); + final String msg = "Exception occurred while trying to handle command WatchTopicList"; + commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg); + lookupSemaphore.release(); + return null; + }); + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed WatchTopicList due to too many lookup-requests {}", remoteAddress, + namespaceName); + } + commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests, + "Failed due to too many pending lookup requests"); + } + } + + protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) { + topicListService.handleWatchTopicListClose(commandWatchTopicListClose); + } + @Override protected boolean isHandshakeCompleted() { return state == State.Connected; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java new file mode 100644 index 00000000000..78ee45223eb --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; +import java.util.regex.Pattern; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.resources.TopicResources; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; +import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; +import org.apache.pulsar.metadata.api.NotificationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicListService { + + + public static class TopicListWatcher implements BiConsumer<String, NotificationType> { + + private final List<String> matchingTopics; + private final TopicListService topicListService; + private final long id; + private final Pattern topicsPattern; + + public TopicListWatcher(TopicListService topicListService, long id, + Pattern topicsPattern, List<String> topics) { + this.topicListService = topicListService; + this.id = id; + this.topicsPattern = topicsPattern; + this.matchingTopics = TopicList.filterTopics(topics, topicsPattern); + } + + public List<String> getMatchingTopics() { + return matchingTopics; + } + + @Override + public void accept(String topicName, NotificationType notificationType) { + if (topicsPattern.matcher(topicName).matches()) { + List<String> newTopics; + List<String> deletedTopics; + if (notificationType == NotificationType.Deleted) { + newTopics = Collections.emptyList(); + deletedTopics = Collections.singletonList(topicName); + matchingTopics.remove(topicName); + } else { + deletedTopics = Collections.emptyList(); + newTopics = Collections.singletonList(topicName); + matchingTopics.add(topicName); + } + String hash = TopicList.calculateHash(matchingTopics); + topicListService.sendTopicListUpdate(id, hash, deletedTopics, newTopics); + } + } + } + + + private static final Logger log = LoggerFactory.getLogger(TopicListService.class); + + private final NamespaceService namespaceService; + private final TopicResources topicResources; + private final ServerCnx connection; + private final boolean enableSubscriptionPatternEvaluation; + private final int maxSubscriptionPatternLength; + private final ConcurrentLongHashMap<CompletableFuture<TopicListWatcher>> watchers; + + + public TopicListService(PulsarService pulsar, ServerCnx connection, + boolean enableSubscriptionPatternEvaluation, int maxSubscriptionPatternLength) { + this.namespaceService = pulsar.getNamespaceService(); + this.connection = connection; + this.enableSubscriptionPatternEvaluation = enableSubscriptionPatternEvaluation; + this.maxSubscriptionPatternLength = maxSubscriptionPatternLength; + this.watchers = ConcurrentLongHashMap.<CompletableFuture<TopicListWatcher>>newBuilder() + .expectedItems(8) + .concurrencyLevel(1) + .build(); + this.topicResources = pulsar.getPulsarResources().getTopicResources(); + } + + public void inactivate() { + for (Long watcherId : new HashSet<>(watchers.keys())) { + deleteTopicListWatcher(watcherId); + } + } + + public void handleWatchTopicList(NamespaceName namespaceName, long watcherId, long requestId, Pattern topicsPattern, + String topicsHash, Semaphore lookupSemaphore) { + + if (!enableSubscriptionPatternEvaluation || topicsPattern.pattern().length() > maxSubscriptionPatternLength) { + String msg = "Unable to create topic list watcher: "; + if (!enableSubscriptionPatternEvaluation) { + msg += "Evaluating subscription patterns is disabled."; + } else { + msg += "Pattern longer than maximum: " + maxSubscriptionPatternLength; + } + log.warn("[{}] {} on namespace {}", connection.getRemoteAddress(), msg, namespaceName); + connection.getCommandSender().sendErrorResponse(requestId, ServerError.NotAllowedError, msg); + lookupSemaphore.release(); + return; + } + CompletableFuture<TopicListWatcher> watcherFuture = new CompletableFuture<>(); + CompletableFuture<TopicListWatcher> existingWatcherFuture = watchers.putIfAbsent(watcherId, watcherFuture); + + if (existingWatcherFuture != null) { + if (existingWatcherFuture.isDone() && !existingWatcherFuture.isCompletedExceptionally()) { + TopicListWatcher watcher = existingWatcherFuture.getNow(null); + log.info("[{}] Watcher with the same id is already created:" + + " watcherId={}, watcher={}", + connection.getRemoteAddress(), watcherId, watcher); + watcherFuture = existingWatcherFuture; + } else { + // There was an early request to create a watcher with the same watcherId. This can happen when + // client timeout is lower the broker timeouts. We need to wait until the previous watcher + // creation request either completes or fails. + log.warn("[{}] Watcher with id is already present on the connection," + + " consumerId={}", connection.getRemoteAddress(), watcherId); + ServerError error; + if (!existingWatcherFuture.isDone()) { + error = ServerError.ServiceNotReady; + } else { + error = ServerError.UnknownError; + watchers.remove(watcherId, existingWatcherFuture); + } + connection.getCommandSender().sendErrorResponse(requestId, error, + "Topic list watcher is already present on the connection"); + lookupSemaphore.release(); + return; + } + } else { + initializeTopicsListWatcher(watcherFuture, namespaceName, watcherId, topicsPattern); + } + + + CompletableFuture<TopicListWatcher> finalWatcherFuture = watcherFuture; + finalWatcherFuture.thenAccept(watcher -> { + List<String> topicList = watcher.getMatchingTopics(); + String hash = TopicList.calculateHash(topicList); + if (hash.equals(topicsHash)) { + topicList = Collections.emptyList(); + } + if (log.isDebugEnabled()) { + log.debug( + "[{}] Received WatchTopicList for namespace [//{}] by {}", + connection.getRemoteAddress(), namespaceName, requestId); + } + connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, hash, topicList); + lookupSemaphore.release(); + }) + .exceptionally(ex -> { + log.warn("[{}] Error WatchTopicList for namespace [//{}] by {}", + connection.getRemoteAddress(), namespaceName, requestId); + connection.getCommandSender().sendErrorResponse(requestId, + BrokerServiceException.getClientErrorCode( + new BrokerServiceException.ServerMetadataException(ex)), ex.getMessage()); + watchers.remove(watcherId, finalWatcherFuture); + lookupSemaphore.release(); + return null; + }); + } + + + public void initializeTopicsListWatcher(CompletableFuture<TopicListWatcher> watcherFuture, + NamespaceName namespace, long watcherId, Pattern topicsPattern) { + namespaceService.getListOfPersistentTopics(namespace). + thenApply(topics -> { + TopicListWatcher watcher = new TopicListWatcher(this, watcherId, topicsPattern, topics); + topicResources.registerPersistentTopicListener(namespace, watcher); + return watcher; + }). + whenComplete((watcher, exception) -> { + if (exception != null) { + watcherFuture.completeExceptionally(exception); + } else { + watcherFuture.complete(watcher); + } + }); + } + + + public void handleWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) { + long requestId = commandWatchTopicListClose.getRequestId(); + long watcherId = commandWatchTopicListClose.getWatcherId(); + deleteTopicListWatcher(watcherId); + connection.getCommandSender().sendWatchTopicListSuccess(requestId, watcherId, null, null); + } + + public void deleteTopicListWatcher(Long watcherId) { + CompletableFuture<TopicListWatcher> watcherFuture = watchers.get(watcherId); + if (watcherFuture == null) { + log.info("[{}] TopicListWatcher was not registered on the connection: {}", + watcherId, connection.getRemoteAddress()); + return; + } + + if (!watcherFuture.isDone() && watcherFuture + .completeExceptionally(new IllegalStateException("Closed watcher before creation was complete"))) { + // We have received a request to close the watcher before it was actually completed, we have marked the + // watcher future as failed and we can tell the client the close operation was successful. When the actual + // create operation will complete, the new watcher will be discarded. + log.info("[{}] Closed watcher before its creation was completed. watcherId={}", + connection.getRemoteAddress(), watcherId); + watchers.remove(watcherId); + return; + } + + if (watcherFuture.isCompletedExceptionally()) { + log.info("[{}] Closed watcher that already failed to be created. watcherId={}", + connection.getRemoteAddress(), watcherId); + watchers.remove(watcherId); + return; + } + + // Proceed with normal watcher close + topicResources.deregisterPersistentTopicListener(watcherFuture.getNow(null)); + watchers.remove(watcherId); + log.info("[{}] Closed watcher, watcherId={}", connection.getRemoteAddress(), watcherId); + } + + public void sendTopicListUpdate(long watcherId, String topicsHash, List<String> deletedTopics, + List<String> newTopics) { + connection.getCommandSender().sendWatchTopicListUpdate(watcherId, newTopics, deletedTopics, topicsHash); + } + + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 1efc2576a2e..dd86d6189d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -102,6 +102,7 @@ import org.apache.pulsar.common.api.proto.CommandSendReceipt; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.CommandSuccess; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.ServerError; @@ -216,6 +217,8 @@ public class ServerCnxTest { doReturn(CompletableFuture.completedFuture(true)).when(namespaceService).checkTopicOwnership(any()); doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfTopics( NamespaceName.get("use", "ns-abc"), CommandGetTopicsOfNamespace.Mode.ALL); + doReturn(CompletableFuture.completedFuture(topics)).when(namespaceService).getListOfPersistentTopics( + NamespaceName.get("use", "ns-abc")); setupMLAsyncCallbackMocks(); @@ -1960,6 +1963,26 @@ public class ServerCnxTest { channel.finish(); } + @Test + public void testWatchTopicList() throws Exception { + svcConfig.setEnableBrokerSideSubscriptionPatternEvaluation(true); + resetChannel(); + setChannelConnected(); + BaseCommand command = Commands.newWatchTopicList(1, 3, "use/ns-abc", "use/ns-abc/topic-.*", null); + ByteBuf serializedCommand = Commands.serializeWithSize(command); + + channel.writeInbound(serializedCommand); + Object resp = getResponse(); + System.out.println(resp); + CommandWatchTopicListSuccess response = (CommandWatchTopicListSuccess) resp; + + assertEquals(response.getTopicsList(), matchingTopics); + assertEquals(response.getTopicsHash(), TopicList.calculateHash(matchingTopics)); + assertEquals(response.getWatcherId(), 3); + + channel.finish(); + } + @Test public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{ // Mock ServerCnx.field: consumers diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java new file mode 100644 index 00000000000..924019f7a54 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.resources.TopicResources; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; +import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.topics.TopicList; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + + +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Semaphore; +import java.util.regex.Pattern; + +public class TopicListServiceTest { + + private TopicListService topicListService; + private ServerCnx connection; + private CompletableFuture<List<String>> topicListFuture; + private Semaphore lookupSemaphore; + private TopicResources topicResources; + + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + lookupSemaphore = new Semaphore(1); + lookupSemaphore.acquire(); + topicListFuture = new CompletableFuture<>(); + topicResources = mock(TopicResources.class); + + PulsarService pulsar = mock(PulsarService.class); + when(pulsar.getNamespaceService()).thenReturn(mock(NamespaceService.class)); + when(pulsar.getPulsarResources()).thenReturn(mock(PulsarResources.class)); + when(pulsar.getPulsarResources().getTopicResources()).thenReturn(topicResources); + when(pulsar.getNamespaceService().getListOfPersistentTopics(any())).thenReturn(topicListFuture); + + + connection = mock(ServerCnx.class); + when(connection.getRemoteAddress()).thenReturn(new InetSocketAddress(10000)); + when(connection.getCommandSender()).thenReturn(mock(PulsarCommandSender.class)); + + topicListService = new TopicListService(pulsar, connection, true, 30); + + } + + @Test + public void testCommandWatchSuccessResponse() { + + topicListService.handleWatchTopicList( + NamespaceName.get("tenant/ns"), + 13, + 7, + Pattern.compile("persistent://tenant/ns/topic\\d"), + null, + lookupSemaphore); + List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1"); + String hash = TopicList.calculateHash(topics); + topicListFuture.complete(topics); + Assert.assertEquals(1, lookupSemaphore.availablePermits()); + verify(topicResources).registerPersistentTopicListener( + eq(NamespaceName.get("tenant/ns")), any(TopicListService.TopicListWatcher.class)); + verify(connection.getCommandSender()).sendWatchTopicListSuccess(7, 13, hash, topics); + } + + @Test + public void testCommandWatchErrorResponse() { + topicListService.handleWatchTopicList( + NamespaceName.get("tenant/ns"), + 13, + 7, + Pattern.compile("persistent://tenant/ns/topic\\d"), + null, + lookupSemaphore); + topicListFuture.completeExceptionally(new PulsarServerException("Error")); + Assert.assertEquals(1, lookupSemaphore.availablePermits()); + verifyNoInteractions(topicResources); + verify(connection.getCommandSender()).sendErrorResponse(eq(7L), any(ServerError.class), + eq(PulsarServerException.class.getCanonicalName() + ": Error")); + } + + @Test + public void testCommandWatchTopicListCloseRemovesListener() { + topicListService.handleWatchTopicList( + NamespaceName.get("tenant/ns"), + 13, + 7, + Pattern.compile("persistent://tenant/ns/topic\\d"), + null, + lookupSemaphore); + List<String> topics = Collections.singletonList("persistent://tenant/ns/topic1"); + topicListFuture.complete(topics); + + CommandWatchTopicListClose watchTopicListClose = new CommandWatchTopicListClose() + .setRequestId(8) + .setWatcherId(13); + topicListService.handleWatchTopicListClose(watchTopicListClose); + verify(topicResources).deregisterPersistentTopicListener(any(TopicListService.TopicListWatcher.class)); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java new file mode 100644 index 00000000000..1a2e192d1f5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.common.topics.TopicList; +import org.apache.pulsar.metadata.api.NotificationType; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; + +public class TopicListWatcherTest { + + private static final List<String> INITIAL_TOPIC_LIST = Arrays.asList( + "persistent://tenant/ns/topic1", + "persistent://tenant/ns/topic2", + "persistent://tenant/ns/t3" + ); + + private static final long ID = 7; + private static final Pattern PATTERN = Pattern.compile("persistent://tenant/ns/topic\\d+"); + + + private TopicListService topicListService; + private TopicListService.TopicListWatcher watcher; + + + + @BeforeMethod(alwaysRun = true) + public void setup() { + topicListService = mock(TopicListService.class); + watcher = new TopicListService.TopicListWatcher(topicListService, ID, PATTERN, INITIAL_TOPIC_LIST); + } + + @Test + public void testGetMatchingTopicsReturnsFilteredList() { + Assert.assertEquals( + Arrays.asList("persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2"), + watcher.getMatchingTopics()); + } + + @Test + public void testAcceptSendsNotificationAndRemembersTopic() { + String newTopic = "persistent://tenant/ns/topic3"; + watcher.accept(newTopic, NotificationType.Created); + + List<String> allMatchingTopics = Arrays.asList( + "persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2", newTopic); + String hash = TopicList.calculateHash(allMatchingTopics); + verify(topicListService).sendTopicListUpdate(ID, hash, Collections.emptyList(), + Collections.singletonList(newTopic)); + Assert.assertEquals( + allMatchingTopics, + watcher.getMatchingTopics()); + } + + @Test + public void testAcceptSendsNotificationAndForgetsTopic() { + String deletedTopic = "persistent://tenant/ns/topic1"; + watcher.accept(deletedTopic, NotificationType.Deleted); + + List<String> allMatchingTopics = Collections.singletonList("persistent://tenant/ns/topic2"); + String hash = TopicList.calculateHash(allMatchingTopics); + verify(topicListService).sendTopicListUpdate(ID, hash, + Collections.singletonList(deletedTopic), Collections.emptyList()); + Assert.assertEquals( + allMatchingTopics, + watcher.getMatchingTopics()); + } + + @Test + public void testAcceptIgnoresNonMatching() { + watcher.accept("persistent://tenant/ns/mytopic", NotificationType.Created); + verifyNoInteractions(topicListService); + Assert.assertEquals( + Arrays.asList("persistent://tenant/ns/topic1", "persistent://tenant/ns/topic2"), + watcher.getMatchingTopics()); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index 92f2617cc68..a1782278966 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.utils; import java.util.Queue; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; @@ -156,6 +157,11 @@ public class ClientChannelHelper { protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse response) { queue.offer(new CommandGetTopicsOfNamespaceResponse().copyFrom(response)); } + + @Override + protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) { + queue.offer(new CommandWatchTopicListSuccess().copyFrom(commandWatchTopicListSuccess)); + } }; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java index 4353297666a..8c8e4b96add 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java @@ -719,7 +719,7 @@ public class ClientErrorsTest { AtomicBoolean msgSent = new AtomicBoolean(); mockBrokerService.setHandleConnect((ctx, connect) -> { channelCtx.set(ctx); - ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion())); + ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false)); if (numOfConnections.incrementAndGet() == 2) { // close the cnx immediately when trying to connect the 2nd time ctx.channel().close(); @@ -759,7 +759,7 @@ public class ClientErrorsTest { CountDownLatch latch = new CountDownLatch(1); mockBrokerService.setHandleConnect((ctx, connect) -> { channelCtx.set(ctx); - ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion())); + ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false)); if (numOfConnections.incrementAndGet() == 2) { // close the cnx immediately when trying to connect the 2nd time ctx.channel().close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java index b16fac427aa..7cba7405ddb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java @@ -143,7 +143,7 @@ public class MockBrokerService { return; } // default - ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion())); + ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion(), false)); } @Override diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 33134a6c04b..3a00d302e86 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -669,6 +669,18 @@ std::string Commands::messageType(BaseCommand_Type type) { case BaseCommand::TC_CLIENT_CONNECT_RESPONSE: return "TC_CLIENT_CONNECT_RESPONSE"; break; + case BaseCommand::WATCH_TOPIC_LIST: + return "WATCH_TOPIC_LIST"; + break; + case BaseCommand::WATCH_TOPIC_LIST_SUCCESS: + return "WATCH_TOPIC_LIST_SUCCESS"; + break; + case BaseCommand::WATCH_TOPIC_UPDATE: + return "WATCH_TOPIC_UPDATE"; + break; + case BaseCommand::WATCH_TOPIC_LIST_CLOSE: + return "WATCH_TOPIC_LIST_CLOSE"; + break; }; BOOST_THROW_EXCEPTION(std::logic_error("Invalid BaseCommand enumeration value")); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 2106b7a5c6f..18bffba8dc6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -256,6 +256,11 @@ public class BinaryProtoLookupService implements LookupService { return serviceNameResolver.getServiceUrl(); } + @Override + public InetSocketAddress resolveHost() { + return serviceNameResolver.resolveHost(); + } + @Override public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index d52955ccf0b..37464dacfd4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.tuple.Pair; @@ -59,6 +60,7 @@ import org.apache.pulsar.client.impl.schema.SchemaInfoUtil; import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler; import org.apache.pulsar.client.util.TimedCompletableFuture; import org.apache.pulsar.common.api.AuthData; +import org.apache.pulsar.common.api.proto.BaseCommand; import org.apache.pulsar.common.api.proto.CommandAckResponse; import org.apache.pulsar.common.api.proto.CommandActiveConsumerChange; import org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse; @@ -85,6 +87,8 @@ import org.apache.pulsar.common.api.proto.CommandSendError; import org.apache.pulsar.common.api.proto.CommandSendReceipt; import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; +import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.lookup.GetTopicsResult; import org.apache.pulsar.common.protocol.Commands; @@ -128,6 +132,12 @@ public class ClientCnx extends PulsarHandler { .expectedItems(16) .concurrencyLevel(1) .build(); + @Getter(AccessLevel.PACKAGE) + private final ConcurrentLongHashMap<TopicListWatcher> topicListWatchers = + ConcurrentLongHashMap.<TopicListWatcher>newBuilder() + .expectedItems(16) + .concurrencyLevel(1) + .build(); private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>(); private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>(); @@ -160,6 +170,7 @@ public class ClientCnx extends PulsarHandler { @Getter protected AuthenticationDataProvider authenticationDataProvider; private TransactionBufferHandler transactionBufferHandler; + private boolean supportsTopicWatchers; enum State { None, SentConnectFrame, Ready, Failed, Connecting @@ -281,12 +292,14 @@ public class ClientCnx extends PulsarHandler { producers.forEach((id, producer) -> producer.connectionClosed(this)); consumers.forEach((id, consumer) -> consumer.connectionClosed(this)); transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this)); + topicListWatchers.forEach((__, watcher) -> watcher.connectionClosed(this)); pendingRequests.clear(); waitingLookupRequests.clear(); producers.clear(); consumers.clear(); + topicListWatchers.clear(); timeoutTask.cancel(true); } @@ -330,6 +343,10 @@ public class ClientCnx extends PulsarHandler { if (log.isDebugEnabled()) { log.debug("{} Connection is ready", ctx.channel()); } + + supportsTopicWatchers = + connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers(); + // set remote protocol version to the correct version before we complete the connection future setRemoteEndpointProtocolVersion(connected.getProtocolVersion()); connectionFuture.complete(null); @@ -1036,6 +1053,51 @@ public class ClientCnx extends PulsarHandler { return transactionBufferHandler; } + public CompletableFuture<CommandWatchTopicListSuccess> newWatchTopicList( + BaseCommand commandWatchTopicList, long requestId) { + if (!supportsTopicWatchers) { + return FutureUtil.failedFuture( + new PulsarClientException.NotAllowedException( + "Broker does not allow broker side pattern evaluation.")); + } + return sendRequestAndHandleTimeout(Commands.serializeWithSize(commandWatchTopicList), requestId, + RequestType.Command, true); + } + + protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess commandWatchTopicListSuccess) { + checkArgument(state == State.Ready); + + if (log.isDebugEnabled()) { + log.debug("{} Received watchTopicListSuccess response from server: {}", + ctx.channel(), commandWatchTopicListSuccess.getRequestId()); + } + long requestId = commandWatchTopicListSuccess.getRequestId(); + CompletableFuture<CommandWatchTopicListSuccess> requestFuture = + (CompletableFuture<CommandWatchTopicListSuccess>) pendingRequests.remove(requestId); + if (requestFuture != null) { + requestFuture.complete(commandWatchTopicListSuccess); + } else { + log.warn("{} Received unknown request id from server: {}", + ctx.channel(), commandWatchTopicListSuccess.getRequestId()); + } + } + + protected void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate commandWatchTopicUpdate) { + checkArgument(state == State.Ready); + + if (log.isDebugEnabled()) { + log.debug("{} Received watchTopicUpdate command from server: {}", + ctx.channel(), commandWatchTopicUpdate.getWatcherId()); + } + long watcherId = commandWatchTopicUpdate.getWatcherId(); + TopicListWatcher watcher = topicListWatchers.get(watcherId); + if (watcher != null) { + watcher.handleCommandWatchTopicUpdate(commandWatchTopicUpdate); + } else { + log.warn("{} Received topic list update for unknown watcher from server: {}", ctx.channel(), watcherId); + } + } + /** * check serverError and take appropriate action. * <ul> @@ -1082,6 +1144,11 @@ public class ClientCnx extends PulsarHandler { transactionMetaStoreHandlers.put(transactionMetaStoreId, handler); } + void registerTopicListWatcher(final long watcherId, final TopicListWatcher watcher) { + topicListWatchers.put(watcherId, watcher); + + } + public void registerTransactionBufferHandler(final TransactionBufferHandler handler) { transactionBufferHandler = handler; } @@ -1094,6 +1161,10 @@ public class ClientCnx extends PulsarHandler { consumers.remove(consumerId); } + void removeTopicListWatcher(final long watcherId) { + topicListWatchers.remove(watcherId); + } + void setTargetBroker(InetSocketAddress targetBrokerAddress) { this.proxyToTargetBrokerAddress = String.format("%s:%d", targetBrokerAddress.getHostString(), targetBrokerAddress.getPort()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index a951d7b2cb8..6c5a5be200f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -69,8 +70,13 @@ public class ConnectionHandler { } try { - state.client.getConnection(state.topic) // - .thenAccept(cnx -> connection.connectionOpened(cnx)) // + CompletableFuture<ClientCnx> cnxFuture; + if (state.topic == null) { + cnxFuture = state.client.getConnectionToServiceUrl(); + } else { + cnxFuture = state.client.getConnection(state.topic); // + } + cnxFuture.thenAccept(cnx -> connection.connectionOpened(cnx)) // .exceptionally(this::handleConnectionError); } catch (Throwable t) { log.warn("[{}] [{}] Exception thrown while getting connection: ", state.topic, state.getHandlerName(), t); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 82530661be0..61d0c2bd64e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -155,6 +155,10 @@ public class HttpClient implements Closeable { return this.serviceNameResolver.getServiceUrl(); } + public InetSocketAddress resolveHost() { + return serviceNameResolver.resolveHost(); + } + void setServiceUrl(String serviceUrl) throws PulsarClientException { this.serviceNameResolver.updateServiceUrl(serviceUrl); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 6d04c5fab31..def19c45aff 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -123,6 +123,11 @@ public class HttpLookupService implements LookupService { return httpClient.getServiceUrl(); } + @Override + public InetSocketAddress resolveHost() { + return httpClient.resolveHost(); + } + @Override public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode, String topicsPattern, String topicsHash) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index e7d358148c0..c1c93f36c6e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -90,6 +90,13 @@ public interface LookupService extends AutoCloseable { */ String getServiceUrl(); + /** + * Resolves pulsar service url. + * + * @return the service url resolved to a socket address + */ + InetSocketAddress resolveHost(); + /** * Returns all the topics name for a given namespace. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java index 9d00b2c2827..79a7c6a3ae6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.netty.util.Timeout; import io.netty.util.TimerTask; import java.util.ArrayList; @@ -47,6 +48,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T private final Pattern topicsPattern; private final TopicsChangedListener topicsChangeListener; private final Mode subscriptionMode; + private final CompletableFuture<TopicListWatcher> watcherFuture; protected NamespaceName namespaceName; private volatile Timeout recheckPatternTimeout = null; private volatile String topicsHash; @@ -74,6 +76,19 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T this.topicsChangeListener = new PatternTopicsChangedListener(); this.recheckPatternTimeout = client.timer() .newTimeout(this, Math.max(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.SECONDS); + this.watcherFuture = new CompletableFuture<>(); + if (subscriptionMode == Mode.PERSISTENT) { + long watcherId = client.newTopicListWatcherId(); + new TopicListWatcher(topicsChangeListener, client, topicsPattern, watcherId, + namespaceName, topicsHash, watcherFuture); + watcherFuture.exceptionally(ex -> { + log.debug("Unable to create topic list watcher. Falling back to only polling for new topics", ex); + return null; + }); + } else { + log.debug("Not creating topic list watcher for subscription mode {}", subscriptionMode); + watcherFuture.complete(null); + } } public static NamespaceName getNameSpaceFromPattern(Pattern pattern) { @@ -172,7 +187,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> removeFuture.complete(null)) .exceptionally(ex -> { - log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage()); + log.warn("[{}] Failed to unsubscribe from topics: {}", topic, ex.getMessage()); removeFuture.completeExceptionally(ex); return null; }); @@ -194,7 +209,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T FutureUtil.waitForAll(futures) .thenAccept(finalFuture -> addFuture.complete(null)) .exceptionally(ex -> { - log.warn("[{}] Failed to unsubscribe topics: {}", topic, ex.getMessage()); + log.warn("[{}] Failed to subscribe to topics: {}", topic, ex.getMessage()); addFuture.completeExceptionally(ex); return null; }); @@ -203,13 +218,23 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T } @Override + @SuppressFBWarnings public CompletableFuture<Void> closeAsync() { Timeout timeout = recheckPatternTimeout; if (timeout != null) { timeout.cancel(); recheckPatternTimeout = null; } - return super.closeAsync(); + List<CompletableFuture<?>> closeFutures = new ArrayList<>(2); + if (watcherFuture.isDone() && !watcherFuture.isCompletedExceptionally()) { + TopicListWatcher watcher = watcherFuture.getNow(null); + // watcher can be null when subscription mode is not persistent + if (watcher != null) { + closeFutures.add(watcher.closeAsync()); + } + } + closeFutures.add(super.closeAsync()); + return FutureUtil.waitForAll(closeFutures); } @VisibleForTesting diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 2460f4c53f5..2c6339cdb20 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -125,6 +125,7 @@ public class PulsarClientImpl implements PulsarClient { private final AtomicLong producerIdGenerator = new AtomicLong(); private final AtomicLong consumerIdGenerator = new AtomicLong(); + private final AtomicLong topicListWatcherIdGenerator = new AtomicLong(); private final AtomicLong requestIdGenerator = new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); @@ -934,6 +935,11 @@ public class PulsarClientImpl implements PulsarClient { .thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight())); } + public CompletableFuture<ClientCnx> getConnectionToServiceUrl() { + InetSocketAddress address = lookup.resolveHost(); + return getConnection(address, address); + } + public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress logicalAddress, final InetSocketAddress physicalAddress) { return cnxPool.getConnection(logicalAddress, physicalAddress); @@ -956,6 +962,10 @@ public class PulsarClientImpl implements PulsarClient { return consumerIdGenerator.getAndIncrement(); } + long newTopicListWatcherId() { + return topicListWatcherIdGenerator.getAndIncrement(); + } + public long newRequestId() { return requestIdGenerator.getAndIncrement(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java new file mode 100644 index 00000000000..9cd6b003d7d --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.channel.ChannelHandlerContext; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.protocol.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicListWatcher extends HandlerState implements ConnectionHandler.Connection { + + private static final Logger log = LoggerFactory.getLogger(TopicListWatcher.class); + + private static final AtomicLongFieldUpdater<TopicListWatcher> CREATE_WATCHER_DEADLINE_UPDATER = + AtomicLongFieldUpdater + .newUpdater(TopicListWatcher.class, "createWatcherDeadline"); + + private final PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener; + private final String name; + private final ConnectionHandler connectionHandler; + private final Pattern topicsPattern; + private final long watcherId; + private volatile long createWatcherDeadline = 0; + private final NamespaceName namespace; + // TODO maintain the value based on updates from broker and warn the user if inconsistent with hash from polling + private String topicsHash; + private final CompletableFuture<TopicListWatcher> watcherFuture; + + private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<>(); + private final AtomicReference<ClientCnx> clientCnxUsedForWatcherRegistration = new AtomicReference<>(); + + + public TopicListWatcher(PatternMultiTopicsConsumerImpl.TopicsChangedListener topicsChangeListener, + PulsarClientImpl client, Pattern topicsPattern, long watcherId, + NamespaceName namespace, String topicsHash, + CompletableFuture<TopicListWatcher> watcherFuture) { + super(client, null); + this.topicsChangeListener = topicsChangeListener; + this.name = "Watcher(" + topicsPattern + ")"; + this.connectionHandler = new ConnectionHandler(this, + new BackoffBuilder() + .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), + TimeUnit.NANOSECONDS) + .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS) + .setMandatoryStop(0, TimeUnit.MILLISECONDS) + .create(), + this); + this.topicsPattern = topicsPattern; + this.watcherId = watcherId; + this.namespace = namespace; + this.topicsHash = topicsHash; + this.watcherFuture = watcherFuture; + + connectionHandler.grabCnx(); + } + + @Override + public void connectionFailed(PulsarClientException exception) { + boolean nonRetriableError = !PulsarClientException.isRetriableError(exception); + if (nonRetriableError) { + exception.setPreviousExceptions(previousExceptions); + if (watcherFuture.completeExceptionally(exception)) { + setState(State.Failed); + log.info("[{}] Watcher creation failed for {} with non-retriable error {}", + topic, name, exception); + deregisterFromClientCnx(); + } + } else { + previousExceptions.add(exception); + } + } + + @Override + public void connectionOpened(ClientCnx cnx) { + previousExceptions.clear(); + + if (getState() == State.Closing || getState() == State.Closed) { + setState(State.Closed); + deregisterFromClientCnx(); + return; + } + + log.info("[{}][{}] Creating topic list watcher on cnx {}, watcherId {}", + topic, getHandlerName(), cnx.ctx().channel(), watcherId); + + long requestId = client.newRequestId(); + + CREATE_WATCHER_DEADLINE_UPDATER + .compareAndSet(this, 0L, System.currentTimeMillis() + + client.getConfiguration().getOperationTimeoutMs()); + + // synchronized this, because redeliverUnAckMessage eliminate the epoch inconsistency between them + synchronized (this) { + setClientCnx(cnx); + BaseCommand watchRequest = Commands.newWatchTopicList(requestId, watcherId, namespace.toString(), + topicsPattern.pattern(), topicsHash); + + cnx.newWatchTopicList(watchRequest, requestId) + + .thenAccept(response -> { + synchronized (TopicListWatcher.this) { + if (!changeToReadyState()) { + // Watcher was closed while reconnecting, close the connection to make sure the broker + // drops the watcher on its side + setState(State.Closed); + deregisterFromClientCnx(); + cnx.channel().close(); + return; + } + } + + this.connectionHandler.resetBackoff(); + + watcherFuture.complete(this); + + }).exceptionally((e) -> { + deregisterFromClientCnx(); + if (getState() == State.Closing || getState() == State.Closed) { + // Watcher was closed while reconnecting, close the connection to make sure the broker + // drops the watcher on its side + cnx.channel().close(); + return null; + } + log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, + getHandlerName(), cnx.channel().remoteAddress()); + + if (e.getCause() instanceof PulsarClientException + && PulsarClientException.isRetriableError(e.getCause()) + && System.currentTimeMillis() + < CREATE_WATCHER_DEADLINE_UPDATER.get(TopicListWatcher.this)) { + reconnectLater(e.getCause()); + } else if (!watcherFuture.isDone()) { + // unable to create new watcher, fail operation + setState(State.Failed); + watcherFuture.completeExceptionally( + PulsarClientException.wrap(e, String.format("Failed to create topic list watcher %s" + + "when connecting to the broker", getHandlerName()))); + } else { + // watcher was subscribed and connected, but we got some error, keep trying + reconnectLater(e.getCause()); + } + return null; + }); + } + } + + @Override + String getHandlerName() { + return name; + } + + public boolean isConnected() { + return getClientCnx() != null && (getState() == State.Ready); + } + + public ClientCnx getClientCnx() { + return this.connectionHandler.cnx(); + } + + public CompletableFuture<Void> closeAsync() { + + CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + + if (getState() == State.Closing || getState() == State.Closed) { + closeFuture.complete(null); + return closeFuture; + } + + if (!isConnected()) { + log.info("[{}] [{}] Closed watcher (not connected)", topic, getHandlerName()); + setState(State.Closed); + deregisterFromClientCnx(); + closeFuture.complete(null); + return closeFuture; + } + + setState(State.Closing); + + + long requestId = client.newRequestId(); + + ClientCnx cnx = cnx(); + if (null == cnx) { + cleanupAtClose(closeFuture, null); + } else { + BaseCommand cmd = Commands.newWatchTopicListClose(watcherId, requestId); + cnx.sendRequestWithId(Commands.serializeWithSize(cmd), requestId).handle((v, exception) -> { + final ChannelHandlerContext ctx = cnx.ctx(); + boolean ignoreException = ctx == null || !ctx.channel().isActive(); + if (ignoreException && exception != null) { + log.debug("Exception ignored in closing watcher", exception); + } + cleanupAtClose(closeFuture, ignoreException ? null : exception); + return null; + }); + } + + return closeFuture; + } + + // wrapper for connection methods + ClientCnx cnx() { + return this.connectionHandler.cnx(); + } + + public void connectionClosed(ClientCnx clientCnx) { + this.connectionHandler.connectionClosed(clientCnx); + } + + void setClientCnx(ClientCnx clientCnx) { + if (clientCnx != null) { + this.connectionHandler.setClientCnx(clientCnx); + clientCnx.registerTopicListWatcher(watcherId, this); + } + ClientCnx previousClientCnx = clientCnxUsedForWatcherRegistration.getAndSet(clientCnx); + if (previousClientCnx != null && previousClientCnx != clientCnx) { + previousClientCnx.removeTopicListWatcher(watcherId); + } + } + + void deregisterFromClientCnx() { + setClientCnx(null); + } + + void reconnectLater(Throwable exception) { + this.connectionHandler.reconnectLater(exception); + } + + + private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable exception) { + log.info("[{}] Closed topic list watcher", getHandlerName()); + setState(State.Closed); + deregisterFromClientCnx(); + if (exception != null) { + closeFuture.completeExceptionally(exception); + } else { + closeFuture.complete(null); + } + } + + public void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate update) { + List<String> deleted = update.getDeletedTopicsList(); + if (!deleted.isEmpty()) { + topicsChangeListener.onTopicsRemoved(deleted); + } + List<String> added = update.getNewTopicsList(); + if (!added.isEmpty()) { + topicsChangeListener.onTopicsAdded(added); + } + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java index 6ce4afecd02..c6eba43fb7a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java @@ -20,8 +20,10 @@ package org.apache.pulsar.client.impl; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; @@ -34,12 +36,16 @@ import java.lang.reflect.Field; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; +import java.util.function.Consumer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.BrokerMetadataException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.api.proto.CommandCloseConsumer; import org.apache.pulsar.common.api.proto.CommandCloseProducer; +import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandError; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; +import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.PulsarHandler; @@ -190,4 +196,107 @@ public class ClientCnxTest { eventLoop.shutdownGracefully(); } + + @Test + public void testNoWatchersWhenNoServerSupport() { + withConnection("testNoWatchersWhenNoServerSupport", cnx -> { + cnx.handleConnected(new CommandConnected() + .setServerVersion("Some old Server") + .setProtocolVersion(1)); + + CompletableFuture<CommandWatchTopicListSuccess> result = + cnx.newWatchTopicList(Commands.newWatchTopicList(7, 5, "tenant/ns", + ".*", null), 7); + assertTrue(result.isCompletedExceptionally()); + assertFalse(cnx.getTopicListWatchers().containsKey(5)); + }); + } + + @Test + public void testCreateWatcher() { + withConnection("testCreateWatcher", cnx -> { + CommandConnected connected = new CommandConnected() + .setServerVersion("Some strange Server") + .setProtocolVersion(1); + connected.setFeatureFlags().setSupportsTopicWatchers(true); + cnx.handleConnected(connected); + + CompletableFuture<CommandWatchTopicListSuccess> result = + cnx.newWatchTopicList(Commands.newWatchTopicList(7, 5, "tenant/ns", + ".*", null), 7); + verify(cnx.ctx()).writeAndFlush(any(ByteBuf.class)); + assertFalse(result.isDone()); + + CommandWatchTopicListSuccess success = new CommandWatchTopicListSuccess() + .setRequestId(7) + .setWatcherId(5).setTopicsHash("f00"); + cnx.handleCommandWatchTopicListSuccess(success); + assertEquals(result.getNow(null), success); + }); + } + + + + @Test + public void testUpdateWatcher() { + withConnection("testUpdateWatcher", cnx -> { + CommandConnected connected = new CommandConnected() + .setServerVersion("Some Strange Server") + .setProtocolVersion(1); + connected.setFeatureFlags().setSupportsTopicWatchers(true); + cnx.handleConnected(connected); + + cnx.newWatchTopicList(Commands.newWatchTopicList(7, 5, "tenant/ns", ".*", null), 7); + + CommandWatchTopicListSuccess success = new CommandWatchTopicListSuccess() + .setRequestId(7) + .setWatcherId(5).setTopicsHash("f00"); + cnx.handleCommandWatchTopicListSuccess(success); + + TopicListWatcher watcher = mock(TopicListWatcher.class); + cnx.registerTopicListWatcher(5, watcher); + + CommandWatchTopicUpdate update = new CommandWatchTopicUpdate() + .setWatcherId(5) + .setTopicsHash("ADD"); + update.addNewTopic("persistent://tenant/ns/topic"); + cnx.handleCommandWatchTopicUpdate(update); + verify(watcher).handleCommandWatchTopicUpdate(update); + }); + } + + private void withConnection(String testName, Consumer<ClientCnx> test) { + ThreadFactory threadFactory = new DefaultThreadFactory(testName); + EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, threadFactory); + try { + + ClientConfigurationData conf = new ClientConfigurationData(); + ClientCnx cnx = new ClientCnx(conf, eventLoop); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + Channel channel = mock(Channel.class); + when(ctx.channel()).thenReturn(channel); + + ChannelFuture listenerFuture = mock(ChannelFuture.class); + when(listenerFuture.addListener(any())).thenReturn(listenerFuture); + when(ctx.writeAndFlush(any())).thenReturn(listenerFuture); + + Field ctxField = PulsarHandler.class.getDeclaredField("ctx"); + ctxField.setAccessible(true); + ctxField.set(cnx, ctx); + + // set connection as SentConnectFrame + Field cnxField = ClientCnx.class.getDeclaredField("state"); + cnxField.setAccessible(true); + cnxField.set(cnx, ClientCnx.State.SentConnectFrame); + + test.accept(cnx); + + } catch (NoSuchFieldException | IllegalAccessException e) { + fail("Error using reflection on ClientCnx", e); + } finally { + eventLoop.shutdownGracefully(); + } + } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index 4c174ff0ac3..4d9977f5478 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -119,7 +119,7 @@ public class PulsarClientImplTest { new GetTopicsResult(Collections.emptyList(), null, false, true))); when(lookup.getPartitionedTopicMetadata(any(TopicName.class))) .thenReturn(CompletableFuture.completedFuture(new PartitionedTopicMetadata())); - when(lookup.getBroker(any(TopicName.class))) + when(lookup.getBroker(any())) .thenReturn(CompletableFuture.completedFuture( Pair.of(mock(InetSocketAddress.class), mock(InetSocketAddress.class)))); ConnectionPool pool = mock(ConnectionPool.class); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java new file mode 100644 index 00000000000..1d245350c82 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.channel.ChannelHandlerContext; +import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl.TopicsChangedListener; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; +import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; +import org.apache.pulsar.common.naming.NamespaceName; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; +import org.mockito.ArgumentCaptor; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; + +public class TopicListWatcherTest { + + private CompletableFuture<ClientCnx> clientCnxFuture; + private TopicListWatcher watcher; + private PulsarClientImpl client; + private CompletableFuture<TopicListWatcher> watcherFuture; + private TopicsChangedListener listener; + + @BeforeMethod(alwaysRun = true) + public void setup() { + listener = mock(TopicsChangedListener.class); + client = mock(PulsarClientImpl.class); + when(client.getConfiguration()).thenReturn(new ClientConfigurationData()); + clientCnxFuture = new CompletableFuture<>(); + when(client.getConnectionToServiceUrl()).thenReturn(clientCnxFuture); + watcherFuture = new CompletableFuture<>(); + watcher = new TopicListWatcher(listener, client, + Pattern.compile("persistent://tenant/ns/topic\\d+"), 7, + NamespaceName.get("tenant/ns"), null, watcherFuture); + } + + @Test + public void testWatcherGrabsConnection() { + verify(client).getConnectionToServiceUrl(); + } + + @Test + public void testWatcherCreatesBrokerSideObjectWhenConnected() { + ClientCnx clientCnx = mock(ClientCnx.class); + CompletableFuture<CommandWatchTopicListSuccess> responseFuture = new CompletableFuture<>(); + ArgumentCaptor<BaseCommand> commandCaptor = ArgumentCaptor.forClass(BaseCommand.class); + when(clientCnx.newWatchTopicList(any(BaseCommand.class), anyLong())).thenReturn(responseFuture); + when(clientCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class)); + clientCnxFuture.complete(clientCnx); + verify(clientCnx).newWatchTopicList(commandCaptor.capture(), anyLong()); + CommandWatchTopicListSuccess success = new CommandWatchTopicListSuccess() + .setWatcherId(7) + .setRequestId(commandCaptor.getValue().getWatchTopicList().getRequestId()) + .setTopicsHash("FEED"); + success.addTopic("persistent://tenant/ns/topic11"); + responseFuture.complete(success); + assertTrue(watcherFuture.isDone() && !watcherFuture.isCompletedExceptionally()); + } + + @Test + public void testWatcherCallsListenerOnUpdate() { + ClientCnx clientCnx = mock(ClientCnx.class); + CompletableFuture<CommandWatchTopicListSuccess> responseFuture = new CompletableFuture<>(); + ArgumentCaptor<BaseCommand> commandCaptor = ArgumentCaptor.forClass(BaseCommand.class); + when(clientCnx.newWatchTopicList(any(BaseCommand.class), anyLong())).thenReturn(responseFuture); + when(clientCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class)); + clientCnxFuture.complete(clientCnx); + verify(clientCnx).newWatchTopicList(commandCaptor.capture(), anyLong()); + CommandWatchTopicListSuccess success = new CommandWatchTopicListSuccess() + .setWatcherId(7) + .setRequestId(commandCaptor.getValue().getWatchTopicList().getRequestId()) + .setTopicsHash("FEED"); + success.addTopic("persistent://tenant/ns/topic11"); + responseFuture.complete(success); + + CommandWatchTopicUpdate update = new CommandWatchTopicUpdate() + .setTopicsHash("F33D") + .setWatcherId(7) + .addAllNewTopics(Collections.singleton("persistent://tenant/ns/topic12")); + + watcher.handleCommandWatchTopicUpdate(update); + verify(listener).onTopicsAdded(Collections.singletonList("persistent://tenant/ns/topic12")); + } + + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 46c22375b77..d8151579353 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -263,11 +263,12 @@ public class Commands { return serializeWithSize(cmd); } - public static ByteBuf newConnected(int clientProtocoVersion) { - return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE); + public static ByteBuf newConnected(int clientProtocoVersion, boolean supportsTopicWatchers) { + return newConnected(clientProtocoVersion, INVALID_MAX_MESSAGE_SIZE, supportsTopicWatchers); } - public static BaseCommand newConnectedCommand(int clientProtocolVersion, int maxMessageSize) { + public static BaseCommand newConnectedCommand(int clientProtocolVersion, int maxMessageSize, + boolean supportsTopicWatchers) { BaseCommand cmd = localCmd(Type.CONNECTED); CommandConnected connected = cmd.setConnected() .setServerVersion("Pulsar Server" + PulsarVersion.getVersion()); @@ -282,11 +283,13 @@ public class Commands { int versionToAdvertise = Math.min(currentProtocolVersion, clientProtocolVersion); connected.setProtocolVersion(versionToAdvertise); + + connected.setFeatureFlags().setSupportsTopicWatchers(supportsTopicWatchers); return cmd; } - public static ByteBuf newConnected(int clientProtocolVersion, int maxMessageSize) { - return serializeWithSize(newConnectedCommand(clientProtocolVersion, maxMessageSize)); + public static ByteBuf newConnected(int clientProtocolVersion, int maxMessageSize, boolean supportsTopicWatchers) { + return serializeWithSize(newConnectedCommand(clientProtocolVersion, maxMessageSize, supportsTopicWatchers)); } public static ByteBuf newAuthChallenge(String authMethod, AuthData brokerData, int clientProtocolVersion) { @@ -1474,6 +1477,55 @@ public class Commands { return serializeWithSize(cmd); } + public static BaseCommand newWatchTopicList( + long requestId, long watcherId, String namespace, String topicsPattern, String topicsHash) { + BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST); + cmd.setWatchTopicList() + .setRequestId(requestId) + .setNamespace(namespace) + .setTopicsPattern(topicsPattern) + .setWatcherId(watcherId); + if (topicsHash != null) { + cmd.getWatchTopicList() + .setTopicsHash(topicsHash); + } + return cmd; + } + + public static BaseCommand newWatchTopicListSuccess(long requestId, long watcherId, String topicsHash, + List<String> topics) { + BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_SUCCESS); + cmd.setWatchTopicListSuccess() + .setRequestId(requestId) + .setWatcherId(watcherId); + if (topicsHash != null) { + cmd.getWatchTopicListSuccess().setTopicsHash(topicsHash); + } + if (topics != null && !topics.isEmpty()) { + cmd.getWatchTopicListSuccess().addAllTopics(topics); + } + return cmd; + } + + public static BaseCommand newWatchTopicUpdate(long watcherId, + List<String> newTopics, List<String> deletedTopics, String topicsHash) { + BaseCommand cmd = localCmd(Type.WATCH_TOPIC_UPDATE); + cmd.setWatchTopicUpdate() + .setWatcherId(watcherId) + .setTopicsHash(topicsHash) + .addAllNewTopics(newTopics) + .addAllDeletedTopics(deletedTopics); + return cmd; + } + + public static BaseCommand newWatchTopicListClose(long watcherId, long requestId) { + BaseCommand cmd = localCmd(Type.WATCH_TOPIC_LIST_CLOSE); + cmd.setWatchTopicListClose() + .setRequestId(requestId) + .setWatcherId(watcherId); + return cmd; + } + public static ByteBuf serializeWithSize(BaseCommand cmd) { // / Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index 9fb22d1a289..d0b26103461 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -77,6 +77,10 @@ import org.apache.pulsar.common.api.proto.CommandSuccess; import org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest; import org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse; import org.apache.pulsar.common.api.proto.CommandUnsubscribe; +import org.apache.pulsar.common.api.proto.CommandWatchTopicList; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListClose; +import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; +import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.intercept.InterceptException; import org.slf4j.Logger; @@ -432,6 +436,27 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { checkArgument(cmd.hasEndTxnOnSubscriptionResponse()); handleEndTxnOnSubscriptionResponse(cmd.getEndTxnOnSubscriptionResponse()); break; + + case WATCH_TOPIC_LIST: + checkArgument(cmd.hasWatchTopicList()); + handleCommandWatchTopicList(cmd.getWatchTopicList()); + break; + + case WATCH_TOPIC_LIST_SUCCESS: + checkArgument(cmd.hasWatchTopicListSuccess()); + handleCommandWatchTopicListSuccess(cmd.getWatchTopicListSuccess()); + break; + + case WATCH_TOPIC_UPDATE: + checkArgument(cmd.hasWatchTopicUpdate()); + handleCommandWatchTopicUpdate(cmd.getWatchTopicUpdate()); + break; + + case WATCH_TOPIC_LIST_CLOSE: + checkArgument(cmd.hasWatchTopicListClose()); + handleCommandWatchTopicListClose(cmd.getWatchTopicListClose()); + break; + default: break; } @@ -672,5 +697,22 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { throw new UnsupportedOperationException(); } + protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchTopicListSuccess( + CommandWatchTopicListSuccess commandWatchTopicListSuccess) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchTopicUpdate(CommandWatchTopicUpdate commandWatchTopicUpdate) { + throw new UnsupportedOperationException(); + } + + protected void handleCommandWatchTopicListClose(CommandWatchTopicListClose commandWatchTopicListClose) { + throw new UnsupportedOperationException(); + } + private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java index ed373867898..574e4d5bd26 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicList.java @@ -32,6 +32,8 @@ import org.apache.pulsar.common.naming.TopicName; @UtilityClass public class TopicList { + public static final String ALL_TOPICS_PATTERN = ".*"; + private static final String SCHEME_SEPARATOR = "://"; private static final Pattern SCHEME_SEPARATOR_PATTERN = Pattern.compile(Pattern.quote(SCHEME_SEPARATOR)); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index fc8e5a5b7c6..03cc6cc54da 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -294,12 +294,14 @@ message FeatureFlags { optional bool supports_auth_refresh = 1 [default = false]; optional bool supports_broker_entry_metadata = 2 [default = false]; optional bool supports_partial_producer = 3 [default = false]; + optional bool supports_topic_watchers = 4 [default = false]; } message CommandConnected { required string server_version = 1; optional int32 protocol_version = 2 [default = 0]; optional int32 max_message_size = 3; + optional FeatureFlags feature_flags = 4; } message CommandAuthResponse { @@ -757,6 +759,34 @@ message CommandGetTopicsOfNamespaceResponse { optional bool changed = 5 [default = true]; } +message CommandWatchTopicList { + required uint64 request_id = 1; + required uint64 watcher_id = 2; + required string namespace = 3; + required string topics_pattern = 4; + // Only present when the client reconnects: + optional string topics_hash = 5; +} + +message CommandWatchTopicListSuccess { + required uint64 request_id = 1; + required uint64 watcher_id = 2; + repeated string topic = 3; + required string topics_hash = 4; +} + +message CommandWatchTopicUpdate { + required uint64 watcher_id = 1; + repeated string new_topics = 2; + repeated string deleted_topics = 3; + required string topics_hash = 4; +} + +message CommandWatchTopicListClose { + required uint64 request_id = 1; + required uint64 watcher_id = 2; +} + message CommandGetSchema { required uint64 request_id = 1; required string topic = 2; @@ -987,6 +1017,11 @@ message BaseCommand { TC_CLIENT_CONNECT_REQUEST = 62; TC_CLIENT_CONNECT_RESPONSE = 63; + WATCH_TOPIC_LIST = 64; + WATCH_TOPIC_LIST_SUCCESS = 65; + WATCH_TOPIC_UPDATE = 66; + WATCH_TOPIC_LIST_CLOSE = 67; + } @@ -1063,4 +1098,9 @@ message BaseCommand { optional CommandEndTxnOnSubscriptionResponse endTxnOnSubscriptionResponse = 61; optional CommandTcClientConnectRequest tcClientConnectRequest = 62; optional CommandTcClientConnectResponse tcClientConnectResponse = 63; + + optional CommandWatchTopicList watchTopicList = 64; + optional CommandWatchTopicListSuccess watchTopicListSuccess = 65; + optional CommandWatchTopicUpdate watchTopicUpdate = 66; + optional CommandWatchTopicListClose watchTopicListClose = 67; } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index d3c83198773..53c8d1bd418 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -371,7 +371,7 @@ public class ProxyConnection extends PulsarHandler { // partitions metadata lookups state = State.ProxyLookupRequests; lookupProxyHandler = new LookupProxyHandler(service, this); - ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise)) + ctx.writeAndFlush(Commands.newConnected(protocolVersionToAdvertise, false)) .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } } @@ -383,7 +383,8 @@ public class ProxyConnection extends PulsarHandler { state = State.ProxyConnectionToBroker; int maxMessageSize = connected.hasMaxMessageSize() ? connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE; - ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize)) + ctx.writeAndFlush(Commands.newConnected(connected.getProtocolVersion(), maxMessageSize, + connected.hasFeatureFlags() && connected.getFeatureFlags().isSupportsTopicWatchers())) .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); } else { LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "