(pulsar) branch master updated: [fix] [broker] Fix Broker was failing to load stats-internal with broken schema ledger (#22845)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 326e9fa731a [fix] [broker] Fix Broker was failing to load stats-internal with broken schema ledger (#22845) 326e9fa731a is described below commit 326e9fa731ae17304621ab915e36d52a9b28a7a0 Author: Rajan Dhabalia AuthorDate: Wed Jun 5 11:19:12 2024 -0700 [fix] [broker] Fix Broker was failing to load stats-internal with broken schema ledger (#22845) --- .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 7 +++ 1 file changed, 7 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 18e69250c16..2165247b161 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -58,6 +58,8 @@ import java.util.stream.Collectors; import javax.annotation.Nonnull; import lombok.Getter; import lombok.Value; +import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException; +import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataServerException; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -2829,6 +2831,11 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal }).exceptionally(e -> { log.error("[{}] Failed to get ledger metadata for the schema ledger {}", topic, ledgerId, e); +if ((e.getCause() instanceof BKNoSuchLedgerExistsOnMetadataServerException) +|| (e.getCause() instanceof BKNoSuchLedgerExistsException)) { 
+completableFuture.complete(null); +return null; +} completableFuture.completeExceptionally(e); return null; });
(pulsar) branch master updated: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 59daac64c21 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535) 59daac64c21 is described below commit 59daac64c210f539e733f883edad09d08333aa62 Author: Rajan Dhabalia AuthorDate: Fri Apr 19 10:30:55 2024 -0700 [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535) --- .../pulsar/broker/service/AbstractTopic.java | 52 +- ...kerInternalClientConfigurationOverrideTest.java | 42 - 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e772486fcc6..44a4ca42cea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -220,13 +220,16 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue( data.getBackLogQuotaMap() == null ? 
null : data.getBackLogQuotaMap().get(type.toString(; - topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize()); - topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds()); + topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize())); + topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds())); topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate())); topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled()); topicPolicies.getReplicatorDispatchRate().updateTopicValue( @@ -268,15 +271,19 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters))); topicPolicies.getMaxUnackedMessagesOnConsumer() - .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer)); topicPolicies.getMaxUnackedMessagesOnSubscription() - .updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription); - topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds); - topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic); - topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic); - topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription)); +topicPolicies.getMessageTTLInSeconds() + .updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds)); +topicPolicies.getMaxSubscriptionsPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic)); 
+topicPolicies.getMaxProducersPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic)); +topicPolicies.getMaxConsumerPerTopic() + .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic)); topicPolicies.getMaxConsumersPerSubscription() - .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription); + .updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription)); topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies); topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue( @@ -312,6 +319,10 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener producer = pulsarClient.newProducer() +.topic(topic).create(); +PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); + assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(), +conf.getMaxUnackedMessagesPerSubscription
(pulsar) branch master updated (cea1a9ba9b5 -> b42d94121c0)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from cea1a9ba9b5 [fix][broker] Fix message drop record in producer stat (#22458) add b42d94121c0 [improve][broker] Recover subscription creation on the broken schema ledger topic (#22469) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/service/ServerCnx.java| 4 +- .../service/schema/BookkeeperSchemaStorage.java| 2 + .../java/org/apache/pulsar/schema/SchemaTest.java | 76 ++ 3 files changed, 81 insertions(+), 1 deletion(-)
(pulsar) branch master updated: [improve][client] PIP-313 Support force unsubscribe using consumer api (#21687)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 631b13ad23d [improve][client] PIP-313 Support force unsubscribe using consumer api (#21687) 631b13ad23d is described below commit 631b13ad23d7e48c6e82d38f97c23d129062cb7c Author: Rajan Dhabalia AuthorDate: Mon Dec 18 21:23:18 2023 -0800 [improve][client] PIP-313 Support force unsubscribe using consumer api (#21687) Co-authored-by: Jiwe Guo --- .../org/apache/pulsar/broker/service/Consumer.java | 4 +-- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../apache/pulsar/broker/service/Subscription.java | 2 ++ .../nonpersistent/NonPersistentSubscription.java | 17 -- .../service/persistent/PersistentSubscription.java | 20 ++-- .../client/impl/BrokerClientIntegrationTest.java | 37 ++ .../org/apache/pulsar/client/api/Consumer.java | 25 +++ .../pulsar/client/api/PulsarClientException.java | 4 +++ .../apache/pulsar/client/impl/ConsumerBase.java| 14 ++-- .../apache/pulsar/client/impl/ConsumerImpl.java| 6 ++-- .../client/impl/MultiTopicsConsumerImpl.java | 4 +-- .../apache/pulsar/common/protocol/Commands.java| 5 +-- pulsar-common/src/main/proto/PulsarApi.proto | 1 + 13 files changed, 125 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 5ec76d07feb..83dcd8d6c16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -421,8 +421,8 @@ public class Consumer { } } -public void doUnsubscribe(final long requestId) { -subscription.doUnsubscribe(this).thenAccept(v -> { +public void doUnsubscribe(final long requestId, boolean force) { +subscription.doUnsubscribe(this, 
force).thenAccept(v -> { log.info("Unsubscribed successfully from {}", subscription); cnx.removedConsumer(this); cnx.getCommandSender().sendSuccessResponse(requestId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2baa55b80e7..9f2b98aeb40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1958,7 +1958,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { CompletableFuture consumerFuture = consumers.get(unsubscribe.getConsumerId()); if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) { - consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId()); + consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId(), unsubscribe.isForce()); } else { commandSender.sendErrorResponse(unsubscribe.getRequestId(), ServerError.MetadataError, "Consumer not found"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 6805d197521..61107b7b0db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -75,6 +75,8 @@ public interface Subscription extends MessageExpirer { CompletableFuture doUnsubscribe(Consumer consumer); +CompletableFuture doUnsubscribe(Consumer consumer, boolean forcefully); + CompletableFuture clearBacklog(); CompletableFuture skipMessages(int numMessagesToSkip); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java 
index 28ea9f39ac8..92aba6221da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -429,11 +429,24 @@ public class NonPersistentSubscription extends AbstractSubscription implements S */ @Override public CompletableFuture doUnsubscribe(Consumer consumer) { +return doUnsubscribe(consumer, false); +} + +/** + * Handle unsubscribe command from the client API Check with the dispatcher is this con
(pulsar-client-cpp) branch main updated: [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git The following commit(s) were added to refs/heads/main by this push: new 25ea451 [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373) 25ea451 is described below commit 25ea451eb3a79d48966689ec64460ae03d5d57da Author: Rajan Dhabalia AuthorDate: Mon Dec 18 19:36:12 2023 -0800 [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373) * [PIP-60] [Proxy-Server] Support SNI routing for Pulsar CPP client * fix format * fix const def * Fix format check - Co-authored-by: Yunze Xu --- include/pulsar/ClientConfiguration.h | 31 +++ lib/ClientConfiguration.cc | 16 lib/ClientConfigurationImpl.h| 2 ++ lib/ClientConnection.cc | 20 lib/ClientConnection.h | 5 + tests/ConsumerTest.cc| 12 6 files changed, 82 insertions(+), 4 deletions(-) diff --git a/include/pulsar/ClientConfiguration.h b/include/pulsar/ClientConfiguration.h index 3d651e9..cc3c3ed 100644 --- a/include/pulsar/ClientConfiguration.h +++ b/include/pulsar/ClientConfiguration.h @@ -32,6 +32,10 @@ class PULSAR_PUBLIC ClientConfiguration { ~ClientConfiguration(); ClientConfiguration(const ClientConfiguration&); ClientConfiguration& operator=(const ClientConfiguration&); +enum ProxyProtocol +{ +SNI = 0 +}; /** * Configure a limit on the amount of memory that will be allocated by this client instance. @@ -320,6 +324,33 @@ class PULSAR_PUBLIC ClientConfiguration { */ ClientConfiguration& setConnectionTimeout(int timeoutMs); +/** + * Set proxy-service url when client would like to connect to broker via proxy. Client must configure both + * proxyServiceUrl and appropriate proxyProtocol. 
+ * + * Example: pulsar+ssl://ats-proxy.example.com:4443 + * + * @param proxyServiceUrl proxy url to connect with broker + * @return + */ +ClientConfiguration& setProxyServiceUrl(const std::string& proxyServiceUrl); + +const std::string& getProxyServiceUrl() const; + +/** + * Set appropriate proxy-protocol along with proxy-service url. Currently Pulsar supports SNI proxy + * routing. + * + * SNI routing: + * https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing. + * + * @param proxyProtocol possible options (SNI) + * @return + */ +ClientConfiguration& setProxyProtocol(ProxyProtocol proxyProtocol); + +ProxyProtocol getProxyProtocol() const; + /** * The getter associated with setConnectionTimeout(). */ diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc index 63c0bf8..6e7c745 100644 --- a/lib/ClientConfiguration.cc +++ b/lib/ClientConfiguration.cc @@ -134,6 +134,22 @@ ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurr return *this; } +ClientConfiguration& ClientConfiguration::setProxyServiceUrl(const std::string& proxyServiceUrl) { +impl_->proxyServiceUrl = proxyServiceUrl; +return *this; +} + +const std::string& ClientConfiguration::getProxyServiceUrl() const { return impl_->proxyServiceUrl; } + +ClientConfiguration& ClientConfiguration::setProxyProtocol(ClientConfiguration::ProxyProtocol proxyProtocol) { +impl_->proxyProtocol = proxyProtocol; +return *this; +} + +ClientConfiguration::ProxyProtocol ClientConfiguration::getProxyProtocol() const { +return impl_->proxyProtocol; +} + int ClientConfiguration::getConcurrentLookupRequest() const { return impl_->concurrentLookupRequest; } ClientConfiguration& ClientConfiguration::setMaxLookupRedirects(int maxLookupRedirects) { diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h index 3458a05..b62b97c 100644 --- a/lib/ClientConfigurationImpl.h +++ b/lib/ClientConfigurationImpl.h @@ -46,6 +46,8 @@ struct 
ClientConfigurationImpl { std::string listenerName; int connectionTimeoutMs{1}; // 10 seconds std::string description; +std::string proxyServiceUrl; +ClientConfiguration::ProxyProtocol proxyProtocol; std::unique_ptr takeLogger() { return std::move(loggerFactory); } }; diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc index 97d8847..61aa7f7 100644 --- a/lib/ClientConnection.cc +++ b/lib/ClientConnection.cc @@ -209,7 +209,15 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std: boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client); #endif Url serv
(pulsar) branch master updated: [fix][broker] Fix failure while creating non-durable cursor with inactive managed-ledger (#21508)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 720d6d24327 [fix][broker] Fix failure while creating non-durable cursor with inactive managed-ledger (#21508) 720d6d24327 is described below commit 720d6d24327a98d974bcf90eb248037572ab39e3 Author: Rajan Dhabalia AuthorDate: Sat Nov 4 09:40:31 2023 -0700 [fix][broker] Fix failure while creating non-durable cursor with inactive managed-ledger (#21508) --- .../apache/bookkeeper/mledger/ManagedLedger.java | 4 +++- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 5 - .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 24 ++ .../offload/jcloud/impl/MockManagedLedger.java | 4 ++-- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index c7dd8ea9129..f91d9ec3f5a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -682,8 +682,10 @@ public interface ManagedLedger { /** * Check current inactive ledger (based on {@link ManagedLedgerConfig#getInactiveLedgerRollOverTimeMs()} and * roll over that ledger if inactive. + * + * @return true if ledger is considered for rolling over */ -void checkInactiveLedgerAndRollOver(); +boolean checkInactiveLedgerAndRollOver(); /** * Check if managed ledger should cache backlog reads. 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e349bf50808..f3e3a9dc144 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -4455,7 +4455,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } @Override -public void checkInactiveLedgerAndRollOver() { +public boolean checkInactiveLedgerAndRollOver() { long currentTimeMs = System.currentTimeMillis(); if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > (lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) { log.info("[{}] Closing inactive ledger, last-add entry {}", name, lastAddEntryTimeMs); @@ -4476,10 +4476,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } ledgerClosed(lh); +createLedgerAfterClosed(); // we do not create ledger here, since topic is inactive for a long time. 
}, null); +return true; } } +return false; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 5bd6c299d99..6e04aafec0f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -124,6 +124,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; @@ -4076,4 +4077,27 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { }); future.join(); } + +@Test +public void testNonDurableCursorCreateForInactiveLedger() throws Exception { +String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut"; +BookKeeper spyBookKeeper = spy(bkc); +ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper); +ManagedLedgerConfig config = new ManagedLedgerConfig(); +config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS); +ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, config); + +MutableBoolean isRolledOver = new MutableBoolean(false); +retryStrategically((test) -> { +if (isRolledOver.booleanValue()) { +return true; +
(pulsar-adapters) branch master updated: Add Storm adapter back after removing from Apache Storm repo (#55)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git The following commit(s) were added to refs/heads/master by this push: new 5aeddbb Add Strom adapter back after removing from Apache Storm repo (#55) 5aeddbb is described below commit 5aeddbb21508fa70d36199f95dabebe17774384c Author: Rajan Dhabalia AuthorDate: Wed Nov 1 10:24:57 2023 -0700 Add Strom adapter back after removing from Apache Storm repo (#55) This reverts commit ab537c13061b06de2044cd43965de930d762fe8a. --- .asf.yaml | 3 +- README.md | 4 +- pom.xml| 19 + pulsar-storm/pom.xml | 102 + .../apache/pulsar/storm/MessageToValuesMapper.java | 44 ++ .../java/org/apache/pulsar/storm/PulsarBolt.java | 207 + .../pulsar/storm/PulsarBoltConfiguration.java | 57 +++ .../java/org/apache/pulsar/storm/PulsarSpout.java | 494 + .../pulsar/storm/PulsarSpoutConfiguration.java | 195 .../apache/pulsar/storm/PulsarSpoutConsumer.java | 58 +++ .../pulsar/storm/PulsarStormConfiguration.java | 90 .../java/org/apache/pulsar/storm/PulsarTuple.java | 45 ++ .../apache/pulsar/storm/SharedPulsarClient.java| 155 +++ .../apache/pulsar/storm/TupleToMessageMapper.java | 66 +++ pulsar-storm/src/main/javadoc/overview.html| 29 ++ .../org/apache/pulsar/storm/PulsarSpoutTest.java | 178 tests/pom.xml | 1 + tests/pulsar-storm-test/pom.xml| 131 ++ .../apache/pulsar/storm/MockOutputCollector.java | 101 + .../pulsar/storm/MockSpoutOutputCollector.java | 80 .../org/apache/pulsar/storm/PulsarBoltTest.java| 236 ++ .../org/apache/pulsar/storm/PulsarSpoutTest.java | 349 +++ .../java/org/apache/pulsar/storm/TestUtil.java | 35 ++ .../apache/pulsar/storm/example/StormExample.java | 166 +++ 24 files changed, 2841 insertions(+), 4 deletions(-) diff --git a/.asf.yaml b/.asf.yaml index c04e151..74589a2 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -27,6 +27,7 @@ github: - streaming - queuing - event-streaming +- apache-storm - apache-spark - 
apache-kafka features: @@ -47,4 +48,4 @@ github: notifications: commits: commits@pulsar.apache.org issues: commits@pulsar.apache.org - pullrequests: commits@pulsar.apache.org + pullrequests: commits@pulsar.apache.org \ No newline at end of file diff --git a/README.md b/README.md index 33409df..ce62338 100644 --- a/README.md +++ b/README.md @@ -25,8 +25,6 @@ This repository is used for hosting all the adapters maintained and supported by [Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is supported and maintained by Apache Flink Community. -[Apache Storm bolt and spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are supported by Apache Storm Community. - ## Building In order to build this code you can simply use Maven @@ -44,5 +42,5 @@ git checkout v2.11.0 mvn clean install -DskipTests ``` -This is because this repository depends on test integration artifacts of the relative version on the main +This is because this repository depends on test integration artifacts of the relative version on the main Apache Pulsar codebase diff --git a/pom.xml b/pom.xml index 5f39f48..4f240d1 100644 --- a/pom.xml +++ b/pom.xml @@ -78,6 +78,7 @@ 2.11.0 2.7.2 +2.0.0 0.8.1.1 1.10.2 1.2.17 @@ -139,6 +140,7 @@ +pulsar-storm pulsar-spark pulsar-client-kafka-compat pulsar-log4j2-appender @@ -250,6 +252,22 @@ + +org.apache.storm +storm-client +${storm.version} + + +org.apache.storm +storm-server +${storm.version} + + +org.apache.storm +storm-core +${storm.version} + + org.apache.kafka kafka_2.9.2 @@ -1068,3 +1086,4 @@ + diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml new file mode 100644 index 000..a20649b --- /dev/null +++ b/pulsar-storm/pom.xml @@ -0,0 +1,102 @@ + +http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd; + xmlns="http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;> + 4.0.0 + + +org.apache.pulsar +pulsar-adapters +2.11.0-SNAPSHOT +.. 
+ + + pulsar-storm + Pulsar Storm adapter + + + + + org.apache.pulsar + pulsar-common + test + + + + org.apache.pulsar + pulsar-client + + + + org.slf4j + slf4j-api + + + + org.mockito +
[pulsar] branch master updated: [fix][broker]Create v1/namespace with Policies (#21171)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2b5c199053a [fix][broker]Create v1/namespace with Policies (#21171) 2b5c199053a is described below commit 2b5c199053a5b2d7f849e6604d619bae9197a8c9 Author: vraulji567 <95091480+vraulji...@users.noreply.github.com> AuthorDate: Wed Oct 25 22:16:00 2023 -0400 [fix][broker]Create v1/namespace with Policies (#21171) Co-authored-by: vraulji --- .../pulsar/broker/admin/impl/NamespacesBase.java | 11 .../apache/pulsar/broker/admin/v1/Namespaces.java | 36 .../apache/pulsar/broker/admin/v2/Namespaces.java | 15 - .../apache/pulsar/broker/admin/NamespacesTest.java | 64 ++ .../client/admin/internal/NamespacesImpl.java | 5 +- 5 files changed, 112 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bd3690d3c74..7be1ebf3dd1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2656,4 +2656,15 @@ public abstract class NamespacesBase extends AdminResource { throw new RestException(e); } } + +protected Policies getDefaultPolicesIfNull(Policies policies) { +if (policies == null) { +policies = new Policies(); +} +int defaultNumberOfBundles = config().getDefaultNumberOfNamespaceBundles(); +if (policies.bundles == null) { +policies.bundles = getBundles(defaultNumberOfBundles); +} +return policies; +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index b188750252a..9cf394e77f4 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1722,5 +1722,41 @@ public class Namespaces extends NamespacesBase { internalEnableMigration(migrated); } +@PUT +@Path("/{property}/{cluster}/{namespace}/policy") +@ApiOperation(value = "Creates a new namespace with the specified policies") +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"), +@ApiResponse(code = 409, message = "Namespace already exists"), +@ApiResponse(code = 412, message = "Namespace name is not valid") }) +public void createNamespace(@Suspended AsyncResponse response, +@PathParam("property") String property, +@PathParam("cluster") String cluster, +@PathParam("namespace") String namespace, +@ApiParam(value = "Policies for the namespace") Policies policies) { +validateNamespaceName(property, cluster, namespace); +CompletableFuture ret; +if (!namespaceName.isGlobal()) { +// If the namespace is non global, make sure property has the access on the cluster. For global namespace, +// same check is made at the time of setting replication. 
+ret = validateClusterForTenantAsync(namespaceName.getTenant(), namespaceName.getCluster()); +} else { +ret = CompletableFuture.completedFuture(null); +} + +ret.thenApply(__ -> getDefaultPolicesIfNull(policies)).thenCompose(this::internalCreateNamespace) +.thenAccept(__ -> response.resume(Response.noContent().build())) +.exceptionally(ex -> { +Throwable root = FutureUtil.unwrapCompletionException(ex); +if (root instanceof MetadataStoreException.AlreadyExistsException) { +response.resume(new RestException(Status.CONFLICT, "Namespace already exists")); +} else { +log.error("[{}] Failed to create namespace {}", clientAppId(), namespaceName, ex); +resumeAsyncResponseExceptionally(response, ex); +} +return null; +}); +} + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/p
[pulsar] branch master updated: [feat] [broker] PIP-188 Fix cluster migration state store into local namespace policies (#21363)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3a9f99f03db [feat] [broker] PIP-188 Fix cluster migration state store into local namespace policies (#21363) 3a9f99f03db is described below commit 3a9f99f03db175ab0622dbfc87cde9efab24276e Author: Rajan Dhabalia AuthorDate: Wed Oct 25 14:26:20 2023 -0700 [feat] [broker] PIP-188 Fix cluster migration state store into local namespace policies (#21363) Co-authored-by: Rajan Dhabalia --- .../apache/pulsar/broker/admin/AdminResource.java | 50 +++--- .../pulsar/broker/admin/impl/NamespacesBase.java | 4 +- .../pulsar/broker/service/AbstractTopic.java | 6 +-- .../broker/service/ClusterMigrationTest.java | 1 + .../pulsar/common/policies/data/Policies.java | 5 ++- .../pulsar/common/policies/data/LocalPolicies.java | 1 + 6 files changed, 36 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index e9beab90b5f..1526ae18a90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -57,6 +57,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.EntryFilters; +import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; @@ -304,7 +305,9 @@ public abstract class AdminResource extends 
PulsarWebResource { // fetch bundles from LocalZK-policies BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory() .getBundles(namespaceName).getBundlesData(); +Optional localPolicies = getLocalPolicies().getLocalPolicies(namespaceName); policies.bundles = bundleData != null ? bundleData : policies.bundles; +policies.migrated = localPolicies.isPresent() ? localPolicies.get().migrated : false; if (policies.is_allow_auto_update_schema == null) { // the type changed from boolean to Boolean. return broker value here for keeping compatibility. policies.is_allow_auto_update_schema = pulsar().getConfig().isAllowAutoUpdateSchemaEnabled(); @@ -321,32 +324,31 @@ public abstract class AdminResource extends PulsarWebResource { } protected CompletableFuture getNamespacePoliciesAsync(NamespaceName namespaceName) { -return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> { -if (policies.isPresent()) { -return pulsar() -.getNamespaceService() -.getNamespaceBundleFactory() -.getBundlesAsync(namespaceName) -.thenCompose(bundles -> { -BundlesData bundleData = null; -try { -bundleData = bundles.getBundlesData(); -} catch (Exception e) { -log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e); -return FutureUtil.failedFuture(new RestException(e)); -} -policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles; -if (policies.get().is_allow_auto_update_schema == null) { -// the type changed from boolean to Boolean. return broker value here for keeping compatibility. 
-policies.get().is_allow_auto_update_schema = pulsar().getConfig() -.isAllowAutoUpdateSchemaEnabled(); +CompletableFuture result = new CompletableFuture<>(); +namespaceResources().getPoliciesAsync(namespaceName) + .thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> { +if (pl.isPresent()) { +Policies policies = pl.get(); +if (localPolicies.isPresent()) { +policies.bundles = localPolicies.get().bundles; +policies.migrated = localPolicies.get().migrated; +} +if (policies.is_allow_a
[pulsar] branch master updated: [feat] [broker] PIP-188 Fix cluster migration state store into local metadatastore (#21359)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 04d1225fbb1 [feat] [broker] PIP-188 Fix cluster migration state store into local metadatastore (#21359) 04d1225fbb1 is described below commit 04d1225fbb1485333f44138e587aadce34ea1f0e Author: Rajan Dhabalia AuthorDate: Tue Oct 24 23:59:09 2023 -0700 [feat] [broker] PIP-188 Fix cluster migration state store into local metadatastore (#21359) Co-authored-by: Rajan Dhabalia --- .../pulsar/broker/resources/ClusterResources.java | 40 ++- .../pulsar/broker/resources/PulsarResources.java | 3 +- .../pulsar/broker/admin/impl/ClustersBase.java | 44 ++- .../pulsar/broker/service/AbstractTopic.java | 31 +++-- .../org/apache/pulsar/broker/service/Consumer.java | 2 +- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../service/nonpersistent/NonPersistentTopic.java | 2 +- .../broker/service/persistent/PersistentTopic.java | 2 +- .../broker/service/ClusterMigrationTest.java | 128 + .../org/apache/pulsar/client/admin/Clusters.java | 43 ++- .../pulsar/common/policies/data/ClusterData.java | 26 - .../common/policies/data/ClusterPolicies.java | 61 ++ .../pulsar/client/admin/internal/ClustersImpl.java | 16 ++- .../org/apache/pulsar/admin/cli/CmdClusters.java | 13 ++- .../common/policies/data/ClusterDataImpl.java | 31 + .../common/policies/data/ClusterPoliciesImpl.java | 85 ++ .../common/policies/data/ClusterDataImplTest.java | 3 - 18 files changed, 380 insertions(+), 154 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 843cec7b205..b0cc50edf1f 100644 --- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import lombok.Getter; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStore; @@ -39,10 +40,19 @@ public class ClusterResources extends BaseResources { @Getter private FailureDomainResources failureDomainResources; - -public ClusterResources(MetadataStore store, int operationTimeoutSec) { -super(store, ClusterData.class, operationTimeoutSec); -this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec); +@Getter +private ClusterPoliciesResources clusterPoliciesResources; + +public ClusterResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) { +super(configurationStore, ClusterData.class, operationTimeoutSec); +this.failureDomainResources = new FailureDomainResources(configurationStore, FailureDomainImpl.class, +operationTimeoutSec); +if (localStore != null) { +this.clusterPoliciesResources = new ClusterPoliciesResources(localStore, ClusterPoliciesImpl.class, +operationTimeoutSec); +} else { +this.clusterPoliciesResources = null; +} } public CompletableFuture> listAsync() { @@ -216,4 +226,26 @@ public class ClusterResources extends BaseResources { }); } } + +public static class ClusterPoliciesResources extends BaseResources { +public static final String LOCAL_POLICIES_PATH = "policies"; + +public ClusterPoliciesResources(MetadataStore store, Class clazz, +int operationTimeoutSec) { +super(store, clazz, operationTimeoutSec); +} + +public Optional 
getClusterPolicies(String clusterName) throws MetadataStoreException { +return get(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH)); +} + +public CompletableFuture> getClusterPoliciesAsync(String clusterName) { +return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH)); +} + +public CompletableFuture setPoliciesWithCreateAsync(String clusterName, +Function, ClusterPoliciesImpl> createFunction) { +return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, clusterName,
[pulsar] branch master updated: [fix][broker] Allow broker deployment in heterogeneous hw config cluster without restricting nic speed detection (#21409)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 789d284d6a0 [fix][broker] Allow broker deployment in heterogeneous hw config cluster without restricting nic speed detection (#21409) 789d284d6a0 is described below commit 789d284d6a073e61abac27da875f38df27c5217d Author: Rajan Dhabalia AuthorDate: Tue Oct 24 16:14:44 2023 -0700 [fix][broker] Allow broker deployment in heterogeneous hw config cluster without restricting nic speed detection (#21409) --- .../src/main/java/org/apache/pulsar/broker/PulsarService.java| 9 - .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java| 8 .../tests/integration/loadbalance/ExtensibleLoadManagerTest.java | 5 - 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e2950594047..9a202c28c2a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -83,7 +83,6 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.intercept.BrokerInterceptors; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; -import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; @@ -733,14 +732,6 @@ public class PulsarService implements AutoCloseable, ShutdownService { config.getDefaultRetentionTimeInMinutes() * 60)); } -if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty() -&& 
config.isLoadBalancerEnabled() -&& LinuxInfoUtils.isLinux() -&& !LinuxInfoUtils.checkHasNicSpeeds()) { -throw new IllegalStateException("Unable to read VM NIC speed. You must set " -+ "[loadBalancerOverrideBrokerNicSpeedGbps] to override it when load balancer is enabled."); -} - localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic()) ? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic()) : null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 20ba9500cb1..c7cd55e183b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -256,6 +256,14 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { TopicName topicName = TopicName.get(defaultTestNamespace + "/test-check-ownership"); NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); // 1. The bundle is never assigned. 
+retryStrategically((test) -> { +try { +return !primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get() +&& !secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get(); +} catch (Exception e) { +return false; +} +}, 5, 200); assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 035d88c7be0..158827ca34d 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.tests.integration.loadbalance; import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT; +import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; import
[pulsar] branch master updated: [fix][broker] Fix race condition of replication cluster connection during migration topic (#21364)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 689976bf3ea [fix][broker] Fix race condition of replication cluster connection during migration topic (#21364) 689976bf3ea is described below commit 689976bf3ea8dc4cff6f4fdcab54bdd4a8a73a7e Author: Rajan Dhabalia AuthorDate: Mon Oct 16 14:05:13 2023 -0700 [fix][broker] Fix race condition of replication cluster connection during migration topic (#21364) Co-authored-by: Rajan Dhabalia --- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../org/apache/pulsar/broker/service/Topic.java| 2 ++ .../service/nonpersistent/NonPersistentTopic.java | 5 +++ .../broker/service/persistent/PersistentTopic.java | 37 -- .../broker/service/ClusterMigrationTest.java | 9 -- 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index efbe3fcd7ec..95f139dc11e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1680,7 +1680,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) { Optional clusterURL = getMigratedClusterUrl(service.getPulsar(), topic.getName()); if (clusterURL.isPresent()) { -if (topic.isReplicationBacklogExist()) { +if (!topic.shouldProducerMigrate()) { log.info("Topic {} is migrated but replication backlog exist: " + "producerId = {}, producerName = {}, {}", topicName, producerId, producerName, ex.getCause().getMessage()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7657d77e129..c697639ff4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -234,6 +234,8 @@ public interface Topic { boolean isBrokerPublishRateExceeded(); +boolean shouldProducerMigrate(); + boolean isReplicationBacklogExist(); void disableCnxAutoRead(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 54811da7238..76e9f261ca6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -235,6 +235,11 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol // No-op } +@Override +public boolean shouldProducerMigrate() { +return true; +} + @Override public boolean isReplicationBacklogExist() { return false; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1ed7bafc147..03ee0f06e2f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -660,8 +660,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal List> futures = new ArrayList<>(); // send migration url metadata to producers before disconnecting them if (isMigrated()) { -if (isReplicationBacklogExist()) { -log.info("Topic {} is migrated but replication backlog exists. 
Closing producers.", topic); +if (!shouldProducerMigrate()) { +log.info("Topic {} is migrated but replication-backlog exists or " ++ "subs not created. Closing producers.", topic); } else { producers.forEach((__, producer) -> producer.topicMigrated(getMigratedClusterUrl())); } @@ -2592,6 +2593,20 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (!clusterUrl.isPresent()
[pulsar] branch master updated: [fix][broker] Fix avoid creating new topic after migration is started (#21368)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e08841114ce [fix][broker] Fix avoid creating new topic after migration is started (#21368) e08841114ce is described below commit e08841114cece7e4dc64677505ab66271a2bace5 Author: Rajan Dhabalia AuthorDate: Mon Oct 16 11:26:29 2023 -0700 [fix][broker] Fix avoid creating new topic after migration is started (#21368) Co-authored-by: Rajan Dhabalia --- .../pulsar/broker/service/AbstractTopic.java | 5 +++ .../pulsar/broker/service/BrokerService.java | 26 ++- .../apache/pulsar/broker/service/ServerCnx.java| 51 +++--- .../broker/service/ClusterMigrationTest.java | 43 +++--- 4 files changed, 114 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 3cb396d7a4b..a8f25f61a94 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1352,6 +1352,11 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener isClusterMigrationEnabled(PulsarService pulsar, +String topic) { +return getMigratedClusterUrlAsync(pulsar, topic).thenApply(url -> url.isPresent()); +} + public static CompletableFuture> getMigratedClusterUrlAsync(PulsarService pulsar, String topic) { return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d03a94a0563..b85f77cb2f5 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -111,6 +111,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException; import org.apache.pulsar.broker.service.TopicEventsListener.EventStage; import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; @@ -1521,6 +1522,12 @@ public class BrokerService implements Closeable { topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); +// do not recreate topic if topic is already migrated and deleted by broker +// so, avoid creating a new topic if migration is already started +if (ex != null && (ex.getCause() instanceof TopicMigratedException)) { + topicFuture.completeExceptionally(ex.getCause()); +return null; +} createPendingLoadTopic(); return null; }); @@ -1632,7 +1639,10 @@ public class BrokerService implements Closeable { ? 
checkMaxTopicsPerNamespace(topicName, 1) : CompletableFuture.completedFuture(null); -maxTopicsCheck.thenCompose(__ -> getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> { +CompletableFuture isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName); + +maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated).thenCompose(__ -> getManagedLedgerConfig(topicName)) +.thenAccept(managedLedgerConfig -> { if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { // init managedLedger interceptor Set interceptors = new HashSet<>(); @@ -1760,6 +1770,20 @@ public class BrokerService implements Closeable { }); } +private CompletableFuture checkTopicAlreadyMigrated(TopicName topicName) { +CompletableFuture result = new CompletableFuture<>(); +AbstractTopic.isClusterMigrationEnabled(pulsar,
[pulsar] branch master updated: [fix][broker]Support to migrate topics from blue to green cluster per namespace (#21367)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a10564d875f [fix][broker]Support to migrate topics from blue to green cluster per namespace (#21367) a10564d875f is described below commit a10564d875f42c32fda0178cfe7ad976d26fbfb1 Author: vraulji567 <95091480+vraulji...@users.noreply.github.com> AuthorDate: Sun Oct 15 20:32:42 2023 -0400 [fix][broker]Support to migrate topics from blue to green cluster per namespace (#21367) Co-authored-by: Vishwadeepsinh Raulji --- .../pulsar/broker/admin/impl/NamespacesBase.java | 14 + .../apache/pulsar/broker/admin/v1/Namespaces.java | 13 + .../apache/pulsar/broker/admin/v2/Namespaces.java | 12 +- .../pulsar/broker/service/AbstractTopic.java | 22 +- .../org/apache/pulsar/broker/service/Consumer.java | 3 +- .../apache/pulsar/broker/service/ServerCnx.java| 2 +- .../service/nonpersistent/NonPersistentTopic.java | 11 +- .../broker/service/persistent/PersistentTopic.java | 5 +- .../broker/service/ClusterMigrationTest.java | 455 + .../org/apache/pulsar/client/admin/Namespaces.java | 27 +- .../pulsar/common/policies/data/Policies.java | 2 + .../client/admin/internal/NamespacesImpl.java | 11 + .../org/apache/pulsar/admin/cli/CmdClusters.java | 11 +- .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 16 + 14 files changed, 579 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 8ab1f4dc860..a8f1af1d34f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2642,4 +2642,18 @@ public abstract class NamespacesBase extends AdminResource { return 
null; }); } + +protected void internalEnableMigration(boolean migrated) { +validateSuperUserAccess(); +try { +updatePolicies(namespaceName, policies -> { +policies.isMigrated = migrated; +return policies; +}); +log.info("Successfully updated migration on namespace {}", namespaceName); +} catch (Exception e) { +log.error("Failed to update migration on namespace {}", namespaceName, e); +throw new RestException(e); +} +} } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 234d7725113..b188750252a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1709,5 +1709,18 @@ public class Namespaces extends NamespacesBase { internalSetSchemaAutoUpdateCompatibilityStrategy(strategy); } +@POST +@Path("/{property}/{cluster}/{namespace}/migration") +@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace") +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) +public void enableMigration(@PathParam("property") String property, +@PathParam("cluster") String cluster, +@PathParam("namespace") String namespace, +boolean migrated) { +validateNamespaceName(property, cluster, namespace); +internalEnableMigration(migrated); +} + private static final Logger log = LoggerFactory.getLogger(Namespaces.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index dfa040baec5..36df0f7e31a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -2779,7 
+2779,17 @@ public class Namespaces extends NamespacesBase { }); } - +@POST +@Path("/{tenant}/{namespace}/migration") +@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace") +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponse(code = 404, me
[pulsar] branch master updated: [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new c1b0454614b [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029) c1b0454614b is described below commit c1b0454614b7903913cb0311bdcacf2118893fc9 Author: vineeth1995 AuthorDate: Tue Aug 22 09:52:30 2023 -0700 [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029) Co-authored-by: Vineeth Polamreddy --- .../service/nonpersistent/NonPersistentTopic.java | 21 ++ .../broker/service/persistent/PersistentTopic.java | 47 +- .../broker/service/ClusterMigrationTest.java | 9 + 3 files changed, 66 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index c764283cb44..836e5655168 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -959,10 +959,31 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol consumer.topicMigrated(url); }); }); +return disconnectReplicators().thenCompose(__ -> checkAndUnsubscribeSubscriptions()); } return CompletableFuture.completedFuture(null); } +private CompletableFuture checkAndUnsubscribeSubscriptions() { +List> futures = new ArrayList<>(); +subscriptions.forEach((s, subscription) -> { +if (subscription.getConsumers().isEmpty()) { +futures.add(subscription.delete()); +} +}); + +return FutureUtil.waitForAll(futures); +} + +private CompletableFuture disconnectReplicators() { +List> futures = new ArrayList<>(); +ConcurrentOpenHashMap replicators = 
getReplicators(); +replicators.forEach((r, replicator) -> { +futures.add(replicator.disconnect()); +}); +return FutureUtil.waitForAll(futures); +} + @Override public void checkGC() { if (!isDeleteWhileInactive()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 57a4989b4d3..f5679665d46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -180,6 +180,7 @@ import org.apache.pulsar.utils.StatsOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback { // Managed ledger associated with the topic @@ -2575,25 +2576,49 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public CompletableFuture checkClusterMigration() { Optional clusterUrl = getMigratedClusterUrl(); -if (!isMigrated() && clusterUrl.isPresent()) { -return ledger.asyncMigrate().thenApply(__ -> { -subscriptions.forEach((name, sub) -> { -if (sub.isSubsciptionMigrated()) { - sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration); -} -}); -return null; -}); -} else { +if (!clusterUrl.isPresent()) { return CompletableFuture.completedFuture(null); } +CompletableFuture migrated = !isMigrated() ? 
ledger.asyncMigrate() : +CompletableFuture.completedFuture(null); +return migrated.thenApply(__ -> { +subscriptions.forEach((name, sub) -> { +if (sub.isSubsciptionMigrated()) { + sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration); +} +}); +return null; +}).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions()); +} + +private CompletableFuture checkAndUnsubscribeSubscriptions() { +List> futures = new ArrayList<>(); +subscriptions.forEach((s, subscription) -> { +if (subscription.getNumberOfEntriesInBacklog(true) == 0 +&& subscription.getConsumers().isEmpty()) { +futures.
[pulsar] branch master updated: [fix][broker] Fix consumers are not redirected to migrated cluster (#20928)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4a9fec693b9 [fix][broker] Fix consumers are not redirected to migrated cluster (#20928) 4a9fec693b9 is described below commit 4a9fec693b90fcb76aea9a98a94de31608b56bab Author: vineeth1995 AuthorDate: Thu Aug 17 22:58:35 2023 -0700 [fix][broker] Fix consumers are not redirected to migrated cluster (#20928) --- .../org/apache/pulsar/broker/service/Consumer.java | 14 ++ .../org/apache/pulsar/broker/service/ServerCnx.java | 7 +++ .../apache/pulsar/broker/service/Subscription.java | 3 +++ .../nonpersistent/NonPersistentSubscription.java| 5 + .../service/persistent/PersistentSubscription.java | 6 ++ .../broker/service/persistent/PersistentTopic.java | 10 -- .../pulsar/broker/service/ClusterMigrationTest.java | 21 + 7 files changed, 64 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 176f033a6dc..ddca7ed5a89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -818,6 +818,20 @@ public class Consumer { } } +public boolean checkAndApplyTopicMigration() { +if (subscription.isSubsciptionMigrated()) { +Optional clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar()); +if (clusterUrl.isPresent()) { +ClusterUrl url = clusterUrl.get(); + cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(), +url.getBrokerServiceUrlTls()); +// disconnect consumer after sending migrated cluster url +disconnect(); +return true; +} +} +return false; +} /** * Checks if consumer-blocking on unAckedMessages is allowed 
for below conditions: * a. consumer must have Shared-subscription diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index f3b355d6332..6b1af573a08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1250,6 +1250,13 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { }); }) .thenAccept(consumer -> { +if (consumer.checkAndApplyTopicMigration()) { +log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}", +remoteAddress, consumerId, subscriptionName, topicName); +consumers.remove(consumerId, consumerFuture); +return; +} + if (consumerFuture.complete(consumer)) { log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, subscriptionName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 8ac855c1e7c..be079c2b4b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -102,6 +102,8 @@ public interface Subscription extends MessageExpirer { CompletableFuture updateSubscriptionProperties(Map subscriptionProperties); +boolean isSubsciptionMigrated(); + default void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) { // Default is no-op } @@ -130,4 +132,5 @@ public interface Subscription extends MessageExpirer { static boolean isIndividualAckMode(SubType subType) { return SubType.Shared.equals(subType) || SubType.Key_Shared.equals(subType); } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index 1048864ad64..7cd4d8984c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscrip
[pulsar] branch master updated: [fix][broker] Broker failed to load v1 namespace resources cache (#20783)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new b8e6948f62d [fix][broker] Broker failed to load v1 namespace resources cache (#20783) b8e6948f62d is described below commit b8e6948f62d6ec2ca53b6a85fe2fd07d4dee6853 Author: Rajan Dhabalia AuthorDate: Fri Jul 21 15:41:50 2023 -0700 [fix][broker] Broker failed to load v1 namespace resources cache (#20783) --- .../pulsar/broker/resources/BaseResources.java | 36 ++ .../broker/resources/NamespaceResources.java | 2 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 14 + 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index 42add4271f6..4011a482075 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -20,11 +20,16 @@ package org.apache.pulsar.broker.resources; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Joiner; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -78,6 +83,37 @@ public class BaseResources { return cache.getChildren(path); } +protected CompletableFuture> getChildrenRecursiveAsync(String path) { +Set children = 
ConcurrentHashMap.newKeySet(); +CompletableFuture> result = new CompletableFuture<>(); +getChildrenRecursiveAsync(path, children, result, new AtomicInteger(1), path); +return result; +} + +private void getChildrenRecursiveAsync(String path, Set children, CompletableFuture> result, +AtomicInteger totalResults, String parent) { +cache.getChildren(path).thenAccept(childList -> { +childList = childList != null ? childList : Collections.emptyList(); +if (totalResults.decrementAndGet() == 0 && childList.isEmpty()) { +result.complete(new ArrayList<>(children)); +return; +} +if (childList.isEmpty()) { +return; +} +// remove current node from children if current node is not leaf +children.remove(parent); +// childPrefix creates a path hierarchy if children has multi level path +String childPrefix = path.equals(parent) ? "" : parent + "/"; +totalResults.addAndGet(childList.size()); +for (String child : childList) { +children.add(childPrefix + child); +String childPath = path + "/" + child; +getChildrenRecursiveAsync(childPath, children, result, totalResults, child); +} +}); +} + protected Optional get(String path) throws MetadataStoreException { try { return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 48f82596567..e5dd13c32eb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ -62,7 +62,7 @@ public class NamespaceResources extends BaseResources { } public CompletableFuture> listNamespacesAsync(String tenant) { -return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant)); +return getChildrenRecursiveAsync(joinPath(BASE_POLICIES_PATH, tenant)); } public CompletableFuture getPoliciesReadOnlyAsync() { 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 83ed63bf0d9..a4f6bd4650f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -31,6 +31,7 @@ import static org.mockito.Mockito.
[pulsar] branch master updated: [Fix][Doc] Update docs for Partition increase command (#20035)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1e9c7f52368 [Fix][Doc] Update docs for Partition increase command (#20035) 1e9c7f52368 is described below commit 1e9c7f5236866d07105421c566241e3798f63097 Author: Apurva007 AuthorDate: Tue Apr 11 20:07:45 2023 -0700 [Fix][Doc] Update docs for Partition increase command (#20035) Co-authored-by: Apurva Telang --- .../org/apache/pulsar/broker/admin/v1/PersistentTopics.java | 4 ++-- .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 4 ++-- .../src/main/java/org/apache/pulsar/client/admin/Topics.java | 12 ++-- .../org/apache/pulsar/admin/cli/CmdPersistentTopics.java | 2 +- .../src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 16f1b357dcd..e9bb1a40547 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -256,14 +256,14 @@ public class PersistentTopics extends PersistentTopicsBase { } /** - * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be + * It updates number of partitions of an existing partitioned topic. It requires partitioned-topic to be * already exist and number of new partitions must be greater than existing number of partitions. Decrementing * number of partitions requires deletion of topic which is not supported. 
*/ @POST @Path("/{property}/{cluster}/{namespace}/{topic}/partitions") @ApiOperation(hidden = true, value = "Increment partitions of an existing partitioned topic.", -notes = "It only increments partitions of existing non-global partitioned-topic") +notes = "It increments partitions of existing partitioned-topic") @ApiResponses(value = { @ApiResponse(code = 204, message = "Update topic partition successful."), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 477cc8b5712..eee363aeed8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -775,14 +775,14 @@ public class PersistentTopics extends PersistentTopicsBase { } /** - * It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be + * It updates number of partitions of an existing partitioned topic. It requires partitioned-topic to be * already exist and number of new partitions must be greater than existing number of partitions. Decrementing * number of partitions requires deletion of topic which is not supported. 
*/ @POST @Path("/{tenant}/{namespace}/{topic}/partitions") @ApiOperation(value = "Increment partitions of an existing partitioned topic.", -notes = "It only increments partitions of existing non-global partitioned-topic") +notes = "It increments partitions of existing partitioned-topic") @ApiResponses(value = { @ApiResponse(code = 204, message = "Update topic partition successful."), @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 0d5d14eb734..f599e2566bf 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -589,7 +589,7 @@ public interface Topics { CompletableFuture createMissedPartitionsAsync(String topic); /** - * Update number of partitions of a non-global partitioned topic. + * Update number of partitions of a partitioned topic. * * It requires partitioned-topic to be already exist and number of new partitions must be greater than existing * number of partitions. Decrementing number o
[pulsar] branch master updated (c3a92b213f0 -> 34b6e892696)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from c3a92b213f0 [fix] [ml] Fix uncompleted future when remove cursor (#20050) add 34b6e892696 [feat] [broker] PIP-188 support blue-green cluster migration [part-2] (#19605) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/service/Replicator.java | 2 + .../apache/pulsar/broker/service/ServerCnx.java| 24 ++- .../org/apache/pulsar/broker/service/Topic.java| 2 + .../nonpersistent/NonPersistentReplicator.java | 2 +- .../service/nonpersistent/NonPersistentTopic.java | 5 + .../service/persistent/PersistentReplicator.java | 4 +- .../broker/service/persistent/PersistentTopic.java | 21 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 8 +- .../broker/service/ClusterMigrationTest.java | 229 ++--- 9 files changed, 253 insertions(+), 44 deletions(-)
[pulsar] branch master updated: [fix][cli] Fix Pulsar admin tool is ignoring tls-trust-cert path arg (#19696)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e75def80ff2 [fix][cli] Fix Pulsar admin tool is ignoring tls-trust-cert path arg (#19696) e75def80ff2 is described below commit e75def80ff2f366919b55cfd42f1dfab3f719b88 Author: Rajan Dhabalia AuthorDate: Sat Mar 4 08:46:50 2023 -0800 [fix][cli] Fix Pulsar admin tool is ignoring tls-trust-cert path arg (#19696) --- .../admin/internal/PulsarAdminBuilderImpl.java | 2 ++ .../pulsar/admin/cli/PulsarAdminSupplier.java | 5 +++- .../apache/pulsar/admin/cli/PulsarAdminTool.java | 12 ++-- .../org/apache/pulsar/admin/cli/TestRunMain.java | 35 ++ 4 files changed, 50 insertions(+), 4 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index 3e7ee472e46..009fa67fbaa 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin.internal; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; @@ -33,6 +34,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { +@Getter protected ClientConfigurationData conf; private ClassLoader clientBuilderClassLoader = null; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java index 6aa8d2b9c61..764dc9de5df 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java @@ -53,7 +53,7 @@ public class PulsarAdminSupplier implements Supplier { } } -private final PulsarAdminBuilder adminBuilder; +protected final PulsarAdminBuilder adminBuilder; private RootParamsKey currentParamsKey; private PulsarAdmin admin; @@ -103,6 +103,9 @@ public class PulsarAdminSupplier implements Supplier { if (isNotBlank(rootParams.tlsProvider)) { adminBuilder.sslProvider(rootParams.tlsProvider); } +if (isNotBlank(rootParams.tlsTrustCertsFilePath)) { + adminBuilder.tlsTrustCertsFilePath(rootParams.tlsTrustCertsFilePath); +} } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java index 7a7a4c4b444..ca0a8a055cf 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java @@ -45,7 +45,7 @@ import org.apache.pulsar.common.util.ShutdownUtil; public class PulsarAdminTool { -private static boolean allowSystemExit = true; +protected static boolean allowSystemExit = true; private static int lastExitCode = Integer.MIN_VALUE; @@ -54,7 +54,7 @@ public class PulsarAdminTool { protected JCommander jcommander; protected RootParams rootParams; private final Properties properties; -private PulsarAdminSupplier pulsarAdminSupplier; +protected PulsarAdminSupplier pulsarAdminSupplier; @Getter public static class RootParams { @@ -277,11 +277,16 @@ public class PulsarAdminTool { } public static void main(String[] args) throws Exception { +execute(args); +} + +@VisibleForTesting +public static PulsarAdminTool 
execute(String[] args) throws Exception { lastExitCode = 0; if (args.length == 0) { System.out.println("Usage: pulsar-admin CONF_FILE_PATH [options] [command] [command options]"); exit(0); -return; +return null; } String configFile = args[0]; Properties properties = new Properties(); @@ -299,6 +304,7 @@ public class PulsarAdminTool { } else { exit(1); } +return tool; } private static void exit(int code) { diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java b/pulsar-client-tools/src/test/java/org/apache/pulsar
[pulsar] branch master updated: [improve][admin] Fix NPE in admin-CLI topic stats command (#18326)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 251d9fda0a1 [improve][admin] Fix NPE in admin-CLI topic stats command (#18326) 251d9fda0a1 is described below commit 251d9fda0a1c77f589a69d906a1a014ebe2d7071 Author: vineeth1995 AuthorDate: Thu Nov 3 14:42:43 2022 -0700 [improve][admin] Fix NPE in admin-CLI topic stats command (#18326) Co-authored-by: Vineeth --- .../src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 8e3d15ee406..5dbb3935ce9 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -347,7 +347,7 @@ public class CmdPersistentTopics extends CmdBase { @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); -print(persistentTopics.getStats(persistentTopic, getPreciseBacklog)); +print(getPersistentTopics().getStats(persistentTopic, getPreciseBacklog)); } }
[pulsar] branch master updated: [improve][broker] log sessionId with zk-session-watcher for debugging (#18291)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 987e0dacd31 [improve][broker] log sessionId with zk-session-watcher for debugging (#18291) 987e0dacd31 is described below commit 987e0dacd31aeaba6a2f1a59ec5c55fb59bb7999 Author: Rajan Dhabalia AuthorDate: Tue Nov 1 16:23:17 2022 -0700 [improve][broker] log sessionId with zk-session-watcher for debugging (#18291) --- .../main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java index 38d0ba32a36..a0247e22319 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -149,8 +149,8 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher { currentStatus = SessionEvent.SessionLost; sessionListener.accept(currentStatus); } else if (currentStatus != SessionEvent.SessionLost) { -log.warn("ZooKeeper client is disconnected. Waiting to reconnect, time remaining = {} seconds", -timeRemainingMillis / 1000.0); +log.warn("[{}] ZooKeeper client is disconnected. Waiting to reconnect, time remaining = {} seconds", +zk.getSessionId(), timeRemainingMillis / 1000.0); if (currentStatus == SessionEvent.SessionReestablished) { currentStatus = SessionEvent.ConnectionLost; sessionListener.accept(currentStatus);
[pulsar] branch master updated: [improve] [client] Add api to get producer/consumer stats for partition topic (#18212)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 26f9ffa497e [improve] [client] Add api to get producer/consumer stats for partition topic (#18212) 26f9ffa497e is described below commit 26f9ffa497e16396ea4dbbcad3452b2a973f86ac Author: Rajan Dhabalia AuthorDate: Tue Nov 1 12:57:11 2022 -0700 [improve] [client] Add api to get producer/consumer stats for partition topic (#18212) * [improve] [client] Add api to get producer/consumer stats for partition topic * introduce partition topic stats interface --- .../client/api/SimpleProducerConsumerStatTest.java | 61 + .../pulsar/client/api/MultiTopicConsumerStats.java | 39 +++ .../client/api/PartitionedTopicProducerStats.java | 40 +++ .../apache/pulsar/client/api/ProducerStats.java| 1 - .../impl/MultiTopicConsumerStatsRecorderImpl.java | 59 .../client/impl/MultiTopicsConsumerImpl.java | 6 +- .../client/impl/PartitionedProducerImpl.java | 9 ++- .../PartitionedTopicProducerStatsRecorderImpl.java | 79 ++ .../pulsar/client/impl/ProducerStatsDisabled.java | 1 + .../client/impl/ProducerStatsRecorderImpl.java | 17 + .../client/impl/ProducerStatsRecorderImplTest.java | 4 +- 11 files changed, 292 insertions(+), 24 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index ad849395045..bae891ad46a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.api; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static 
org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -30,6 +31,7 @@ import com.google.gson.JsonObject; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -449,4 +451,63 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { .until(() -> producer.getStats().getPendingQueueSize() == numMessages); assertEquals(producer.getStats().getPendingQueueSize(), numMessages); } + +/** + * This test verifies partitioned topic stats for producer and consumer. + * @throws Exception + */ +@Test +public void testPartitionTopicStats() throws Exception { +log.info("-- Starting {} test --", methodName); + +String topicName = "persistent://my-property/my-ns/testPartitionTopicStats"; +int numPartitions = 10; +admin.topics().createPartitionedTopic(topicName, numPartitions); + +ConsumerBuilder consumerBuilder = pulsarClient.newConsumer().topic(topicName) +.subscriptionName("my-subscriber-name"); + +Consumer consumer = consumerBuilder.subscribe(); + +ProducerBuilder producerBuilder = pulsarClient.newProducer().enableBatching(false).topic(topicName); + +Producer producer = producerBuilder.create(); + +int numMessages = 20; +for (int i = 0; i < numMessages; i++) { +String message = "my-message-" + i; +producer.send(message.getBytes()); +} + +Message msg = null; +Set messageSet = new HashSet<>(); +for (int i = 0; i < numMessages; i++) { +msg = consumer.receive(5, TimeUnit.SECONDS); +String receivedMessage = new String(msg.getData()); +log.info("Received message: [{}]", receivedMessage); +String expectedMessage = "my-message-" + i; +testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); +} +// Acknowledge the consumption of all messages at once +consumer.acknowledgeCumulative(msg); + +MultiTopicConsumerStats cStat = 
(MultiTopicConsumerStats) consumer.getStats(); +PartitionedTopicProducerStats pStat = (PartitionedTopicProducerStats) producer.getStats(); +retryStrategically((test) -> !pStat.getPartitionStats().isEmpty(), 5, 100); +retryStrategically((test) -> !cStat.getPartitionStats().isEmpty(), 5, 100); +Map prodStatsMap = pStat.getPartitionStats(); +Map consStatsMap = cStat.getPartitionStats(); +assertFalse(prodStatsMap.isE
[pulsar] branch master updated (5b452d1ce82 -> b0945d1d45d)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 5b452d1ce82 [improve][doc] Add code example for WebSocket API to use TLS with CLI tools (#15485) add b0945d1d45d [feat] [broker] PIP-188 support blue-green cluster migration [part-1] (#17962) No new revisions were added by this update. Summary of changes: conf/broker.conf | 4 + .../apache/bookkeeper/mledger/ManagedLedger.java | 7 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 33 ++- .../apache/pulsar/broker/ServiceConfiguration.java | 7 + .../pulsar/broker/admin/impl/ClustersBase.java | 62 .../broker/service/AbstractBaseDispatcher.java | 13 + .../pulsar/broker/service/AbstractTopic.java | 31 +- .../pulsar/broker/service/BrokerService.java | 11 + .../broker/service/BrokerServiceException.java | 10 + .../org/apache/pulsar/broker/service/Consumer.java | 13 + .../org/apache/pulsar/broker/service/Producer.java | 11 + .../pulsar/broker/service/PulsarCommandSender.java | 3 + .../broker/service/PulsarCommandSenderImpl.java| 13 + .../apache/pulsar/broker/service/ServerCnx.java| 19 +- .../org/apache/pulsar/broker/service/Topic.java| 2 + .../service/nonpersistent/NonPersistentTopic.java | 40 ++- .../service/persistent/CompactorSubscription.java | 4 +- .../PersistentDispatcherMultipleConsumers.java | 2 +- .../PersistentDispatcherSingleActiveConsumer.java | 2 +- ...istentStreamingDispatcherMultipleConsumers.java | 3 +- ...entStreamingDispatcherSingleActiveConsumer.java | 2 +- .../service/persistent/PersistentSubscription.java | 5 +- .../broker/service/persistent/PersistentTopic.java | 23 +- .../broker/intercept/CounterBrokerInterceptor.java | 2 +- .../broker/service/ClusterMigrationTest.java | 329 + pulsar-client-admin-api/pom.xml| 1 + .../org/apache/pulsar/client/admin/Clusters.java | 37 +++ .../pulsar/common/policies/data/ClusterData.java | 23 ++ 
.../pulsar/client/admin/internal/ClustersImpl.java | 14 + .../pulsar/client/api/PulsarClientException.java | 16 + .../org/apache/pulsar/admin/cli/CmdClusters.java | 24 ++ .../org/apache/pulsar/client/impl/ClientCnx.java | 23 ++ .../pulsar/client/impl/ConnectionHandler.java | 7 +- .../apache/pulsar/client/impl/HandlerState.java| 9 + pulsar-common/pom.xml | 5 + .../common/policies/data/ClusterDataImpl.java | 27 +- .../apache/pulsar/common/protocol/Commands.java| 11 + .../pulsar/common/protocol/PulsarDecoder.java | 10 + pulsar-common/src/main/proto/PulsarApi.proto | 17 ++ site2/docs/reference-pulsar-admin.md | 18 ++ .../offload/jcloud/impl/MockManagedLedger.java | 12 + 41 files changed, 886 insertions(+), 19 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java
[pulsar] branch master updated: [improve][pulsar-testclient] Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI (#17862)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d1b7c6af37f [improve][pulsar-testclient] Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI (#17862) d1b7c6af37f is described below commit d1b7c6af37fab50ac916e9e5b61c4345ad3643ad Author: vineeth1995 AuthorDate: Mon Oct 3 18:09:46 2022 -0700 [improve][pulsar-testclient] Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI (#17862) * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI * Add proxyServiceUrl and proxyProtocol as oprtions for PerfTool CLI Co-authored-by: Vineeth --- conf/client.conf | 4 + .../apache/pulsar/testclient/PerfClientUtils.java | 3 +- .../testclient/PerformanceBaseArguments.java | 27 + .../pulsar/testclient/PerfClientUtilsTest.java | 5 + .../testclient/PerformanceBaseArgumentsTest.java | 113 + .../src/test/resources/perf_client1.conf | 2 + 6 files changed, 153 insertions(+), 1 deletion(-) diff --git a/conf/client.conf b/conf/client.conf index ea1d339a09c..8a485e5676c 100644 --- a/conf/client.conf +++ b/conf/client.conf @@ -88,7 +88,11 @@ tlsKeyStorePassword= # When TLS authentication with KeyStore is used, available options can be SunJSSE, Conscrypt and so on. 
webserviceTlsProvider= +#Proxy-server URL to which to connect +proxyServiceUrl= +#Proxy protocol to select type of routing at proxy +proxyProtocol= # Pulsar Admin Custom Commands #customCommandFactoriesDirectory=commandFactories diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java index 1ce5777fd32..a3552d31430 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java @@ -75,7 +75,8 @@ public class PerfClientUtils { .enableBusyWait(arguments.enableBusyWait) .listenerThreads(arguments.listenerThreads) .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath) -.maxLookupRequests(arguments.maxLookupRequest); +.maxLookupRequests(arguments.maxLookupRequest) +.proxyServiceUrl(arguments.proxyServiceURL, arguments.proxyProtocol); if (isNotBlank(arguments.authPluginClassName)) { clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java index cff7e16e9ca..307af7cbb15 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java @@ -19,10 +19,12 @@ package org.apache.pulsar.testclient; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.pulsar.testclient.PerfClientUtils.exit; import com.beust.jcommander.Parameter; import java.io.FileInputStream; import java.util.Properties; import lombok.SneakyThrows; +import org.apache.pulsar.client.api.ProxyProtocol; public abstract class PerformanceBaseArguments { @@ -85,6 +87,12 @@ public abstract class 
PerformanceBaseArguments { + "on each broker connection to prevent overloading a broker") public int maxLookupRequest = 5; +@Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.") +String proxyServiceURL = null; + +@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.") +ProxyProtocol proxyProtocol = null; + public abstract void fillArgumentsFromProperties(Properties prop); @SneakyThrows @@ -133,6 +141,25 @@ public abstract class PerformanceBaseArguments { .getProperty("tlsEnableHostnameVerification", "")); } + +if (proxyServiceURL == null) { +proxyServiceURL = prop.g
[pulsar-manager] branch master updated: Redirect notifications to the commits@ list instead of dev (#490)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git The following commit(s) were added to refs/heads/master by this push: new 519b476 Redirect notifications to the commits@ list instead of dev (#490) 519b476 is described below commit 519b47699872e4034618d3974af77619703e8d3a Author: Matteo Merli AuthorDate: Wed Sep 28 12:21:41 2022 -0700 Redirect notifications to the commits@ list instead of dev (#490) --- .asf.yaml | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.asf.yaml b/.asf.yaml index 209aec5..5c8b9e9 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -45,4 +45,11 @@ github: # disable merge button: merge: false # disable rebase button: -rebase: false \ No newline at end of file +rebase: false + +notifications: + commits: commits@pulsar.apache.org + issues: commits@pulsar.apache.org + pullrequests: commits@pulsar.apache.org + discussions: dev@pulsar.apache.org + jira_options: link label
[pulsar] branch master updated (518cdcd9c2c -> 6528a914350)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from 518cdcd9c2c [fix][metrics]wrong metrics text generated when label_cluster specified (#17704) add 6528a914350 [Pulsar-init] Support cluster init using proxy url and protocol (#17844) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/PulsarClusterMetadataSetup.java | 13 + 1 file changed, 13 insertions(+)
[pulsar-manager] branch master updated: Removing transitive dependencies for log4j (#488)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git The following commit(s) were added to refs/heads/master by this push: new 3b2dd2f Removing transitive dependencies for log4j (#488) 3b2dd2f is described below commit 3b2dd2fa20008b0c15992cac2946ef59daed08f6 Author: gurleen-gks AuthorDate: Thu Sep 15 10:00:56 2022 -0700 Removing transitive dependencies for log4j (#488) Co-authored-by: Gurleen Kaur --- build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/build.gradle b/build.gradle index 84a5410..e442883 100644 --- a/build.gradle +++ b/build.gradle @@ -42,6 +42,7 @@ configurations { testPlugins {} all*.exclude group: 'org.bouncycastle', module: 'bc-fips' all*.exclude group: 'io.netty', module: 'netty-tcnative-classes' +all*.exclude group: 'log4j', module: 'log4j' } dependencies {
[pulsar] branch master updated (f453e0a5b7a -> 5fce0ce9596)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git from f453e0a5b7a remove unnecessary parameters(reusefuture) and related logic (#17378) add 5fce0ce9596 [improve][pulsar-testclient] Add disableBatching as CLI argument for pulsar-perf-producer (#17381) No new revisions were added by this update. Summary of changes: .../pulsar/testclient/PerformanceProducer.java | 82 +- .../pulsar/testclient/PerformanceProducerTest.java | 26 +++ 2 files changed, 73 insertions(+), 35 deletions(-)
[pulsar-manager] branch master updated: Removing conflicting dependencies (#482)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git The following commit(s) were added to refs/heads/master by this push: new 765c510 Removing conflicting dependencies (#482) 765c510 is described below commit 765c5109e652558cbeb75ddcd564051bc791 Author: gurleen-gks AuthorDate: Mon Aug 15 15:56:13 2022 -0700 Removing conflicting dependencies (#482) Co-authored-by: Gurleen Kaur --- build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.gradle b/build.gradle index 723a6fb..84a5410 100644 --- a/build.gradle +++ b/build.gradle @@ -40,6 +40,8 @@ repositories { configurations { testPlugins {} +all*.exclude group: 'org.bouncycastle', module: 'bc-fips' +all*.exclude group: 'io.netty', module: 'netty-tcnative-classes' } dependencies {
[pulsar] branch master updated: merge conflicts (#16961)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 7a65f3ff907 merge conflicts (#16961) 7a65f3ff907 is described below commit 7a65f3ff907385af4b115f300324e5ec60aa25ac Author: vineeth1995 AuthorDate: Fri Aug 5 14:32:27 2022 -0700 merge conflicts (#16961) Co-authored-by: Vineeth --- .../pulsar/client/cli/PulsarClientToolTest.java| 26 ++ .../apache/pulsar/client/cli/PulsarClientTool.java | 11 + 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index 4a1589250ad..5cc5e50b96d 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import com.beust.jcommander.JCommander; import lombok.Cleanup; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -269,6 +270,31 @@ public class PulsarClientToolTest extends BrokerTestBase { } } +@Test(timeOut = 2) +public void testArgs() throws Exception { +PulsarClientTool pulsarClientTool = new PulsarClientTool(new Properties()); +final String url = "pulsar+ssl://localhost:6651"; +final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationTls"; +final String authParams = "tlsCertFile:pulsar-broker/src/test/resources/authentication/tls/client-cert.pem," + + "tlsKeyFile:pulsar-broker/src/test/resources/authentication/tls/client-key.pem"; +final String tlsTrustCertsFilePath = 
"pulsar/pulsar-broker/src/test/resources/authentication/tls/cacert.pem"; +final String message = "test msg"; +final int numberOfMessages = 1; +final String topicName = getTopicWithRandomSuffix("test-topic"); + +String[] args = {"--url", url, +"--auth-plugin", authPlugin, +"--auth-params", authParams, +"--tlsTrustCertsFilePath", tlsTrustCertsFilePath, +"produce", "-m", message, +"-n", Integer.toString(numberOfMessages), topicName}; +pulsarClientTool.jcommander.parse(args); +assertEquals(pulsarClientTool.rootParams.getTlsTrustCertsFilePath(), tlsTrustCertsFilePath); +assertEquals(pulsarClientTool.rootParams.getAuthParams(), authParams); +assertEquals(pulsarClientTool.rootParams.getAuthPluginClassName(), authPlugin); +assertEquals(pulsarClientTool.rootParams.getServiceURL(), url); +} + private static String getTopicWithRandomSuffix(String localNameBase) { return String.format("persistent://prop/ns-abc/test/%s-%s", localNameBase, UUID.randomUUID().toString()); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java index 32770f3bd07..648505b7d75 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java @@ -72,15 +72,19 @@ public class PulsarClientTool { @Parameter(names = { "-h", "--help", }, help = true, description = "Show this help.") boolean help; + +@Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File path to client trust certificates") +String tlsTrustCertsFilePath; } protected RootParams rootParams; boolean tlsAllowInsecureConnection; boolean tlsEnableHostnameVerification; -String tlsTrustCertsFilePath; + String tlsKeyFilePath; String tlsCertificateFilePath; + // for tls with keystore type config boolean useKeyStoreTls; String tlsTrustStoreType; @@ -103,8 +107,6 @@ public class 
PulsarClientTool { .parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false")); this.tlsEnableHostnameVerification = Boolean .parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false")); -this.tlsTrustCertsFilePath = properties.getPro
[pulsar] branch master updated: [PIP-136] Sync Pulsar metadata across multiple clouds (#16425)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a38f74dfefe [PIP-136] Sync Pulsar metadata across multiple clouds (#16425) a38f74dfefe is described below commit a38f74dfefec8620a16b384da428c6c4fedbdd70 Author: Rajan Dhabalia AuthorDate: Sun Jul 31 16:39:27 2022 -0700 [PIP-136] Sync Pulsar metadata across multiple clouds (#16425) * [PIP-136] Sync Pulsar policies across multiple clouds add unit test add findbug * address comments --- conf/broker.conf | 6 + .../apache/pulsar/broker/ServiceConfiguration.java | 17 ++ .../org/apache/pulsar/broker/PulsarService.java| 35 +++- .../service/PulsarMetadataEventSynchronizer.java | 199 + .../broker/auth/MockedPulsarServiceBaseTest.java | 30 +++- .../OwnerShipForCurrentServerTestBase.java | 4 +- .../broker/transaction/TransactionTestBase.java| 4 +- .../apache/pulsar/broker/web/WebServiceTest.java | 4 +- pulsar-metadata/pom.xml| 1 + .../apache/pulsar/metadata/api/MetadataEvent.java | 57 ++ .../metadata/api/MetadataEventSynchronizer.java| 53 ++ .../pulsar/metadata/api/MetadataStoreConfig.java | 6 + .../api/extended/MetadataStoreExtended.java| 10 ++ .../metadata/impl/AbstractMetadataStore.java | 137 -- .../metadata/impl/LocalMemoryMetadataStore.java| 10 ++ .../pulsar/metadata/impl/RocksdbMetadataStore.java | 10 ++ .../pulsar/metadata/impl/ZKMetadataStore.java | 8 +- .../batching/AbstractBatchedMetadataStore.java | 11 ++ .../src/main/resources/findbugsExclude.xml | 34 .../impl/LocalMemoryMetadataStoreTest.java | 196 ...est.java => MetadataEventSynchronizerTest.java} | 2 +- site2/docs/reference-configuration-broker.md | 19 ++ site2/docs/reference-configuration.md | 2 +- 23 files changed, 795 insertions(+), 60 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 7a0e32c95b3..5216034fdb8 100644 --- a/conf/broker.conf +++ 
b/conf/broker.conf @@ -29,6 +29,12 @@ metadataStoreUrl= # Configuration file path for metadata store. It's supported by RocksdbMetadataStore and EtcdMetadataStore for now metadataStoreConfigPath= +# Event topic to sync metadata between separate pulsar clusters on different cloud platforms. +metadataSyncEventTopic= + +# Event topic to sync configuration-metadata between separate pulsar clusters on different cloud platforms. +configurationMetadataSyncEventTopic= + # The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl configurationMetadataStoreUrl= diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4fb680cf8b4..93c79e5398e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -483,6 +483,23 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String metadataStoreConfigPath = null; + +@FieldContext( +dynamic = true, +category = CATEGORY_SERVER, +doc = "Event topic to sync metadata between separate pulsar " ++ "clusters on different cloud platforms." +) +private String metadataSyncEventTopic = null; + +@FieldContext( +dynamic = true, +category = CATEGORY_SERVER, +doc = "Event topic to sync configuration-metadata between separate pulsar " ++ "clusters on different cloud platforms." 
+) +private String configurationMetadataSyncEventTopic = null; + @FieldContext( dynamic = true, doc = "Factory class-name to create topic with custom workflow" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 5c08eb1fc08..f0d9e68f380 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -101,6 +101,7 @@ import org.apache.pulsar.broker.resources.ClusterResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.rest.Topics; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer; import org.apache.pulsar.broker.ser
[pulsar] branch master updated: Support encryption in websocket proxy (#16234)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 52da03f08f4 Support encryption in websocket proxy (#16234) 52da03f08f4 is described below commit 52da03f08f44aa841ba5510ecb83b8834ba8964c Author: Rajan Dhabalia AuthorDate: Wed Jul 20 17:49:32 2022 -0700 Support encryption in websocket proxy (#16234) fix doc --- .../configuration/PulsarConfigurationLoader.java | 16 +- .../proxy/ProxyEncryptionPublishConsumeTest.java | 249 + .../apache/pulsar/websocket/ConsumerHandler.java | 3 + .../pulsar/websocket/CryptoKeyReaderFactory.java | 29 +++ .../apache/pulsar/websocket/ProducerHandler.java | 10 + .../org/apache/pulsar/websocket/ReaderHandler.java | 3 + .../apache/pulsar/websocket/WebSocketService.java | 19 ++ .../service/WebSocketProxyConfiguration.java | 5 + site2/docs/client-libraries-websocket.md | 7 + 9 files changed, 338 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java index 12bba1a7fa2..b0479d81588 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java @@ -182,23 +182,33 @@ public class PulsarConfigurationLoader { final ServiceConfiguration convertedConf = ServiceConfiguration.class .getDeclaredConstructor().newInstance(); Field[] confFields = conf.getClass().getDeclaredFields(); +Properties properties = new Properties(); Arrays.stream(confFields).forEach(confField -> { try { -Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName()); confField.setAccessible(true); 
+Field convertedConfField = ServiceConfiguration.class.getDeclaredField(confField.getName()); if (!Modifier.isStatic(convertedConfField.getModifiers())) { convertedConfField.setAccessible(true); convertedConfField.set(convertedConf, confField.get(conf)); } } catch (NoSuchFieldException e) { if (!ignoreNonExistMember) { -throw new IllegalArgumentException("Exception caused while converting configuration: " -+ e.getMessage()); +throw new IllegalArgumentException( +"Exception caused while converting configuration: " + e.getMessage()); +} +// add unknown fields to properties +try { +if (confField.get(conf) != null) { +properties.put(confField.getName(), confField.get(conf)); +} +} catch (Exception ignoreException) { +// should not happen } } catch (IllegalAccessException e) { throw new RuntimeException("Exception caused while converting configuration: " + e.getMessage()); } }); +convertedConf.getProperties().putAll(properties); return convertedConf; } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java new file mode 100644 index 000..bbd2b3bd14f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WIT
[pulsar] branch master updated: [pulsar-broker] PIP-100 Support pluggable topic factory (#12235)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6f38c5aa5b9 [pulsar-broker] PIP-100 Support pluggable topic factory (#12235) 6f38c5aa5b9 is described below commit 6f38c5aa5b9838c3a2a5fea10e13dd3830947d9f Author: Rajan Dhabalia AuthorDate: Wed Jul 20 13:01:44 2022 -0700 [pulsar-broker] PIP-100 Support pluggable topic factory (#12235) add Closeable fix test close factory and handle exception --- conf/broker.conf | 3 + .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++ .../pulsar/broker/service/BrokerService.java | 43 - .../apache/pulsar/broker/service/TopicFactory.java | 31 ++ .../broker/service/PersistentTopicE2ETest.java | 71 +++--- .../common/naming/ServiceConfigurationTest.java| 1 - site2/docs/reference-configuration.md | 2 + 7 files changed, 146 insertions(+), 11 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index d2e13d117e8..c19afe981ca 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -136,6 +136,9 @@ brokerShutdownTimeoutMs=6 # Flag to skip broker shutdown when broker handles Out of memory error skipBrokerShutdownOnOOM=false +# Factory class-name to create topic with custom workflow +topicFactoryClassName= + # Enable backlog quota check. 
Enforces action on topic when the quota is reached backlogQuotaCheckEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d2e68fb437c..ad4188d2882 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -479,6 +479,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private String metadataStoreConfigPath = null; +@FieldContext( +dynamic = true, +doc = "Factory class-name to create topic with custom workflow" +) +private String topicFactoryClassName; + @FieldContext( category = CATEGORY_POLICIES, doc = "Enable backlog quota check. Enforces actions on topic when the quota is reached" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e80d603a141..8e701ebf5bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -276,6 +276,7 @@ public class BrokerService implements Closeable { private final LongAdder pausedConnections = new LongAdder(); private BrokerInterceptor interceptor; private ImmutableMap entryFilters; +private TopicFactory topicFactory; private Set brokerEntryMetadataInterceptors; private Set brokerEntryPayloadProcessors; @@ -346,6 +347,7 @@ public class BrokerService implements Closeable { this.authenticationService = new AuthenticationService(pulsar.getConfiguration()); this.blockedDispatchers = ConcurrentOpenHashSet.newBuilder().build(); +this.topicFactory = createPersistentTopicFactory(); // update dynamic configuration and register-listener updateConfigurationAndRegisterListeners(); 
this.lookupRequestSemaphore = new AtomicReference( @@ -1127,7 +1129,13 @@ public class BrokerService implements Closeable { new NotAllowedException("Broker is not unable to load non-persistent topic")); } final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); -NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); +NonPersistentTopic nonPersistentTopic; +try { +nonPersistentTopic = newTopic(topic, null, this, NonPersistentTopic.class); +} catch (Exception e) { +log.warn("Failed to create topic {}", topic, e); +return FutureUtil.failedFuture(e); +} CompletableFuture isOwner = checkTopicNsOwnership(topic); isOwner.thenRun(() -> { nonPersistentTopic.initialize() @@ -1412,7 +1420,7 @@ public class BrokerService implements Closeable { try { PersistentTopic persistentTopic = isSystemTopic(topic) ? new SystemTopic(topic, ledger, BrokerService.this) -: new Persist
[pulsar] branch master updated: [pulsar-broker] Support caching to drain backlog consumers (#12258)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 586979152bf [pulsar-broker] Support caching to drain backlog consumers (#12258) 586979152bf is described below commit 586979152bf22de5a1dff3e955dfcdf540257a08 Author: Rajan Dhabalia AuthorDate: Wed Jul 20 09:48:41 2022 -0700 [pulsar-broker] Support caching to drain backlog consumers (#12258) --- conf/standalone.conf | 10 +++ .../apache/bookkeeper/mledger/ManagedLedger.java | 5 ++ .../bookkeeper/mledger/ManagedLedgerConfig.java| 59 ++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 9 +++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 59 -- .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 7 +- .../bookkeeper/mledger/impl/cache/EntryCache.java | 6 +- .../mledger/impl/cache/RangeEntryCacheImpl.java| 16 ++-- .../apache/bookkeeper/mledger/util/RangeCache.java | 4 + .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- .../apache/pulsar/broker/ServiceConfiguration.java | 24 +- .../pulsar/broker/service/BrokerService.java | 6 ++ .../apache/pulsar/broker/service/PulsarStats.java | 3 +- .../org/apache/pulsar/broker/service/Topic.java| 2 + .../service/nonpersistent/NonPersistentTopic.java | 5 ++ .../broker/service/persistent/PersistentTopic.java | 9 +++ .../client/api/MessageDispatchThrottlingTest.java | 92 ++ .../client/api/SimpleProducerConsumerTest.java | 2 +- .../offload/jcloud/impl/MockManagedLedger.java | 5 ++ 19 files changed, 306 insertions(+), 19 deletions(-) diff --git a/conf/standalone.conf b/conf/standalone.conf index f6bf8fe91c5..b3281c02737 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -752,6 +752,16 @@ managedLedgerAddEntryTimeoutSeconds=0 # Of course, use a smaller value may degrade consumption throughput. Default is 10ms. 
managedLedgerNewEntriesCheckDelayInMillis=10 +# Minimum cursors that must be in backlog state to cache and reuse the read entries. +# (Default =0 to disable backlog reach cache) +managedLedgerMinimumBacklogCursorsForCaching=0 + +# Minimum backlog entries for any cursor before start caching reads. +managedLedgerMinimumBacklogEntriesForCaching=100 + +# Maximum backlog entry difference to prevent caching entries that can't be reused. +managedLedgerMaxBacklogBetweenCursorsForCaching=1 + # Use Open Range-Set to cache unacked messages managedLedgerUnackedRangesOpenCacheSetEnabled=true diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 7196a3b4c03..0ebbd514a52 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -667,4 +667,9 @@ public interface ManagedLedger { * roll over that ledger if inactive. */ void checkInactiveLedgerAndRollOver(); + +/** + * Check if managed ledger should cache backlog reads. 
+ */ +void checkCursorsToCacheEntries(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index e628a253563..788732e763a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -81,6 +81,9 @@ public class ManagedLedgerConfig { @Getter @Setter private boolean cacheEvictionByMarkDeletedPosition = false; +private int minimumBacklogCursorsForCaching = 0; +private int minimumBacklogEntriesForCaching = 1000; +private int maxBacklogBetweenCursorsForCaching = 1000; public boolean isCreateIfMissing() { return createIfMissing; @@ -683,4 +686,60 @@ public class ManagedLedgerConfig { this.inactiveLedgerRollOverTimeMs = (int) unit.toMillis(inactiveLedgerRollOverTimeMs); } +/** + * Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other cursors' + * backlog reads. (Default = 0, broker will not cache backlog reads) + * + * @return + */ +public int getMinimumBacklogCursorsForCaching() { +return minimumBacklogCursorsForCaching; +} + +/** + * Set Minimum cursors with backlog after which broker is allowed to cache read entries to reuse them for other + * cursors' backlog reads. + * + * @param minimumBacklogCursorsForCaching + */ +public void setMinimumBacklogCursorsForCaching(int minimumBacklogCursorsForCaching
[pulsar] branch master updated: Support mechanism to provide external zookeeper-server list to build global/configuration zookeeper (#15987)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new e976bec05fb Support mechanism to provide external zookeeper-server list to build global/configuration zookeeper (#15987) e976bec05fb is described below commit e976bec05fb6be9d51115bce52972f733bb0c10a Author: Rajan Dhabalia AuthorDate: Tue Jun 21 15:59:05 2022 -0700 Support mechanism to provide external zookeeper-server list to build global/configuration zookeeper (#15987) --- docker/pulsar/scripts/generate-zookeeper-config.sh | 20 ++-- 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/docker/pulsar/scripts/generate-zookeeper-config.sh b/docker/pulsar/scripts/generate-zookeeper-config.sh index 53d7600f80e..2852742ef26 100755 --- a/docker/pulsar/scripts/generate-zookeeper-config.sh +++ b/docker/pulsar/scripts/generate-zookeeper-config.sh @@ -34,14 +34,22 @@ DOMAIN=`hostname -d` IDX=1 for SERVER in $(echo $ZOOKEEPER_SERVERS | tr "," "\n") do -echo "server.$IDX=$SERVER.$DOMAIN:2888:3888" >> $CONF_FILE - -if [ "$HOSTNAME" == "$SERVER" ]; then -MY_ID=$IDX -echo "Current server id $MY_ID" +if [[ ! -z "$EXTERNAL_PROVIDED_SERVERS" && $(fgrep -ix $EXTERNAL_PROVIDED_SERVERS <<< "true") ]]; then + echo "server.$IDX=$SERVER" >> $CONF_FILE + # external zk-server connect string starts with hostname + if [[ "$SERVER" == "$HOSTNAME"* ]]; then + MY_ID=$IDX + echo "Current server id $MY_ID" + fi +else + echo "server.$IDX=$SERVER.$DOMAIN:2888:3888" >> $CONF_FILE + if [ "$HOSTNAME" == "$SERVER" ]; then + MY_ID=$IDX + echo "Current server id $MY_ID" + fi fi - ((IDX++)) +((IDX++)) done # For ZooKeeper container we need to initialize the ZK id
[pulsar] branch master updated: add doc for how to configure chunking for readers (#15211)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3823ddf89cc add doc for how to configure chunking for readers (#15211) 3823ddf89cc is described below commit 3823ddf89cc22f14f5f994ea030bd365d58d28b3 Author: momo-jun <60642177+momo-...@users.noreply.github.com> AuthorDate: Sun Apr 24 11:12:13 2022 +0800 add doc for how to configure chunking for readers (#15211) --- site2/docs/client-libraries-java.md | 16 1 file changed, 16 insertions(+) diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md index e640b33216a..13205461f74 100644 --- a/site2/docs/client-libraries-java.md +++ b/site2/docs/client-libraries-java.md @@ -1123,6 +1123,22 @@ pulsarClient.newReader() Total hash range size is 65536, so the max end of the range should be less than or equal to 65535. +### Configure chunking + +Configuring chunking for readers is similar to that for consumers. See [configure chunking for consumers](#configure-chunking) for more information. + +The following is an example of how to configure message chunking for a reader. + +```java +Reader reader = pulsarClient.newReader() +.topic(topicName) +.startMessageId(MessageId.earliest) +.maxPendingChunkedMessage(12) +.autoAckOldestChunkedMessageOnQueueFull(true) +.expireTimeOfIncompleteChunkedMessage(12, TimeUnit.MILLISECONDS) +.create(); +``` + ## TableView The TableView interface serves an encapsulated access pattern, providing a continuously updated key-value map view of the compacted topic data. Messages without keys will be ignored.
[pulsar] branch master updated: [pulsar-client] Add message chunking configuration for reader (#15143)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d8923b8b20f [pulsar-client] Add message chunking configuration for reader (#15143) d8923b8b20f is described below commit d8923b8b20f4d041ec07eed55aa08099447c6a2b Author: Rajan Dhabalia AuthorDate: Thu Apr 14 21:12:36 2022 -0700 [pulsar-client] Add message chunking configuration for reader (#15143) --- .../pulsar/client/impl/MessageChunkingTest.java| 36 - .../apache/pulsar/client/api/ReaderBuilder.java| 47 ++ .../pulsar/client/impl/MultiTopicsReaderImpl.java | 7 .../pulsar/client/impl/ReaderBuilderImpl.java | 18 + .../org/apache/pulsar/client/impl/ReaderImpl.java | 7 .../client/impl/conf/ReaderConfigurationData.java | 8 6 files changed, 122 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index 4d752029c0f..85d67c3de0d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.MessageImpl.SchemaState; import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg; @@ -217,6 +218,8 @@ public class MessageChunkingTest extends ProducerConsumerBase { .isAckReceiptEnabled(ackReceiptEnabled) .ackTimeout(5, TimeUnit.SECONDS).subscribe(); +Reader reader = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); + ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topicName); Producer producer = producerBuilder.enableChunking(true).enableBatching(false).create(); @@ -232,6 +235,15 @@ public class MessageChunkingTest extends ProducerConsumerBase { Message msg = null; Set messageSet = Sets.newHashSet(); +for (int i = 0; i < totalMessages; i++) { +msg = reader.readNext(5, TimeUnit.SECONDS); +String receivedMessage = new String(msg.getData()); +log.info("Received message: [{}]", receivedMessage); +String expectedMessage = publishedMessages.get(i); +testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); +} + +messageSet.clear(); for (int i = 0; i < totalMessages; i++) { msg = consumer.receive(5, TimeUnit.SECONDS); String receivedMessage = new String(msg.getData()); @@ -268,6 +280,7 @@ public class MessageChunkingTest extends ProducerConsumerBase { consumer.close(); producer.close(); +reader.close(); log.info("-- Exiting {} test --", methodName); } @@ -384,6 +397,8 @@ public class MessageChunkingTest extends ProducerConsumerBase { producer.cnx().registerProducer(producerId, producer); // registered spy ProducerImpl ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-sub").subscribe(); +ReaderImpl reader = (ReaderImpl) pulsarClient.newReader().topic(topicName) +.startMessageId(MessageId.earliest).create(); TypedMessageBuilderImpl msg = (TypedMessageBuilderImpl) producer.newMessage().value("message-1".getBytes()); ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent()); @@ -397,17 +412,22 @@ public class MessageChunkingTest extends ProducerConsumerBase { producer.processOpSendMsg(op); retryStrategically((test) -> { -return consumer.chunkedMessagesMap.size() > 0; +return reader.getConsumer().chunkedMessagesMap.size() > 0 && consumer.chunkedMessagesMap.size() > 0; }, 5, 500); 
assertEquals(consumer.chunkedMessagesMap.size(), 1); +assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 1); consumer.expireTimeOfIncompleteChunkedMessageMillis = 1; +reader.getConsumer().expireTimeOfIncompleteChunkedMessageMillis = 1; Thread.sleep(10); consumer.removeExpireIncompleteChunkedMessages(); +reader.getConsumer().removeExpireIncompleteChunkedMessages(); assertEquals(consumer.chunk
[pulsar-adapters] branch master updated: Auto update partition for pulsar kafka producer (#28)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git The following commit(s) were added to refs/heads/master by this push: new 0adca97 Auto update partition for pulsar kafka producer (#28) 0adca97 is described below commit 0adca97f98025ce60ea7504431711b01a9b080cd Author: Rajan Dhabalia AuthorDate: Thu Mar 31 10:19:19 2022 -0700 Auto update partition for pulsar kafka producer (#28) --- .../clients/producer/PulsarKafkaProducer.java | 75 +- .../kafka/compat/PulsarProducerKafkaConfig.java| 1 + .../clients/producer/PulsarKafkaProducerTest.java | 72 +++-- 3 files changed, 126 insertions(+), 22 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index fca5da8..80ca9fd 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -30,7 +30,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -44,26 +46,31 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.Serializer; +import 
org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.TypedMessageBuilder; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; +import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter; +import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper; import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema; import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig; -import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter; -import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper; import org.apache.pulsar.client.util.MessageIdUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + +import lombok.Getter; + public class PulsarKafkaProducer implements Producer { +private static final Logger log = LoggerFactory.getLogger(PulsarKafkaProducer.class); private final PulsarClient client; final ProducerBuilder pulsarProducerBuilder; @@ -74,6 +81,9 @@ public class PulsarKafkaProducer implements Producer { private final Partitioner partitioner; private volatile Cluster cluster = Cluster.empty(); +@Getter +private final int autoUpdatePartitionDurationMs; +private final ScheduledExecutorService executor; private List> interceptors; @@ -176,6 +186,10 @@ public class PulsarKafkaProducer implements Producer { interceptors = (List) producerConfig.getConfiguredInstances( 
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); + +autoUpdatePartitionDurationMs = Integer.parseInt( + properties.getProperty(PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS_REFRESH_DURATION, "30")); +executor = Executors.newSingleThreadScheduledExecutor(); } @Override @@ -272,14 +286,19 @@ public class PulsarKafkaProducer implements Producer { public void close() { close(Long.MAX_VALUE, TimeUnit.MILLISECONDS); partitioner.close(); +if (executor != null) { +executor.shutdown(); +} } @Override public void close(long timeout, TimeUnit unit) { -try { -client.closeAsync().get(timeout, unit); -} catch (InterruptedException | ExecutionException | TimeoutException e) { -
[pulsar] branch master updated (7c1f17a -> 0a91196)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 7c1f17a Fix potential npe bug of #14384 (#14595) add 0a91196 [pulsar-broker] support client configurable message chunk size (#14382) No new revisions were added by this update. Summary of changes: .../apache/pulsar/client/impl/MessageChunkingTest.java | 18 +++--- .../org/apache/pulsar/client/api/ProducerBuilder.java | 10 ++ .../apache/pulsar/client/impl/ProducerBuilderImpl.java | 6 ++ .../org/apache/pulsar/client/impl/ProducerImpl.java| 8 +++- .../client/impl/conf/ProducerConfigurationData.java| 1 + site2/docs/client-libraries-java.md| 2 ++ 6 files changed, 41 insertions(+), 4 deletions(-)
[pulsar] branch master updated (e3c9684 -> df9a12d)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from e3c9684 [Doc] Move schema compatibility strategy cmd from topics to topicsPolicies (#14232) add df9a12d [pulsar-broker] fix ack-hole and backlog for persistent-replicator (#14282) No new revisions were added by this update. Summary of changes: .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 7 ++- .../pulsar/broker/service/AbstractReplicator.java | 4 ++ .../service/persistent/PersistentReplicator.java | 14 + .../pulsar/broker/service/ReplicatorTest.java | 59 ++ 4 files changed, 83 insertions(+), 1 deletion(-)
[pulsar] branch master updated (792e264 -> edf4858)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 792e264 [pulsar-broker] Support roll-over ledgers for inactive topics (#13073) add edf4858 [pulsar-broker] support to get list of topics under a namespace bundle (#12632) No new revisions were added by this update. Summary of changes: .../broker/admin/impl/PersistentTopicsBase.java| 24 +--- .../broker/admin/v1/NonPersistentTopics.java | 7 ++- .../pulsar/broker/admin/v1/PersistentTopics.java | 7 ++- .../broker/admin/v2/NonPersistentTopics.java | 8 ++- .../pulsar/broker/admin/v2/PersistentTopics.java | 7 ++- .../common/naming/NamespaceBundleFactory.java | 7 ++- .../apache/pulsar/broker/admin/AdminApi2Test.java | 35 +++ .../org/apache/pulsar/broker/admin/AdminTest.java | 2 +- .../org/apache/pulsar/client/admin/Topics.java | 69 ++ .../pulsar/client/admin/internal/TopicsImpl.java | 22 ++- .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +- .../org/apache/pulsar/admin/cli/CmdTopics.java | 10 +++- .../org/apache/pulsar/admin/cli/TestCmdTopics.java | 5 ++ 13 files changed, 187 insertions(+), 20 deletions(-)
[pulsar] branch master updated: [pulsar-broker] load-balancer support disabling max-session for bundle split (#13108)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f7abada [pulsar-broker] load-balancer support disabling max-session for bundle split (#13108) f7abada is described below commit f7abada1a5e1e79eee10376940187a6f7dc3cd79 Author: Rajan Dhabalia AuthorDate: Mon Dec 20 11:25:40 2021 -0800 [pulsar-broker] load-balancer support disabling max-session for bundle split (#13108) --- conf/broker.conf | 1 + .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 1 + .../apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java| 3 ++- .../apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java | 3 ++- .../java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java | 5 + 5 files changed, 11 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 80eb86e..9b67442 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1074,6 +1074,7 @@ loadBalancerAutoUnloadSplitBundlesEnabled=true loadBalancerNamespaceBundleMaxTopics=1000 # maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered +# (disable threshold check with value -1) loadBalancerNamespaceBundleMaxSessions=1000 # maximum msgRate (in + out) in a bundle, otherwise bundle split will be triggered diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 21c82b6..ba1db65 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1953,6 +1953,7 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_LOAD_BALANCER, doc = 
"maximum sessions (producers + consumers) in a bundle, otherwise bundle split will be triggered" ++ "(disable threshold check with value -1)" ) private int loadBalancerNamespaceBundleMaxSessions = 1000; @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java index fa48618..751203c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java @@ -84,7 +84,8 @@ public class BundleSplitterTask implements BundleSplitStrategy { totalMessageRate = longTermData.totalMsgRate(); totalMessageThroughput = longTermData.totalMsgThroughput(); } -if (stats.topics > maxBundleTopics || stats.consumerCount + stats.producerCount > maxBundleSessions +if (stats.topics > maxBundleTopics || (maxBundleSessions > 0 && (stats.consumerCount ++ stats.producerCount > maxBundleSessions)) || totalMessageRate > maxBundleMsgRate || totalMessageThroughput > maxBundleBandwidth) { final String namespace = LoadManagerShared.getNamespaceNameFromBundleName(bundle); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index e1829e6..017a040 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -1372,7 +1372,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer maxBundleTopics || totalSessions > maxBundleSessions || totalMsgRate > maxBundleMsgRate +if (stats.topics > maxBundleTopics || (maxBundleSessions > 0 +&& totalSessions > maxBundleSessions) || totalMsgRate > 
maxBundleMsgRate || totalBandwidth > maxBundleBandwidth) { if (stats.topics <= 1) { log.info("Unable to split hot namespace bundle {} since there is only one topic.", bundleName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 5a6b784..a64f283 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lo
[pulsar] branch master updated (dc884f8 -> f965fb8)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from dc884f8 Disable PulsarLedgerAuditorManagerTest when running on RocksDB backend (#13072) add f965fb8 [pulsar-broker] add uniform load shedder strategy to distribute traffic uniformly across brokers (#12902) No new revisions were added by this update. Summary of changes: conf/broker.conf | 12 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 19 +++ .../loadbalance/impl/UniformLoadShedder.java | 169 + site2/docs/administration-load-balance.md | 14 ++ site2/docs/reference-configuration.md | 2 + 5 files changed, 216 insertions(+) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java
[pulsar] branch master updated (b27a716 -> 19b1d1b)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from b27a716 Some dependency in integration tests scope should be test (#12696) add 19b1d1b [pulsar-broker] Provide option to split bundle based on load (#12378) No new revisions were added by this update. Summary of changes: .../pulsar/broker/admin/impl/NamespacesBase.java | 21 +-- .../broker/loadbalance/ModularLoadManager.java | 9 +++ .../loadbalance/impl/ModularLoadManagerImpl.java | 5 +- .../common/naming/NamespaceBundleFactory.java | 28 - .../broker/namespace/NamespaceServiceTest.java | 70 +++--- .../pulsar/common/policies/data/Policies.java | 5 +- .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 15 - 7 files changed, 135 insertions(+), 18 deletions(-)
[pulsar] branch master updated: [pulsar-client] Fix pending queue-size stats for batch messages (#12704)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9a3e7ec [pulsar-client] Fix pending queue-size stats for batch messages (#12704) 9a3e7ec is described below commit 9a3e7ecb326d045a10c3438f10bda63002d4603c Author: Rajan Dhabalia AuthorDate: Tue Nov 9 23:16:16 2021 -0800 [pulsar-client] Fix pending queue-size stats for batch messages (#12704) --- .../client/api/SimpleProducerConsumerStatTest.java | 16 .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 10 +- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 695b9b9..40d34a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -33,9 +33,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.gson.Gson; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -46,6 +46,7 @@ import org.testng.annotations.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; +import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; @@ -78,6 +79,11 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 
}, { 1000, 2 } }; } +@DataProvider(name = "batchingEnabled") +public Object[][] batchingEnabled() { +return new Object[][] { { true }, { false } }; +} + @Test(dataProvider = "batch_with_timeout") public void testSyncProducerAndConsumer(int batchMessageDelayMs, int ackTimeoutSec) throws Exception { log.info("-- Starting {} test --", methodName); @@ -427,14 +433,14 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } -@Test -public void testProducerPendingQueueSizeStats() throws Exception { +@Test(dataProvider = "batchingEnabled") +public void testProducerPendingQueueSizeStats(boolean batchingEnabled) throws Exception { log.info("-- Starting {} test --", methodName); ProducerBuilder producerBuilder = pulsarClient.newProducer() .topic("persistent://my-property/tp1/my-ns/my-topic1"); @Cleanup -Producer producer = producerBuilder.enableBatching(false).create(); +Producer producer = producerBuilder.enableBatching(batchingEnabled).create(); stopBroker(); @@ -443,6 +449,8 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase { String message = "my-message-" + i; producer.sendAsync(message.getBytes()); } +Awaitility.await().timeout(2, TimeUnit.MINUTES) +.until(() -> producer.getStats().getPendingQueueSize() == numMessages); assertEquals(producer.getStats().getPendingQueueSize(), numMessages); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 5e0579f..40c9b99 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.function.Consumer; import org.apache.commons.lang3.StringUtils; +import 
org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.client.api.BatcherBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; @@ -1924,7 +1925,14 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne } public int getPendingQueueSize() { -return pendingMessages.size(); +if (!isBatchMessagingEnabled()) { +return pendingMessages.size(); +} +MutableInt size = new MutableInt(0); +pendingMessages.forEach(op -> { +si
[pulsar] branch master updated: Fix flaky test: PersistentTopicsTest::testPeekWithSubscriptionNameNotExist (#12703)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new cc68f56 Fix flaky test: PersistentTopicsTest::testPeekWithSubscriptionNameNotExist (#12703) cc68f56 is described below commit cc68f56c9c32ef4b1ea8595c688cd684eb0253f6 Author: Rajan Dhabalia AuthorDate: Tue Nov 9 14:44:48 2021 -0800 Fix flaky test: PersistentTopicsTest::testPeekWithSubscriptionNameNotExist (#12703) --- .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 11 +++ 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 0333a0b..2cb2d57 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -77,6 +77,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.zookeeper.KeeperException; @@ -668,18 +669,20 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { @Test public void testPeekWithSubscriptionNameNotExist() throws Exception { final String topicName = "testTopic"; -final String topic = TopicName.get( +final TopicName topic = TopicName.get( TopicDomain.persistent.value(), testTenant, testNamespace, -topicName).toString(); +topicName); final String subscriptionName = "sub"; 
-((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, true).get(); +RetentionPolicies retention = new RetentionPolicies(10,10); +admin.namespaces().setRetention(topic.getNamespace(), retention); +((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic.toString(), 3, true).get(); final String partitionedTopic = topic + "-partition-0"; -Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); +Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic.toString()).create(); for (int i = 0; i < 100; ++i) { producer.send("test" + i); }
[pulsar] branch master updated (114185e -> ffccd46)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 114185e [website] Update team page to add new committer (#12688) add ffccd46 [pulsar-client] add pending-queue size metrics to producer stats (#12674) No new revisions were added by this update. Summary of changes: .../client/api/SimpleProducerConsumerStatTest.java | 21 + .../org/apache/pulsar/client/api/ProducerStats.java | 5 + .../pulsar/client/impl/ProducerStatsDisabled.java | 5 + .../client/impl/ProducerStatsRecorderImpl.java | 5 + 4 files changed, 36 insertions(+)
[pulsar] branch master updated (8992712 -> 24b0f4f)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 8992712 [website][upgrade]feat: docs migration - 2.7.1 / Admin API (#12676) add 24b0f4f [tools] fix TestRunMain test (#12675) No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java | 5 + .../src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-)
[pulsar] branch master updated (7c219b1 -> 5523604)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 7c219b1 Add log error tracking for semaphore count leak (#12410) add 5523604 [pulsar-broker] Support configuration to rate-limit dispatching on batch message (#12294) No new revisions were added by this update. Summary of changes: conf/broker.conf | 4 ++ conf/standalone.conf | 4 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++ .../broker/service/AbstractBaseDispatcher.java | 11 ++- .../PersistentDispatcherMultipleConsumers.java | 11 +-- .../PersistentDispatcherSingleActiveConsumer.java | 7 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 10 +-- .../client/api/MessageDispatchThrottlingTest.java | 78 ++ site2/docs/reference-configuration.md | 2 + 9 files changed, 119 insertions(+), 14 deletions(-)
[pulsar] branch master updated (cc70a1f -> 1ce016c)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from cc70a1f [managedledger] NPE on OpAddEntry while ManagedLedger is closing (#12364) add 1ce016c [pulsar-broker] support split largest bundle of the namespace (#12361) No new revisions were added by this update. Summary of changes: .../pulsar/broker/admin/impl/NamespacesBase.java | 14 -- .../common/naming/NamespaceBundleFactory.java | 49 +++-- .../broker/namespace/NamespaceServiceTest.java | 51 ++ .../pulsar/common/policies/data/Policies.java | 1 + .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 3 +- 5 files changed, 101 insertions(+), 17 deletions(-)
[pulsar] branch master updated (8b55636 -> bfa2b29)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 8b55636 [Issue-11966][pulsar-proxy] set default http proxy request timeout (#11971) add bfa2b29 [pulsar-broker] Fix: handle failed partitions topic creation (#10374) No new revisions were added by this update. Summary of changes: .../broker/admin/impl/PersistentTopicsBase.java| 70 -- .../pulsar/broker/admin/v1/PersistentTopics.java | 3 +- .../pulsar/broker/admin/v2/PersistentTopics.java | 3 +- .../apache/pulsar/broker/admin/AdminApiTest2.java | 52 +--- .../org/apache/pulsar/broker/admin/AdminTest.java | 3 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 3 +- .../org/apache/pulsar/client/admin/Topics.java | 40 - .../pulsar/client/admin/internal/TopicsImpl.java | 19 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- .../org/apache/pulsar/admin/cli/CmdTopics.java | 6 +- 10 files changed, 151 insertions(+), 50 deletions(-)
[pulsar] branch master updated (da5bac9 -> 4011aa0)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from da5bac9 Update PIP GitHub Issue Template (#12176) add 4011aa0 [pulsar-admin] add option to get precise backlog on v1 topic (#8927) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java | 6 -- .../java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java| 5 +++-- .../test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java | 2 +- .../main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java | 5 - 4 files changed, 12 insertions(+), 6 deletions(-)
[pulsar] branch master updated: [pulsar-broker] handle NPE when check active consumer in stats (#12214)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 2c6807a [pulsar-broker] handle NPE when check active consumer in stats (#12214) 2c6807a is described below commit 2c6807a578d5fab0711dde5531f15b924cbb3982 Author: Rajan Dhabalia AuthorDate: Tue Sep 28 11:37:03 2021 -0700 [pulsar-broker] handle NPE when check active consumer in stats (#12214) --- .../service/persistent/PersistentDispatcherSingleActiveConsumer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index c4bea81..3c22242 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -558,10 +558,10 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher @Override public boolean checkAndUnblockIfStuck() { -if (cursor.checkAndUpdateReadPositionChanged()) { +Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this); +if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) { return false; } -Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this); int totalAvailablePermits = consumer.getAvailablePermits(); // consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
[pulsar] branch master updated (4e60de6 -> f154de7)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 4e60de6 [Issue 11632][C++] Turning on more compiler warnings, and enforcing warnings as errors (#11668) add f154de7 [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754) No new revisions were added by this update. Summary of changes: .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 15 --- .../apache/pulsar/client/impl/UnAckedMessageTracker.java | 1 + 2 files changed, 13 insertions(+), 3 deletions(-)
[pulsar] branch master updated: [pulsar-client] Fix: set and return topic name on message api (#11743)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0f8aef2 [pulsar-client] Fix: set and return topic name on message api (#11743) 0f8aef2 is described below commit 0f8aef2b9494cde8e8adc9d97d89a65d73ae8c35 Author: Rajan Dhabalia AuthorDate: Sun Aug 22 13:34:34 2021 -0700 [pulsar-client] Fix: set and return topic name on message api (#11743) --- .../api/PartitionedProducerConsumerTest.java | 40 ++ .../pulsar/client/impl/ProducerSemaphoreTest.java | 6 ++-- .../pulsar/client/impl/MessageBuilderImpl.java | 2 +- .../org/apache/pulsar/client/impl/MessageImpl.java | 5 +-- .../client/impl/TypedMessageBuilderImpl.java | 2 +- .../apache/pulsar/client/impl/MessageImplTest.java | 30 .../org/apache/pulsar/client/impl/MessageTest.java | 8 ++--- .../client/impl/schema/AutoConsumeSchemaTest.java | 2 +- 8 files changed, 68 insertions(+), 27 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index 3e9b194..543c172 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -903,6 +903,46 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } +@Test +public void testCustomPartitionedProducer() throws Exception { +PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection +TopicName topicName = null; +Producer producer = null; +try { +log.info("-- Starting {} test --", methodName); + +int numPartitions = 4; +topicName = TopicName + 
.get("persistent://my-property/my-ns/my-partitionedtopic1-" + System.currentTimeMillis()); + +admin.topics().createPartitionedTopic(topicName.toString(), numPartitions); + +RouterWithTopicName router = new RouterWithTopicName(); +producer = pulsarClient.newProducer().topic(topicName.toString()) +.messageRouter(router) +.create(); +for (int i = 0; i < 1; i++) { +String message = "my-message-" + i; + producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send(); +} +assertEquals(router.topicName, topicName.toString()); +} finally { +producer.close(); +pulsarClient.close(); +admin.topics().deletePartitionedTopic(topicName.toString()); +log.info("-- Exiting {} test --", methodName); +} +} + +private static class RouterWithTopicName implements MessageRouter { +static String topicName = null; + +@Override +public int choosePartition(Message msg, TopicMetadata metadata) { +topicName = msg.getTopicName(); +return 2; +} +} private static class AlwaysTwoMessageRouter implements MessageRouter { @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java index f46509e..c719cbd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java @@ -88,7 +88,7 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase { for (int i = 0; i < messages / 2; i++) { MessageMetadata metadata = new MessageMetadata() .setNumMessagesInBatch(10); -MessageImpl msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES); +MessageImpl msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES, null); futures.add(producer.sendAsync(msg)); } Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize - messages/2); @@ -147,7 +147,7 @@ public class 
ProducerSemaphoreTest extends ProducerConsumerBase { MessageMetadata metadata = new MessageMetadata() .setNumMessagesInBatch(10); -MessageImpl msg = MessageImpl.create(metadata, ByteBuffer.wrap(new byte[0]), Schema.BYTES); +MessageImpl msg = MessageImpl.create(met
[pulsar] branch master updated: PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8d8e6b7 PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725) 8d8e6b7 is described below commit 8d8e6b751ee0dc99306a1c61c22a8d75b5927811 Author: Rajan Dhabalia AuthorDate: Sat Aug 21 00:50:17 2021 -0700 PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725) * PIP 83 : Pulsar Reader: Message consumption with pooled buffer --- .../client/impl/BrokerClientIntegrationTest.java | 56 ++ .../apache/pulsar/client/api/ReaderBuilder.java| 10 .../pulsar/client/impl/MultiTopicsReaderImpl.java | 1 + .../pulsar/client/impl/ReaderBuilderImpl.java | 7 ++- .../org/apache/pulsar/client/impl/ReaderImpl.java | 1 + .../client/impl/conf/ReaderConfigurationData.java | 2 + 6 files changed, 76 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index fb3c30b..a111dd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.SchemaDefinition; @@ -953,4 +954,59 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { consumer.close(); producer.close(); } + +/** + * It validates 
pooled message consumption for batch and non-batch messages. + * + * @throws Exception + */ +@Test(dataProvider = "booleanFlagProvider") +public void testConsumerWithPooledMessagesWithReader(boolean isBatchingEnabled) throws Exception { +log.info("-- Starting {} test --", methodName); + +@Cleanup +PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + +final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled; + +@Cleanup +Reader reader = newPulsarClient.newReader(Schema.BYTEBUFFER).topic(topic).poolMessages(true) +.startMessageId(MessageId.latest).create(); + +@Cleanup +Producer producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create(); + +final int numMessages = 100; +for (int i = 0; i < numMessages; i++) { +producer.newMessage().value(("value-" + i).getBytes(UTF_8)) +.eventTime((i + 1) * 100L).sendAsync(); +} +producer.flush(); + +// Reuse pre-allocated pooled buffer to process every message +byte[] val = null; +int size = 0; +for (int i = 0; i < numMessages; i++) { +Message msg = reader.readNext(); +ByteBuffer value; +try { +value = msg.getValue(); +int capacity = value.remaining(); +// expand the size of buffer if needed +if (capacity > size) { +val = new byte[capacity]; +size = capacity; +} +// read message into pooled buffer +value.get(val, 0, capacity); +// process the message +assertEquals(("value-" + i), new String(val, 0, capacity)); +assertTrue(value.isDirect()); +} finally { +msg.release(); +} +} +reader.close(); +producer.close(); +} } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index a84208b..4186df7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -280,4 +280,14 
@@ public interface ReaderBuilder extends Cloneable { * @return the reader builder instance */ ReaderBuilder keyHashRange(Range... ranges); + +/** + * Enable pooling of messages and the underlying data buffers. + * + * When pooling is enabled, the application is responsible
[pulsar] branch master updated: [server] Allow broker to start with default backlogquota in bytes (#11671)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 80171a7 [server] Allow broker to start with default backlogquota in bytes (#11671) 80171a7 is described below commit 80171a733ab4799f912a8935f03c19554b9ca3b1 Author: Rajan Dhabalia AuthorDate: Mon Aug 16 23:24:02 2021 -0700 [server] Allow broker to start with default backlogquota in bytes (#11671) * [server] Allow broker to start with default backlogquota in bytes * remove sysout from test * fix test --- conf/broker.conf | 5 ++- .../apache/pulsar/broker/ServiceConfiguration.java | 9 - .../pulsar/broker/service/BacklogQuotaManager.java | 5 ++- .../org/apache/pulsar/broker/ConfigHelper.java | 5 ++- .../broker/service/BacklogQuotaManagerTest.java| 46 ++ .../common/naming/ServiceConfigurationTest.java| 8 ++-- .../policies/data/impl/BacklogQuotaImpl.java | 2 +- site2/docs/reference-configuration.md | 4 +- 8 files changed, 73 insertions(+), 11 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index e4c15c7..404d8d7 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -125,9 +125,12 @@ backlogQuotaCheckEnabled=true # How often to check for topics that have reached the quota backlogQuotaCheckIntervalInSeconds=60 -# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1. +# Deprecated - Use backlogQuotaDefaultLimitByte instead. backlogQuotaDefaultLimitGB=-1 +# Default per-topic backlog quota limit, less than 0 means no limitation. default is -1. +backlogQuotaDefaultLimitBytes=-1 + # Default per-topic backlog quota time limit in second, less than 0 means no limitation. default is -1. 
backlogQuotaDefaultLimitSecond=-1 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index aaec952..e98a5ce 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -334,12 +334,19 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int backlogQuotaCheckIntervalInSeconds = 60; +@Deprecated +@FieldContext( +category = CATEGORY_POLICIES, +doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\"" +) +private double backlogQuotaDefaultLimitGB = -1; + @FieldContext( category = CATEGORY_POLICIES, doc = "Default per-topic backlog quota limit by size, less than 0 means no limitation. default is -1." + " Increase it if you want to allow larger msg backlog" ) -private long backlogQuotaDefaultLimitGB = -1; +private long backlogQuotaDefaultLimitBytes = -1; @FieldContext( category = CATEGORY_POLICIES, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index bbb3ddf..f9ccb74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -53,9 +53,10 @@ public class BacklogQuotaManager { public BacklogQuotaManager(PulsarService pulsar) { this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled(); +double backlogQuotaGB = pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB(); this.defaultQuota = BacklogQuotaImpl.builder() - .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() -* BacklogQuotaImpl.BYTES_IN_GIGABYTE) +.limitSize(backlogQuotaGB > 0 ? 
(long) backlogQuotaGB * BacklogQuotaImpl.BYTES_IN_GIGABYTE +: pulsar.getConfiguration().getBacklogQuotaDefaultLimitBytes()) .limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond()) .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy()) .build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java index b929636..ca8231a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java @@ -38,8 +38,11 @@ public class ConfigHelper { } public static BacklogQuota
[pulsar-adapters] branch master updated: [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git The following commit(s) were added to refs/heads/master by this push: new feaa450 [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26) feaa450 is described below commit feaa4504d567a5310e0a3d54003e721696491993 Author: Rajan Dhabalia AuthorDate: Tue Jul 20 13:05:37 2021 -0700 [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26) * [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer * add properties param for getEncryptionKey method --- .../clients/producer/PulsarKafkaProducer.java | 2 +- .../kafka/compat/CryptoKeyReaderFactory.java | 51 ++ .../kafka/compat/PulsarConsumerKafkaConfig.java| 12 +++ .../kafka/compat/PulsarProducerKafkaConfig.java| 17 .../producer/PulsarCliebtKafkaConfigTest.java | 111 + .../clients/producer/PulsarKafkaProducerTest.java | 1 - 6 files changed, 192 insertions(+), 2 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index f6da38e..fca5da8 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory; public class PulsarKafkaProducer implements Producer { private final PulsarClient client; -private final ProducerBuilder pulsarProducerBuilder; +final ProducerBuilder pulsarProducerBuilder; private final ConcurrentMap> producers = new ConcurrentHashMap<>(); diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java new file mode 100644 index 000..649c297 --- /dev/null +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.kafka.compat; + +import java.util.Properties; +import java.util.Set; + +import org.apache.pulsar.client.api.CryptoKeyReader; + +/** + * Factory class to create {@link CryptoKeyReader} by using configuration stored in Producer/Consumer properties. + * + */ +public interface CryptoKeyReaderFactory { + +/** + * Create CryptoKeyReader object for Producer/Consumer. + * + * @param properties + *properties provided by user to create CryptoKeyReader based on configuration params + * @return CryptoKeyReader + */ +CryptoKeyReader create(Properties properties); + +/** + * Encryption keys for {@link CryptoKeyReader} while enabling encryption at producer. 
+ * + * @param properties + *properties provided by user to get encryption-keys based on configuration params + * @return Set of encryption keys + */ +default Set getEncryptionKey(Properties properties) { +return null; +} +} diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index 09a9806..40858b1 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -22,6 +22,7 @@ imp
[pulsar] branch master updated (3fcfb22 -> eb4d8aa)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 3fcfb22 [Transaction] Fix the transaction marker does not deleted as expect. (#11126) add eb4d8aa Support new topic format for broker admin healthcheck endpoint. (#11268) No new revisions were added by this update. Summary of changes: .../pulsar/broker/admin/impl/BrokersBase.java | 83 ++ .../pulsar/broker/namespace/NamespaceService.java | 35 - .../apache/pulsar/broker/SLAMonitoringTest.java| 2 +- .../broker/admin/AdminApiHealthCheckTest.java | 83 ++ .../apache/pulsar/broker/admin/AdminApiTest.java | 4 +- .../pulsar/broker/admin/v1/V1_AdminApiTest.java| 4 +- .../org/apache/pulsar/client/admin/Brokers.java| 15 .../apache/pulsar/common/naming/TopicVersion.java | 5 ++ .../pulsar/client/admin/internal/BrokersImpl.java | 20 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 +- .../org/apache/pulsar/admin/cli/CmdBrokers.java| 6 +- 11 files changed, 220 insertions(+), 41 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java copy pulsar-broker/src/main/java/org/apache/pulsar/common/naming/package-info.java => pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java (94%)
[pulsar] branch master updated (2f47c32 -> 3550f2e)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 2f47c32 Allow to disable producer max queue size (#9650) add 3550f2e [pulsar-broker] Dispatch messages to consumer with permits (#10417) No new revisions were added by this update. Summary of changes: .../PersistentDispatcherMultipleConsumers.java | 25 +++--- .../pulsar/client/api/ConsumerRedeliveryTest.java | 58 ++ .../apache/pulsar/client/impl/ConsumerBase.java| 4 ++ 3 files changed, 81 insertions(+), 6 deletions(-)
[pulsar] branch master updated (345cd33 -> 61639a2)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 345cd33 [Kinesis]Fix kinesis sink can not retry to send messages (#10420) add 61639a2 [pulsar-broker] Fix: Topic loading fails without any error when replicator init fails (#10432) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/service/persistent/PersistentTopic.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-)
[pulsar] branch master updated: PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new ef06691 PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184) ef06691 is described below commit ef06691531002c5d7cdbbdafc5494914ee8e0765 Author: Rajan Dhabalia AuthorDate: Tue Apr 20 16:26:28 2021 -0700 PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184) fix api, buffer-access, duplicate code --- .../client/api/SimpleProducerConsumerTest.java | 2 +- .../client/impl/BrokerClientIntegrationTest.java | 107 - .../apache/pulsar/client/api/ConsumerBuilder.java | 10 + .../java/org/apache/pulsar/client/api/Message.java | 15 ++ .../java/org/apache/pulsar/client/api/Schema.java | 17 ++ .../client/internal/DefaultImplementation.java | 20 ++ .../org/apache/pulsar/client/cli/CmdConsume.java | 34 ++- .../apache/pulsar/client/impl/ConsumerBase.java| 13 +- .../pulsar/client/impl/ConsumerBuilderImpl.java| 5 + .../apache/pulsar/client/impl/ConsumerImpl.java| 25 +- .../client/impl/ConsumerStatsRecorderImpl.java | 2 +- .../org/apache/pulsar/client/impl/MessageImpl.java | 256 ++--- .../apache/pulsar/client/impl/MessagesImpl.java| 4 +- .../client/impl/MultiTopicsConsumerImpl.java | 7 +- .../pulsar/client/impl/TopicMessageImpl.java | 10 + .../pulsar/client/impl/ZeroQueueConsumerImpl.java | 1 + .../impl/conf/ConsumerConfigurationData.java | 2 + .../pulsar/client/impl/schema/AbstractSchema.java | 3 +- .../client/impl/schema/ByteBufferSchema.java | 13 +- .../pulsar/client/impl/schema/BytesSchema.java | 2 +- .../client/impl/schema/LocalDateTimeSchema.java| 4 +- .../pulsar/client/impl/schema/StringSchema.java| 2 +- .../pulsar/testclient/PerformanceConsumer.java | 25 +- 23 files changed, 445 insertions(+), 134 deletions(-) diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index abcb7ed..1123562 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -4029,4 +4029,4 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { blockedMessageLatch.countDown(); log.info("-- Exiting {} test --", methodName); } -} +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index c4d03ad..f7ce13b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.UUID.randomUUID; import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; import static org.mockito.Mockito.any; @@ -30,12 +31,14 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - +import io.netty.buffer.ByteBuf; import java.lang.reflect.Field; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -55,7 +58,6 @@ import 
java.util.concurrent.atomic.AtomicInteger; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Cleanup; - import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -132,6 +134,11 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { return new Object[][] { { SubscriptionType.Shared }, { SubscriptionType.Failover } }; } +@DataProvider(name = "booleanFlagProvider") +public Object[][] booleanFlagProvider() { +return new Object[][] { { true }, { false } }; +} + /** * Verifies unload namespace-bundle doesn't close shared connection used by other namespace-bundle. * @@ -918,4 +925,98 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { private static final class TestMessageObject{
[pulsar] branch master updated (4c434ad -> 4ac5469)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 4c434ad Add current ip address, long hostname and short hostname to /etc/hosts (#10233) add 4ac5469 Add Enrico Olivelli as committer in the team page (#10234) No new revisions were added by this update. Summary of changes: site2/website/data/team.js | 5 + 1 file changed, 5 insertions(+)
[pulsar] branch master updated (4673963 -> a004dfe)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 4673963 [pulsar-auth] Allow serializable stream-provider field into AuthenticationTls (#10020) add a004dfe [pulsar-websocket] Allow websocket to consume and pass message to client without decryption (#10026) No new revisions were added by this update. Summary of changes: .../java/org/apache/pulsar/websocket/ConsumerHandler.java | 14 ++ .../java/org/apache/pulsar/websocket/ReaderHandler.java| 9 + 2 files changed, 19 insertions(+), 4 deletions(-)
[pulsar] branch master updated (77c7e9c -> 4673963)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 77c7e9c [Docs][Geo Replication] Copy PR 9857 to older versioned docs (#10029) add 4673963 [pulsar-auth] Allow serializable stream-provider field into AuthenticationTls (#10020) No new revisions were added by this update. Summary of changes: distribution/server/licenses/LICENSE-Spotbugs.txt | 502 + distribution/server/src/assemble/NOTICE.bin.txt| 3 + pom.xml| 8 + pulsar-client/pom.xml | 7 + .../pulsar/client/impl/auth/AuthenticationTls.java | 19 +- .../client/impl/auth/AuthenticationTlsTest.java| 94 6 files changed, 632 insertions(+), 1 deletion(-) create mode 100644 distribution/server/licenses/LICENSE-Spotbugs.txt create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTlsTest.java
[pulsar] branch master updated (058236c -> dd32435)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 058236c Upgrade to Apache Avro 1.10.2 (#9898) add dd32435 [pulsar-discovery] Replace MetadataStore with ZooKeeper in discoveryservice (#9967) No new revisions were added by this update. Summary of changes: ...ources.java => LoadManagerReportResources.java} | 8 +- .../pulsar/broker/resources/PulsarResources.java | 2 + .../pulsar/client/api/BrokerServiceLookupTest.java | 43 ++--- .../service/web/DiscoveryServiceWebTest.java | 47 +++-- .../discovery/service/BrokerDiscoveryProvider.java | 21 ++- .../pulsar/discovery/service/DiscoveryService.java | 17 +- .../pulsar/discovery/service/ServerConnection.java | 6 +- .../service/web/DiscoveryServiceServlet.java | 40 +++-- .../service/web/MetadataStoreCacheLoader.java | 100 +-- .../service/web/ZookeeperCacheLoader.java | 190 - .../discovery/service/BaseDiscoveryTestSetup.java | 40 ++--- .../discovery/service/DiscoveryServiceTest.java| 29 +--- .../discovery/service/web/BaseZKStarterTest.java | 33 ++-- .../service/web/DiscoveryServiceWebTest.java | 120 +++-- ...Test.java => MetadataStoreCacheLoaderTest.java} | 23 ++- 15 files changed, 238 insertions(+), 481 deletions(-) copy pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/{LocalPoliciesResources.java => LoadManagerReportResources.java} (73%) copy pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java => pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoader.java (51%) delete mode 100644 pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java rename pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/{ZookeeperCacheLoaderTest.java => MetadataStoreCacheLoaderTest.java} (83%)
[pulsar] branch master updated (1fab5aa -> 9a6ae06)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 1fab5aa [pulsar-broker] Authorization service uses metadata-store api (#9586) add 9a6ae06 [pulsar-broker] Remove global-zk reference from Pulsar-service (#9648) No new revisions were added by this update. Summary of changes: .../org/apache/pulsar/broker/PulsarService.java| 8 +-- .../apache/pulsar/broker/admin/AdminResource.java | 62 +++--- .../pulsar/broker/namespace/NamespaceService.java | 3 +- .../pulsar/broker/service/BrokerService.java | 31 --- .../broker/service/persistent/PersistentTopic.java | 23 .../broker/admin/AdminApiGetLastMessageIdTest.java | 1 - .../apache/pulsar/broker/admin/AdminApiTest2.java | 2 - .../org/apache/pulsar/broker/admin/AdminTest.java | 2 - .../apache/pulsar/broker/admin/NamespacesTest.java | 1 - .../pulsar/broker/admin/PersistentTopicsTest.java | 3 -- .../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 2 - .../broker/loadbalance/LoadBalancerTest.java | 25 + .../loadbalance/SimpleLoadManagerImplTest.java | 18 --- .../apache/pulsar/broker/web/WebServiceTest.java | 3 +- .../api/AuthenticatedProducerConsumerTest.java | 2 - .../pulsar/client/api/PartitionCreationTest.java | 7 +-- .../functions/worker/PulsarWorkerService.java | 39 +- .../pulsar/functions/worker/WorkerService.java | 6 +-- .../service/WorkerServiceWithClassLoader.java | 5 +- 19 files changed, 78 insertions(+), 165 deletions(-)
[pulsar] branch master updated: [pulsar-broker] topic resources use metadata-store api (#9485)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0f9e211 [pulsar-broker] topic resources use metadata-store api (#9485) 0f9e211 is described below commit 0f9e211a85d0e5db4b7f96a9f8e402ad814c552d Author: Rajan Dhabalia AuthorDate: Thu Feb 18 10:35:41 2021 -0800 [pulsar-broker] topic resources use metadata-store api (#9485) * [pulsar-broker] topics resources use metadata-store api [pulsar-broker] MockZK: Handle zk-children watch notification fix test fix sync function initialization fix tests fix zk-create add timeout * address comments --- .../org/apache/pulsar/broker/PulsarService.java| 3 +- .../apache/pulsar/broker/admin/AdminResource.java | 126 ++ .../pulsar/broker/admin/impl/BaseResources.java| 22 ++- .../pulsar/broker/admin/impl/ClusterResources.java | 11 +- .../admin/impl/DynamicConfigurationResources.java | 5 +- .../broker/admin/impl/NamespaceResources.java | 30 +++- .../broker/admin/impl/PersistentTopicsBase.java| 189 - .../pulsar/broker/admin/impl/PulsarResources.java | 12 +- .../pulsar/broker/admin/impl/TenantResources.java | 4 +- .../pulsar/broker/service/BrokerService.java | 13 +- .../org/apache/pulsar/broker/admin/AdminTest.java | 4 + .../pulsar/broker/admin/v1/V1_AdminApiTest.java| 2 +- .../client/impl/BrokerClientIntegrationTest.java | 14 +- .../pulsar/client/impl/TopicsConsumerImplTest.java | 6 +- .../pulsar/client/cli/PulsarClientToolTest.java| 22 +-- .../runtime/thread/ThreadRuntimeFactory.java | 1 + .../cache/impl/JSONMetadataSerdeSimpleType.java| 2 - .../metadata/impl/AbstractMetadataStore.java | 2 + .../metadata/impl/LocalMemoryMetadataStore.java| 1 + .../pulsar/metadata/impl/ZKMetadataStore.java | 3 + .../apache/pulsar/metadata/MetadataStoreTest.java | 1 + .../pulsar/zookeeper/LocalBookkeeperEnsemble.java | 6 +- 22 files changed, 221 insertions(+), 
258 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index aedd965..2f0e712 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -485,7 +485,8 @@ public class PulsarService implements AutoCloseable { coordinationService = new CoordinationServiceImpl(localMetadataStore); configurationMetadataStore = createConfigurationMetadataStore(); -pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore); +pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, +config.getZooKeeperOperationTimeoutSeconds()); orderedExecutor = OrderedExecutor.newBuilder() .numThreads(config.getNumOrderedExecutorThreads()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1228351..59af6ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -71,6 +71,9 @@ import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException; +import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException; +import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; @@ -81,7 +84,6 @@ import org.apache.zookeeper.KeeperException; import 
org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper.States; -import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,14 +135,6 @@ public abstract class AdminResource extends PulsarWebResource { CreateMode.PERSISTENT, callback, null); } -protected boolean zkPathExists(String path) throws KeeperException, InterruptedException { -Stat stat = globalZk().exists(path, false); -if (null != stat) { -return true; -} -return false; -} - protected void
[pulsar] branch master updated (8d0c36e -> fcb2bb6)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 8d0c36e [C++] Removed usages of boost::regex (#9533) add fcb2bb6 Issue 9279: Pulsar Admin: add command to list bookies (#9283) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/admin/AdminResource.java | 5 + .../org/apache/pulsar/broker/admin/v2/Bookies.java | 30 ++ .../apache/pulsar/broker/admin/BookiesApiTest.java | 6 ++ .../org/apache/pulsar/client/admin/Bookies.java| 11 ++ .../pulsar/client/admin/internal/BookiesImpl.java | 34 ++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 29 ++--- .../org/apache/pulsar/admin/cli/CmdBookies.java| 10 ++ .../{BookieInfo.java => BookiesClusterInfo.java} | 17 ++- .../data/{BookieInfo.java => RawBookieInfo.java} | 7 +- .../bookkeeper/client/PulsarMockBookKeeper.java| 117 +++-- .../bookkeeper/client/PulsarMockLedgerHandle.java | 5 +- 11 files changed, 227 insertions(+), 44 deletions(-) copy pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/{BookieInfo.java => BookiesClusterInfo.java} (82%) copy pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/{BookieInfo.java => RawBookieInfo.java} (91%)
[pulsar] branch master updated (efb2089 -> 7fd3218)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from efb2089 [Issue9507][testclient] add --batch-index-ack for the pulsar-perf (#9521) add 7fd3218 [pulsar-broker] namespace resources use metadata-store api (#9351) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/admin/AdminResource.java | 38 +- .../pulsar/broker/admin/impl/BaseResources.java|2 +- .../broker/admin/impl/NamespaceResources.java | 17 +- .../pulsar/broker/admin/impl/NamespacesBase.java | 1380 +--- .../pulsar/broker/admin/impl/PulsarResources.java |2 +- .../pulsar/broker/web/PulsarWebResource.java | 76 ++ .../apache/pulsar/broker/admin/NamespacesTest.java | 21 +- .../pulsar/broker/admin/v1/V1_AdminApiTest.java|7 + .../apache/pulsar/metadata/MetadataCacheTest.java | 35 +- 9 files changed, 453 insertions(+), 1125 deletions(-)
[pulsar] branch master updated: [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (#9212)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 3d5d6f6 [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (#9212) 3d5d6f6 is described below commit 3d5d6f6a1681cf842a2e3334d412fa449c553fd0 Author: Rajan Dhabalia AuthorDate: Sat Feb 6 20:58:32 2021 -0800 [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (#9212) add more error log fix list assignment --- .../service/schema/BookkeeperSchemaStorage.java| 52 ++-- .../schema/DefaultSchemaRegistryService.java | 5 ++ .../broker/service/schema/SchemaRegistry.java | 2 + .../service/schema/SchemaRegistryServiceImpl.java | 71 ++ .../service/schema/exceptions/SchemaException.java | 10 +++ ...hemaRegistryServiceWithSchemaDataValidator.java | 7 ++- .../broker/service/schema/ClientGetSchemaTest.java | 67 .../java/org/apache/pulsar/schema/SchemaTest.java | 16 +++-- .../common/protocol/schema/SchemaStorage.java | 2 + 9 files changed, 206 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 475e6f9..3c29da8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -37,7 +37,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import javax.validation.constraints.NotNull; import 
org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -47,6 +49,7 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; import org.apache.pulsar.common.protocol.schema.SchemaStorage; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.protocol.schema.StoredSchema; @@ -129,7 +132,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage { @Override public CompletableFuture>> getAll(String key) { CompletableFuture>> result = new CompletableFuture<>(); -getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> { +getLocator(key).thenAccept(locator -> { if (log.isDebugEnabled()) { log.debug("[{}] Get all schemas - locator: {}", key, locator); } @@ -154,9 +157,42 @@ public class BookkeeperSchemaStorage implements SchemaStorage { return result; } +private CompletableFuture> getLocator(String key) { +return getSchemaLocator(getSchemaPath(key)); +} + +public void clearLocatorCache(String key) { +localZkCache.invalidate(getSchemaPath(key)); +} + +@VisibleForTesting +List getSchemaLedgerList(String key) throws IOException { +Optional locatorEntry = null; +try { +locatorEntry = getLocator(key).get(); +} catch (Exception e) { +log.warn("Failed to get list of schema-storage ledger for {}", key, +(e instanceof ExecutionException ? e.getCause() : e)); +throw new IOException("Failed to get schema ledger for" + key); +} +LocatorEntry entry = locatorEntry.orElse(null); +return entry != null ? 
entry.locator.getIndexList().stream().map(i -> i.getPosition().getLedgerId()) +.collect(Collectors.toList()) : null; +} + +@VisibleForTesting +BookKeeper getBookKeeper() { +return bookKeeper; +} + +@Override +public CompletableFuture delete(String key, boolean forcefully) { +return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new); +} + @Override public CompletableFuture delete(String key) { -return deleteSchema(key).thenApply(LongSchemaVersion::new); +return delete(key, false); } @NotNull @@ -350,9 +386,10 @@ public class BookkeeperSchemaStorage implements SchemaStorage { } @NotNull -private CompletableFuture deleteSchema(String schemaId) { -return getSchema(schemaId).thenCompose(sc
[pulsar] branch master updated (683ee5f -> 19dec2c)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 683ee5f PIP-45: Added session events to metadata store (#9273) add 19dec2c [pulsar-broker] MockZK: Handle zk-children watch notification (#9473) No new revisions were added by this update. Summary of changes: .../bookkeeper/mledger/impl/MetaStoreImplTest.java | 44 +- .../java/org/apache/zookeeper/MockZooKeeper.java | 2 + 2 files changed, 44 insertions(+), 2 deletions(-)
[pulsar] branch master updated: [pulsar-broker] broker resources use metadata-store api (#9346)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new fa66a24 [pulsar-broker] broker resources use metadata-store api (#9346) fa66a24 is described below commit fa66a2410e3266ad5b01f00f96504e84fe6084cd Author: Rajan Dhabalia AuthorDate: Fri Jan 29 19:03:36 2021 -0800 [pulsar-broker] broker resources use metadata-store api (#9346) * [pulsar-broker] broker resources use metadata-store api fix test * fix api --- .../org/apache/pulsar/broker/PulsarService.java| 2 +- .../apache/pulsar/broker/admin/AdminResource.java | 27 -- .../pulsar/broker/admin/impl/BaseResources.java| 25 + .../pulsar/broker/admin/impl/BrokersBase.java | 61 ++ .../pulsar/broker/admin/impl/ClustersBase.java | 8 +-- ...ces.java => DynamicConfigurationResources.java} | 18 +++ .../pulsar/broker/admin/impl/PulsarResources.java | 4 +- .../pulsar/broker/web/PulsarWebResource.java | 29 ++ .../org/apache/pulsar/broker/admin/AdminTest.java | 8 --- 9 files changed, 77 insertions(+), 105 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3e03297..fc8f309 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -484,7 +484,7 @@ public class PulsarService implements AutoCloseable { coordinationService = new CoordinationServiceImpl(localMetadataStore); configurationMetadataStore = createConfigurationMetadataStore(); -pulsarResources = new PulsarResources(configurationMetadataStore); +pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore); orderedExecutor = OrderedExecutor.newBuilder() .numThreads(config.getNumOrderedExecutorThreads()) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 1728c88..453c39b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -23,8 +23,6 @@ import static org.apache.pulsar.common.util.Codec.decode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import java.net.MalformedURLException; -import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -39,7 +37,6 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; -import javax.ws.rs.core.UriBuilder; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -417,30 +414,6 @@ public abstract class AdminResource extends PulsarWebResource { } } -/** - * Redirect the call to the specified broker. 
- * - * @param broker - *Broker name - * @throws MalformedURLException - * In case the redirect happens - */ -protected void validateBrokerName(String broker) throws MalformedURLException { -String brokerUrl = String.format("http://%s", broker); -String brokerUrlTls = String.format("https://%s", broker); -if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress()) -&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) { -String[] parts = broker.split(":"); -checkArgument(parts.length == 2, String.format("Invalid broker url %s", broker)); -String host = parts[0]; -int port = Integer.parseInt(parts[1]); - -URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build(); -log.debug("[{}] Redirecting the rest call to {}: broker={}", clientAppId(), redirect, broker); -throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); -} -} - protected Policies getNamespacePolicies(NamespaceName namespaceName) { try { final String namespace = namespaceName.toString(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java index 07cd9c4..dfbad73 100644 --- a/pulsar-
[pulsar] branch master updated: [pulsar-broker] cluster resources use metadata-store api (#9338)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 0edcaa0 [pulsar-broker] cluster resources use metadata-store api (#9338) 0edcaa0 is described below commit 0edcaa09150521a2a7e189de43d004ed799db2ee Author: Rajan Dhabalia AuthorDate: Thu Jan 28 12:20:10 2021 -0800 [pulsar-broker] cluster resources use metadata-store api (#9338) * [pulsar-broker] Make tenant rest-api async and use metadata-store api fix tests fix intermittent test failure * [pulsar-broker] cluster resources use metadata-store api * fix test --- .../org/apache/pulsar/broker/PulsarService.java| 18 +- .../apache/pulsar/broker/admin/AdminResource.java | 6 +- .../pulsar/broker/admin/impl/BaseResources.java| 152 ++ .../pulsar/broker/admin/impl/ClusterResources.java | 51 .../pulsar/broker/admin/impl/ClustersBase.java | 204 +- .../broker/admin/impl/NamespaceResources.java | 51 .../pulsar/broker/admin/impl/PulsarResources.java | 37 +++ .../pulsar/broker/admin/impl/TenantResources.java | 28 ++ .../pulsar/broker/admin/impl/TenantsBase.java | 307 - .../pulsar/broker/web/PulsarWebResource.java | 157 ++- .../apache/pulsar/broker/admin/AdminApiTest.java | 8 +- .../org/apache/pulsar/broker/admin/AdminTest.java | 238 +++- .../broker/auth/MockedPulsarServiceBaseTest.java | 1 + .../OwnerShipForCurrentServerTestBase.java | 2 +- .../pulsar/broker/service/BrokerServiceTest.java | 8 +- .../broker/transaction/TransactionTestBase.java| 2 +- .../apache/pulsar/broker/web/WebServiceTest.java | 23 +- .../metadata/api/MetadataStoreException.java | 4 + .../metadata/cache/impl/MetadataCacheImpl.java | 10 +- .../metadata/impl/AbstractMetadataStore.java | 7 + .../MockedZooKeeperClientFactoryImpl.java | 2 +- 21 files changed, 958 insertions(+), 358 deletions(-) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index a017d28..3e03297 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -69,6 +69,7 @@ import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.ZookeeperSessionExpiredHandlers; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.admin.impl.PulsarResources; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -220,6 +221,8 @@ public class PulsarService implements AutoCloseable { private MetadataStoreExtended localMetadataStore; private CoordinationService coordinationService; +private MetadataStoreExtended configurationMetadataStore; +private PulsarResources pulsarResources; public enum State { Init, Started, Closed @@ -280,6 +283,14 @@ public class PulsarService implements AutoCloseable { new DefaultThreadFactory("zk-cache-callback")); } +public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { +return MetadataStoreExtended.create(config.getConfigurationStoreServers(), +MetadataStoreConfig.builder() +.sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis()) +.allowReadOnlyOperations(false) +.build()); +} + /** * Close the current pulsar service. All resources are released. 
*/ @@ -396,6 +407,9 @@ public class PulsarService implements AutoCloseable { if (localMetadataStore != null) { localMetadataStore.close(); } +if (configurationMetadataStore != null) { +configurationMetadataStore.close(); +} state = State.Closed; isClosedCondition.signalAll(); @@ -467,9 +481,11 @@ public class PulsarService implements AutoCloseable { } localMetadataStore = createLocalMetadataStore(); - coordinationService = new CoordinationServiceImpl(localMetadataStore); +configurationMetadataStore = createConfigurationMetadataStore(); +pulsarResources = new PulsarResources(configurationMetadataStore); + orderedExecutor = OrderedExecutor.newBuilder() .numThreads(config.getNumOrderedExecu
[pulsar] branch master updated (e58a906 -> ab9c77a)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from e58a906 Add azure offloader to website (#9018) add ab9c77a [pulsar-broker] validate namespace isolation policy regex before updating (#8804) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/admin/AdminApiTest.java | 29 ++ .../policies/data/NamespaceIsolationData.java | 21 +++- 2 files changed, 49 insertions(+), 1 deletion(-)
[pulsar] branch master updated: [pulsar-client] Handle NPE while receiving ack for closed producer (#8979)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 1d41036 [pulsar-client] Handle NPE while receiving ack for closed producer (#8979) 1d41036 is described below commit 1d410365860c342a09c2aacad9b2f78e737a5dea Author: Rajan Dhabalia AuthorDate: Wed Dec 16 14:35:18 2020 -0800 [pulsar-client] Handle NPE while receiving ack for closed producer (#8979) --- .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 212292e..3c66065 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -376,7 +376,15 @@ public class ClientCnx extends PulsarHandler { ledgerId, entryId); } -producers.get(producerId).ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId); +ProducerImpl producer = producers.get(producerId); +if (producer != null) { +producer.ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId); +} else { +if (log.isDebugEnabled()) { +log.debug("Producer is {} already closed, ignore published message [{}-{}]", producerId, ledgerId, +entryId); +} +} } @Override
[pulsar] branch master updated: [pulsar-broker] capture stats with precise backlog (#8928)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a7f692f [pulsar-broker] capture stats with precise backlog (#8928) a7f692f is described below commit a7f692f981ddf2f86a69809867797f7269196884 Author: Rajan Dhabalia AuthorDate: Mon Dec 14 14:25:11 2020 -0800 [pulsar-broker] capture stats with precise backlog (#8928) Co-authored-by: Sijie Guo --- .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index f957532..07af718 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1537,7 +1537,7 @@ public class PersistentTopic extends AbstractTopic // Populate subscription specific stats here topicStatsStream.writePair("msgBacklog", -subscription.getNumberOfEntriesInBacklog(false)); +subscription.getNumberOfEntriesInBacklog(true)); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
[pulsar] branch master updated (a145858 -> 7a8c6ba)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from a145858 [docs] Re-org topics in Admin API section (#8375) add 7a8c6ba [pulsar-broker] Fix: bookie-isolation placement-policy was not configuring rackaware policy (#8461) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/BookKeeperClientFactoryImpl.java | 4 .../apache/pulsar/broker/ManagedLedgerClientFactory.java | 1 - .../pulsar/broker/service/BrokerBookieIsolationTest.java | 14 -- 3 files changed, 16 insertions(+), 3 deletions(-)
[pulsar] branch master updated (505dcd5 -> aa16497)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 505dcd5 Clear the recently joined consumers when there are only one consumer under the Key_Shared subscription. (#8427) add aa16497 [pulsar-broker] get list of bundles under a namespace (#8450) No new revisions were added by this update. Summary of changes: .../apache/pulsar/broker/admin/v2/Namespaces.java | 1 - .../apache/pulsar/broker/admin/AdminApiTest2.java | 11 +++ .../org/apache/pulsar/client/admin/Namespaces.java | 29 ++ .../client/admin/internal/NamespacesImpl.java | 35 ++ .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 5 files changed, 88 insertions(+), 1 deletion(-)
[pulsar] branch master updated (a9e2f7e -> 9d74007)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from a9e2f7e [pulsar-broker] Keep max-concurrent http web-request configurable (#7250) add 9d74007 fix: intermittent test failure due to invalid cache (#8405) No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java | 1 + 1 file changed, 1 insertion(+)
[pulsar] branch master updated (da61b56 -> a9e2f7e)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from da61b56 [pulsar-broker] add broker config to enforce producer to publish encrypted message (#8055) add a9e2f7e [pulsar-broker] Keep max-concurrent http web-request configurable (#7250) No new revisions were added by this update. Summary of changes: conf/broker.conf | 3 +++ conf/standalone.conf | 3 +++ .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 3 +++ .../src/main/java/org/apache/pulsar/broker/web/WebService.java | 5 +++-- 4 files changed, 12 insertions(+), 2 deletions(-)
[pulsar] branch master updated (f96bc63 -> da61b56)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from f96bc63 Fix deadlock that occurred during topic ownership check (#8406) add da61b56 [pulsar-broker] add broker config to enforce producer to publish encrypted message (#8055) No new revisions were added by this update. Summary of changes: conf/broker.conf | 3 ++ conf/standalone.conf | 3 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 5 .../apache/pulsar/broker/service/ServerCnx.java| 4 ++- .../pulsar/broker/service/ServerCnxTest.java | 35 ++ 5 files changed, 49 insertions(+), 1 deletion(-)
[pulsar] branch master updated (06c9f57 -> d16b7f5)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 06c9f57 update helm deploy (#8404) add d16b7f5 [pulsar-cli] Fix properties name (#7249) No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[pulsar] branch master updated: [pulsar-broker] Fix: Tenant-admin should able to lookup topic (#8353)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 4c7f83b [pulsar-broker] Fix: Tenant-admin should able to lookup topic (#8353) 4c7f83b is described below commit 4c7f83b93b454b4f74dbc418a519ceb6dc58296b Author: Rajan Dhabalia AuthorDate: Thu Oct 29 00:30:56 2020 -0700 [pulsar-broker] Fix: Tenant-admin should able to lookup topic (#8353) Co-authored-by: Sijie Guo --- .../java/org/apache/pulsar/broker/lookup/TopicLookupBase.java | 10 +- .../pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java | 3 ++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java index cd140d5..19ddce75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java @@ -65,7 +65,7 @@ public class TopicLookupBase extends PulsarWebResource { try { validateClusterOwnership(topicName.getCluster()); -checkConnect(topicName); +validateAdminAndClientPermission(topicName); validateGlobalNamespaceOwnership(topicName.getNamespaceObject()); } catch (WebApplicationException we) { // Validation checks failed @@ -127,6 +127,14 @@ public class TopicLookupBase extends PulsarWebResource { }); } +private void validateAdminAndClientPermission(TopicName topic) throws RestException, Exception { +try { +validateAdminAccessForTenant(topic.getTenant()); +} catch (Exception e) { +checkConnect(topic); +} +} + protected String internalGetNamespaceBundle(TopicName topicName) { validateSuperUserAccess(); try { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java index 26cb3c1..47a8921 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java @@ -135,7 +135,6 @@ public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest { try (PulsarAdmin admin = buildAdminClient("admin")) { Policies policies = new Policies(); policies.bundles = new BundlesData(4); -policies.auth_policies.namespace_auth.put("admin", ImmutableSet.of(AuthAction.produce, AuthAction.consume)); policies.replication_clusters = ImmutableSet.of("test"); admin.namespaces().createNamespace("tenant/ns", policies); try { @@ -144,6 +143,8 @@ public class BrokerAdminClientTlsAuthTest extends MockedPulsarServiceBaseTest { ex.printStackTrace(); fail("Should not have thrown an exception"); } +String topicName = String.format("persistent://%s/t1", "tenant/ns"); +admin.lookups().lookupTopic(topicName); } }
[pulsar] branch master updated (647d3c2 -> 7f9b7cf)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from 647d3c2 PLSR-1240 upgrade GRPC to 1.31 to avoid deadlock (#8351) add 7f9b7cf [pulsar-broker] configure namespace anti-affinity in local policies (#8349) No new revisions were added by this update. Summary of changes: .../pulsar/broker/admin/impl/NamespacesBase.java | 75 +- .../broker/loadbalance/impl/LoadManagerShared.java | 18 +++--- .../loadbalance/impl/ModularLoadManagerImpl.java | 12 ++-- .../pulsar/common/policies/data/LocalPolicies.java | 6 +- .../pulsar/common/policies/data/Policies.java | 6 +- 5 files changed, 67 insertions(+), 50 deletions(-)
[pulsar] branch master updated (a081bae -> 54bdf2e)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from a081bae [website] Update the Event page (#8116) add 54bdf2e [pulsar-broker-admin] Fix: split bundle overwrites local-policies (#8313) No new revisions were added by this update. Summary of changes: .../pulsar/broker/namespace/NamespaceService.java | 4 ++-- .../namespace/NamespaceCreateBundlesTest.java | 25 ++ 2 files changed, 27 insertions(+), 2 deletions(-)
[pulsar] branch master updated: [pulsar-broker-admin] Support replication dispatch-rate limiting for v1-namespace api (#8314)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new faebd13 [pulsar-broker-admin] Support replication dispatch-rate limiting for v1-namespace api (#8314) faebd13 is described below commit faebd13639a8e41e36d48bcd0756de2c6ab40411 Author: Rajan Dhabalia AuthorDate: Wed Oct 21 10:32:03 2020 -0700 [pulsar-broker-admin] Support replication dispatch-rate limiting for v1-namespace api (#8314) --- .../apache/pulsar/broker/admin/v1/Namespaces.java | 24 ++ 1 file changed, 24 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 3fc076b..035c6f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin.v1; import com.google.common.collect.Lists; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import org.apache.pulsar.broker.admin.impl.NamespacesBase; @@ -637,6 +638,29 @@ public class Namespaces extends NamespacesBase { return internalGetSubscriptionDispatchRate(); } +@POST +@Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate") +@ApiOperation(value = "Set replicator dispatch-rate throttling for all topics of the namespace") +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission") }) +public void setReplicatorDispatchRate(@PathParam("tenant") String tenant, +@PathParam("cluster") String cluster, @PathParam("namespace") String namespace, +@ApiParam(value = "Replicator dispatch rate 
for all topics of the specified namespace") DispatchRate dispatchRate) { +validateNamespaceName(tenant, cluster, namespace); +internalSetReplicatorDispatchRate(dispatchRate); +} + +@GET +@Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate") +@ApiOperation(value = "Get replicator dispatch-rate configured for the namespace, -1 represents not configured yet") +@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), +@ApiResponse(code = 404, message = "Namespace does not exist") }) +public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String tenant, +@PathParam("cluster") String cluster, +@PathParam("namespace") String namespace) { +validateNamespaceName(tenant, cluster, namespace); +return internalGetReplicatorDispatchRate(); +} + @GET @Path("/{property}/{cluster}/{namespace}/backlogQuotaMap") @ApiOperation(hidden = true, value = "Get backlog quota map on a namespace.")
[pulsar] branch master updated: Always use SNI for TLS enabled Pulsar Java client. (#8117)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new f2933f7 Always use SNI for TLS enabled Pulsar Java client. (#8117) f2933f7 is described below commit f2933f7da4850814d92fac0e54c5314c51c8fc32 Author: Rolf Arne Corneliussen AuthorDate: Thu Sep 24 22:37:51 2020 +0200 Always use SNI for TLS enabled Pulsar Java client. (#8117) Co-authored-by: Rolf Arne Corneliussen --- .../org/apache/pulsar/client/api/TlsSniTest.java | 66 ++ .../apache/pulsar/client/impl/ConnectionPool.java | 56 +++--- .../client/impl/PulsarChannelInitializer.java | 58 +-- .../util/keystoretls/KeyStoreSSLContext.java | 10 +++- 4 files changed, 124 insertions(+), 66 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java new file mode 100644 index 000..fc8c242 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api; + +import java.net.InetAddress; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.impl.auth.AuthenticationTls; +import org.testng.annotations.Test; + +import lombok.Cleanup; + +public class TlsSniTest extends TlsProducerConsumerBase { + +/** + * Verify that using an IP-address in the broker service URL will work with using the SNI capabilities + * of the client. If we try to create an {@link javax.net.ssl.SSLEngine} with a peer host that is an + * IP address, the peer host is ignored, see for example + * {@link io.netty.handler.ssl.ReferenceCountedOpenSslEngine}. + * + */ +@Test +public void testIpAddressInBrokerServiceUrl() throws Exception { +String topicName = "persistent://my-property/use/my-ns/my-topic1"; + +URI brokerServiceUrlTls = new URI(pulsar.getBrokerServiceUrlTls()); + +String brokerServiceIpAddressUrl = String.format("pulsar+ssl://%s:%d", + InetAddress.getByName(brokerServiceUrlTls.getHost()).getHostAddress(), +brokerServiceUrlTls.getPort()); + +ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl) + .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false) +.enableTlsHostnameVerification(false) +.operationTimeout(1000, TimeUnit.MILLISECONDS); +Map authParams = new HashMap<>(); +authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); +authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); +clientBuilder.authentication(AuthenticationTls.class.getName(), authParams); + +@Cleanup +PulsarClient pulsarClient = clientBuilder.build(); +// should be able to create producer successfully +pulsarClient.newProducer().topic(topicName).create(); +} +} + diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 32d7195..6203990 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -90,7 +90,7 @@ public class ConnectionPool implements Closeable { bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { -channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy); +channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier); bootstrap.handler(channelInitializerHandler); } catch (Exception
[pulsar] branch master updated (fefaf52 -> a0877dd)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from fefaf52 Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098) add a0877dd [pulsar-broker] add configuration to set number of channels per bookie (#7910) No new revisions were added by this update. Summary of changes: conf/broker.conf| 3 +++ conf/standalone.conf| 3 +++ .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 ++ .../java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java | 2 +- site2/docs/reference-configuration.md | 1 + 5 files changed, 14 insertions(+), 1 deletion(-)
[pulsar] branch master updated: [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a7e30d8 [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062) a7e30d8 is described below commit a7e30d8150f626e75d8a17980e6e1019de659a5d Author: Rajan Dhabalia AuthorDate: Mon Sep 21 15:39:09 2020 -0700 [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062) make sni-proxy connection creation thread-safe remove unused pair initialize channel explicitly when sni-proxy is configured initialize channel in io thread fix channel var --- .../pulsar/client/api/ProxyProtocolTest.java | 3 +- .../apache/pulsar/client/impl/ConnectionPool.java | 105 +++-- .../client/impl/PulsarChannelInitializer.java | 41 3 files changed, 79 insertions(+), 70 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java index 964ebd9..d264b83 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java @@ -40,7 +40,7 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase { // Client should try to connect to proxy and pass broker-url as SNI header String proxyUrl = pulsar.getBrokerServiceUrlTls(); -String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651"; +String brokerServiceUrl = "pulsar+ssl://unresolvable-address:6651"; String topicName = "persistent://my-property/use/my-ns/my-topic1"; ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl) @@ -53,7 +53,6 @@ public class ProxyProtocolTest extends TlsProducerConsumerBase { @Cleanup PulsarClient pulsarClient = clientBuilder.build(); - // should be able to 
create producer successfully pulsarClient.newProducer().topic(topicName).create(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index 316145e..32d7195 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -19,7 +19,6 @@ package org.apache.pulsar.client.impl; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -37,7 +36,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -66,6 +64,7 @@ public class ConnectionPool implements Closeable { private final ClientConfigurationData clientConfig; private final EventLoopGroup eventLoopGroup; private final int maxConnectionsPerHosts; +private final boolean isSniProxy; protected final DnsNameResolver dnsResolver; @@ -78,6 +77,8 @@ public class ConnectionPool implements Closeable { this.eventLoopGroup = eventLoopGroup; this.clientConfig = conf; this.maxConnectionsPerHosts = conf.getConnectionsPerBroker(); +this.isSniProxy = clientConfig.isUseTls() && clientConfig.getProxyProtocol() != null +&& StringUtils.isNotBlank(clientConfig.getProxyServiceUrl()); pool = new ConcurrentHashMap<>(); bootstrap = new Bootstrap(); @@ -89,7 +90,7 @@ public class ConnectionPool implements Closeable { bootstrap.option(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); try { -channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier); +channelInitializerHandler = new PulsarChannelInitializer(conf, clientCnxSupplier, isSniProxy); bootstrap.handler(channelInitializerHandler); 
} catch (Exception e) { log.error("Failed to create channel initializer"); @@ -224,18 +225,24 @@ public class ConnectionPool implements Closeable { * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server */ private CompletableFuture createConnection(InetSocketAddress unresolvedAddress) { -String hostname = unresolvedAddress.getHostString(); -int port = unresolvedAddress.getPort(); +int port; +CompletableFuture> resolvedAddress = nu
[pulsar] branch master updated: [pulsar-client] support input-stream for trustStore cert (#7442)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new d3c0ccb [pulsar-client] support input-stream for trustStore cert (#7442) d3c0ccb is described below commit d3c0ccb17f282e8063173a50ee825efa280d4f92 Author: Rajan Dhabalia AuthorDate: Sat Sep 19 01:13:38 2020 -0700 [pulsar-client] support input-stream for trustStore cert (#7442) * [pulsar-client] support input-stream for trustStore cert remove file closing fix check-style * fix flaky test --- .../pulsar/client/api/TlsProducerConsumerTest.java | 23 +++--- .../admin/internal/http/AsyncHttpConnector.java| 14 + .../client/api/AuthenticationDataProvider.java | 10 +++ .../org/apache/pulsar/client/impl/HttpClient.java | 13 .../client/impl/PulsarChannelInitializer.java | 27 ++--- .../client/impl/auth/AuthenticationDataTls.java| 15 -- .../pulsar/client/impl/auth/AuthenticationTls.java | 13 ++-- .../util/NettyClientSslContextRefresher.java | 12 ++-- .../apache/pulsar/common/util/SecurityUtility.java | 35 +- 9 files changed, 122 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java index 9f1eac8..614e75e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java @@ -142,16 +142,18 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase { log.info("-- Starting {} test --", methodName); String topicName = "persistent://my-property/use/my-ns/my-topic1"; ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()) - 
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) +.enableTls(true).allowTlsInsecureConnection(false) .operationTimeout(1000, TimeUnit.MILLISECONDS); AtomicInteger index = new AtomicInteger(0); ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH); ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH); +ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH); Supplier certProvider = () -> getStream(index, certStream); Supplier keyProvider = () -> getStream(index, keyStream); -AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider); +Supplier trustStoreProvider = () -> getStream(index, trustStoreStream); +AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider); clientBuilder.authentication(auth); @Cleanup PulsarClient pulsarClient = clientBuilder.build(); @@ -196,16 +198,20 @@ public class TlsProducerConsumerTest extends TlsProducerConsumerBase { public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws Exception { log.info("-- Starting {} test --", methodName); ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls()) - .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false) +.enableTls(true).allowTlsInsecureConnection(false) .operationTimeout(1000, TimeUnit.MILLISECONDS); AtomicInteger certIndex = new AtomicInteger(1); AtomicInteger keyIndex = new AtomicInteger(0); +AtomicInteger trustStoreIndex = new AtomicInteger(1); ByteArrayInputStream certStream = createByteInputStream(TLS_CLIENT_CERT_FILE_PATH); ByteArrayInputStream keyStream = createByteInputStream(TLS_CLIENT_KEY_FILE_PATH); +ByteArrayInputStream trustStoreStream = createByteInputStream(TLS_TRUST_CERT_FILE_PATH); Supplier certProvider = () -> getStream(certIndex, certStream, keyStream/* invalid cert file */); Supplier 
keyProvider = () -> getStream(keyIndex, keyStream); -AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider); +Supplier trustStoreProvider = () -> getStream(trustStoreIndex, trustStoreStream, +keyStream/* invalid cert file */); +AuthenticationTls auth = new AuthenticationTls(certProvider, keyProvider, trustStoreProvider); clientBuilder.authentication(auth); @Cleanup PulsarClient pulsarClient = clientBu
[pulsar] branch master updated: [pulsar-test] shutdown pulsar-client to cleanup forcefully (#7501)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9f9db24e [pulsar-test] shutdown pulsar-client to cleanup forcefully (#7501) 9f9db24e is described below commit 9f9db24e37398a0b66c8af324b34674afeb63066 Author: Rajan Dhabalia AuthorDate: Fri Jul 10 10:29:46 2020 -0700 [pulsar-test] shutdown pulsar-client to cleanup forcefully (#7501) --- .../java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index f808b96..394cd9c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -192,7 +192,7 @@ public abstract class MockedPulsarServiceBaseTest { admin = null; } if (pulsarClient != null) { -pulsarClient.close(); +pulsarClient.shutdown(); pulsarClient = null; } if (pulsar != null) {
[pulsar] branch master updated: PIP 37: [pulsar-client] support large message size (#4400)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new 870a637 PIP 37: [pulsar-client] support large message size (#4400) 870a637 is described below commit 870a637b4906862a611e418341dd926e21458f08 Author: Rajan Dhabalia AuthorDate: Tue Jun 2 00:39:55 2020 -0700 PIP 37: [pulsar-client] support large message size (#4400) * PIP 37: [pulsar-client] support large message size fix producer fix ref counts add timeouts add validation fix recycling fix stats review fix test fix test fix send message and expiry-consumer-config fix schema test fix chunk properties * fix test --- .../broker/admin/impl/PersistentTopicsBase.java| 4 + .../broker/service/AbstractBaseDispatcher.java | 3 + .../org/apache/pulsar/broker/service/Consumer.java | 7 +- .../org/apache/pulsar/broker/service/Producer.java | 43 ++- .../pulsar/broker/service/SendMessageInfo.java | 2 + .../apache/pulsar/broker/service/ServerCnx.java| 5 +- .../NonPersistentDispatcherMultipleConsumers.java | 4 +- ...onPersistentDispatcherSingleActiveConsumer.java | 4 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 4 +- .../PersistentDispatcherMultipleConsumers.java | 3 +- .../PersistentDispatcherSingleActiveConsumer.java | 5 +- ...istentStickyKeyDispatcherMultipleConsumers.java | 7 +- .../service/persistent/PersistentSubscription.java | 1 + .../broker/service/persistent/PersistentTopic.java | 12 + ...ntStickyKeyDispatcherMultipleConsumersTest.java | 2 + .../pulsar/client/impl/MessageChunkingTest.java| 379 + .../apache/pulsar/client/api/ConsumerBuilder.java | 45 +++ .../apache/pulsar/client/api/ProducerBuilder.java | 27 ++ .../org/apache/pulsar/client/cli/CmdConsume.java | 21 +- .../org/apache/pulsar/client/cli/CmdProduce.java | 13 +- .../apache/pulsar/client/impl/ConsumerBase.java| 3 + .../pulsar/client/impl/ConsumerBuilderImpl.java| 
18 + .../apache/pulsar/client/impl/ConsumerImpl.java| 254 -- .../client/impl/MultiTopicsConsumerImpl.java | 5 +- .../pulsar/client/impl/NegativeAcksTracker.java| 2 + .../PersistentAcknowledgmentsGroupingTracker.java | 73 +++- .../pulsar/client/impl/ProducerBuilderImpl.java| 9 + .../apache/pulsar/client/impl/ProducerImpl.java| 237 - .../pulsar/client/impl/UnAckedMessageTracker.java | 14 + .../impl/conf/ConsumerConfigurationData.java | 7 + .../impl/conf/ProducerConfigurationData.java | 1 + .../impl/AcknowledgementsGroupingTrackerTest.java | 2 + .../apache/pulsar/common/api/proto/PulsarApi.java | 322 + .../pulsar/common/policies/data/ConsumerStats.java | 3 + .../policies/data/NonPersistentPublisherStats.java | 2 +- .../common/policies/data/PublisherStats.java | 3 + .../common/policies/data/SubscriptionStats.java| 3 + .../pulsar/common/policies/data/TopicStats.java| 3 + .../apache/pulsar/common/protocol/Commands.java| 6 + pulsar-common/src/main/proto/PulsarApi.proto | 5 + .../pulsar/testclient/PerformanceConsumer.java | 21 +- .../pulsar/testclient/PerformanceProducer.java | 13 +- 42 files changed, 1450 insertions(+), 147 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a8ab7fd..99f49a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1931,6 +1931,10 @@ public class PersistentTopicsBase extends AdminResource { if (metadata.hasNullValue()) { responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue()); } +if (metadata.getNumChunksFromMsg() > 0) { +responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg())); +responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId())); 
+} // Decode if needed CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 849d626..7985f30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +
[pulsar.wiki] branch master updated: ats example
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new 25335e9 ats example 25335e9 is described below commit 25335e9948eb67346c3e6f1bb66770d8a6f36325 Author: Rajan Dhabalia AuthorDate: Tue Mar 31 19:56:50 2020 -0700 ats example --- PIP-60:-Support-Proxy-server-with-SNI-routing.md | 75 +++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/PIP-60:-Support-Proxy-server-with-SNI-routing.md b/PIP-60:-Support-Proxy-server-with-SNI-routing.md index d266f10..d1f7f94 100644 --- a/PIP-60:-Support-Proxy-server-with-SNI-routing.md +++ b/PIP-60:-Support-Proxy-server-with-SNI-routing.md @@ -59,4 +59,77 @@ We can also use proxy-server in geo-replication to create a proxy between two br --broker-url-secure pulsar+ssl://my-dmz-broker.com:6651 \ --proxy-url pulsar+ssl://my-dmz-proxy.com:4443 \ --proxy-protocol SNI -``` \ No newline at end of file +``` + +## Example +This section shows SNI-routing using ATS-proxy server. This section shows how to configure ATS proxy-server and pulsar-client to create TCP tunnel between client and broker. + +![image](https://user-images.githubusercontent.com/2898254/78093926-21eabd80-7388-11ea-8982-4a4d644a2b39.png) +[Figure 3: Pulsar SNI Routing with ATS proxy server] + +In this example, Pulsar broker cluster is behind the ATS proxy. Pulsar brokers can’t be accessed by any host except ATS proxy. So, if client wants to connect to pulsar-cluster then client can use ATS-proxy server to create a TCP tunnel with brokers. Pulsar client can use SNI-routing proxy protocol to connect to ATS-proxy and asks ATS-proxy to create TCP tunnel with a target broker. 
+ +This example shows how we can configure ATS-proxy so that, when the client passes the target broker name in the SNI header, the ATS-proxy server can find the appropriate broker-url based on the configured sni-mapping and forward the request to the appropriate target broker by creating a TCP tunnel with that broker. + +### ATS Configuration +In order to enable SNI routing in ATS proxy, we need to manage 2 configuration files: +1. [ssl_server_name.config](https://docs.trafficserver.apache.org/en/8.0.x/admin-guide/files/ssl_server_name.yaml.en.html) + +This file is used to configure aspects of TLS connection handling for both inbound and outbound connections. The configuration is driven by the SNI values provided by the inbound connection. So, this file manages the mapping between the hostname that arrives in the SNI header and the actual broker-url to which the request needs to be forwarded for that host. +ssl_server_name.config + +``` +server_config = { + { + fqdn = 'pulsar-broker-vip', + # Forward to Pulsar broker which is listening on 6651 + tunnel_route = 'pulsar-broker-vip:6651' + }, + { + fqdn = 'pulsar-broker1', + # Forward to Pulsar broker-1 which is listening on 6651 + tunnel_route = 'pulsar-broker1:6651' + }, + { + fqdn = 'pulsar-broker2', + # Forward to Pulsar broker-2 which is listening on 6651 + tunnel_route = 'pulsar-broker2:6651' + }, +} +``` + +2. [records.config](https://docs.trafficserver.apache.org/en/8.0.x/admin-guide/files/records.config.en.html) + +The records.config file (located in /usr/local/etc/trafficserver/) is a list of configurable variables used by the Traffic Server software. One of the requirements of SNI routing in ATS is that it only works over TLS. Therefore, Pulsar brokers and ATS proxy-server should have tls enabled. We will define the tls configuration for the ATS-proxy server in the records.config file. 
+ ``` +CONFIG proxy.config.http.connect_ports STRING 4443 6651 +# ats-proxy cert file +CONFIG proxy.config.ssl.client.cert.path STRING /ats-cert.pem +# ats-proxy key file +CONFIG proxy.config.ssl.client.cert.filename STRING /ats-key.pem +# ssl-port on which ats will listen +CONFIG proxy.config.http.server_ports STRING 4443:ssl 4080 +``` +Once `ssl_server_name.config` and `records.config` are configured, ATS-proxy server is ready to handle SNI routing and can create a TCP tunnel between client and broker. + +### Pulsar-client Configuration +Now, ATS proxy server is configured and ready to handle SNI routing and create a TCP tunnel between client and broker. With this PIP, pulsar-client supports SNI routing by connecting to the proxy and sending the target broker url in the SNI header. Pulsar-client handles SNI routing internally and the entire connection handling is abstracted from the user. The user only has to configure the following proxy settings initially, when creating a pulsar-client, to use the SNI routing protocol. +``` +String brokerServiceUrl = “pulsar+ssl://pulsar-broker-vip:6651/”; +String proxyUrl = “pulsar+ssl://ats-proxy:443”; +ClientBuilder clientBuilder = PulsarClient.builder
[pulsar.wiki] branch master updated: add pulsar-admin cli example
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new 7cc4cb8 add pulsar-admin cli example 7cc4cb8 is described below commit 7cc4cb81c35d4b9e2a7aa7ad0b4a5233bca911a5 Author: Rajan Dhabalia AuthorDate: Wed Mar 25 12:32:44 2020 -0700 add pulsar-admin cli example --- PIP-60:-Support-Proxy-server-with-SNI-routing.md | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/PIP-60:-Support-Proxy-server-with-SNI-routing.md b/PIP-60:-Support-Proxy-server-with-SNI-routing.md index fc27f0a..d266f10 100644 --- a/PIP-60:-Support-Proxy-server-with-SNI-routing.md +++ b/PIP-60:-Support-Proxy-server-with-SNI-routing.md @@ -2,6 +2,7 @@ * **Status**: Discussion * **Author**: [Rajan Dhabalia](https://github.com/rdhabalia) + * **Pull Request**: [#6566](https://github.com/apache/pulsar/pull/6566) ## Motivation @@ -51,4 +52,11 @@ ProxyProtocol proxyProtocol; //eg: enum: ProxyProtocol.SNI ``` Geo-replication -We can also use proxy-server in geo-replication to create a proxy between two brokers. Therefore, we need similar configurations as pulsar-client into cluster metadata as well. If cluster metadata has a proxy configured then other clusters will connect to this pulsar cluster via proxy. \ No newline at end of file +We can also use proxy-server in geo-replication to create a proxy between two brokers. Therefore, we need similar configurations as pulsar-client into cluster metadata as well. If cluster metadata has a proxy configured then other clusters will connect to this pulsar cluster via proxy. We can configure cluster-metadata with proxy-uri and protocol using admin api/cli. 
+``` +./pulsar-admin clusters create my-DMZ-cluster \ +--url http://my-dmz-broker.com:8080 \ +--broker-url-secure pulsar+ssl://my-dmz-broker.com:6651 \ +--proxy-url pulsar+ssl://my-dmz-proxy.com:4443 \ +--proxy-protocol SNI +``` \ No newline at end of file
[pulsar.wiki] branch master updated: add PIP-60
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new 38b6539 add PIP-60 38b6539 is described below commit 38b6539edf2d432dde5017b5d13e9287e863a9ce Author: Rajan Dhabalia AuthorDate: Thu Mar 19 16:48:31 2020 -0700 add PIP-60 --- Home.md | 1 + 1 file changed, 1 insertion(+) diff --git a/Home.md b/Home.md index b34b672..672e0ab 100644 --- a/Home.md +++ b/Home.md @@ -7,6 +7,7 @@ We encourage to document any big change or feature or any addition to public use ### Proposed +* [[PIP 60: Support Proxy server with SNI routing]] * [[PIP 59: gPRC Protocol Handler]] * [[PIP 58 : Support Consumers Set Custom Retry Delay]] * [[PIP 57: Improve Broker's Zookeeper Session Timeout Handling]]
[pulsar.wiki] branch master updated: initial draft
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git The following commit(s) were added to refs/heads/master by this push: new 721dd0c initial draft 721dd0c is described below commit 721dd0c76327f9a6d28829c56b08c9f2cd6d5086 Author: Rajan Dhabalia AuthorDate: Thu Mar 19 16:47:35 2020 -0700 initial draft --- PIP-60:-Support-Proxy-server-with-SNI-routing.md | 54 1 file changed, 54 insertions(+) diff --git a/PIP-60:-Support-Proxy-server-with-SNI-routing.md b/PIP-60:-Support-Proxy-server-with-SNI-routing.md new file mode 100644 index 000..fc27f0a --- /dev/null +++ b/PIP-60:-Support-Proxy-server-with-SNI-routing.md @@ -0,0 +1,54 @@ +# PIP-60 Support Proxy-server with SNI routing + + * **Status**: Discussion + * **Author**: [Rajan Dhabalia](https://github.com/rdhabalia) + +## Motivation + +A proxy server is a go‑between or intermediary server that forwards requests from multiple clients to different servers across the Internet. The proxy server can act as a “traffic cop,” in both forward and reverse proxy scenarios, and adds various benefits in your system such as load balancing, performance, security, auto-scaling, etc. There are many proxy servers already available in the market which are fast, scalable and, more importantly, cover various essential security asp [...] + +## SNI Routing + +[TLS Extension Definition](https://tools.ietf.org/html/rfc6066) adds support of Server Name Indication [SNI](https://tools.ietf.org/html/rfc6066#page-6) and it has been available since January 2011. In SNI routing, the proxy server examines TLS connection on an initial handshake and extracts the "SNI" value. +Proxy-server creates a connection with a destination host defined into SNI and forms a TLS tunnel between client and destination host. 
+Figure 1: shows the layer-4 routing network activity diagram between client and application server via proxy-server (eg: ATS). In the figure, the client initiates TLS connection with the proxy-server by passing hostname of application-server in the SNI header. The proxy server examines SNI header, parses hostname of the target application server and creates an outbound connection with that application server host. Once, proxy server successfully completes TLS handshake with the applicati [...] + + +![image](https://user-images.githubusercontent.com/2898254/76898266-9b47c380-6852-11ea-8969-55783cda5ed0.png) +[Figure 1: Layer 4 routing] + +## Pulsar and Proxy with SNI routing + +Pulsar client creates TCP connection with the broker and exchanges binary data over TCP channel. Pulsar also supports TLS to create a secure connection between a client and the broker. As we have discussed in figure-1, the proxy server performs SNI routing on TLS connection and this mechanism can be perfectly fit into pulsar to introduce proxy between client and broker, and use proxy-server for both forward-proxy and reverse-proxy use cases. +In order to access a specific topic, the pulsar-client first finds out the target broker which owns the topic and then creates a connection channel with that target broker to publish/consume messages on that topic. This section explains how pulsar-client uses proxy-server to find a target broker for a specific topic and create a TCP channel with that target broker to access the topic. +Pulsar client connects to the broker in two phases: lookup and connect. +In lookup phase, a client does a lookup with the broker-service URL / VIP to connect with any broker in the cluster, send lookup request for a topic and receive the response with the name of the target broker that owns the topic. In the second phase, the client uses the lookup response which has the name of the target broker and connects with that target broker to publish/consume messages. 
+ +Figure 2 shows SNI routing for both the phases and how proxy-server creates a TCP tunnel between client and broker. In SNI Routing, pulsar-client always tries to create a connection with proxy-server and pass the target broker/service URL as part of the SNI header. + + +![image](https://user-images.githubusercontent.com/2898254/76898221-879c5d00-6852-11ea-97ed-155e78f7dead.png) +[Figure 2: Pulsar-client and broker connection using SNI Routing] + +**Lookup phase** + +Figure 2 shows that pulsar-client uses broker-service url (pulsar-vip) and proxy-server url (proxy-server:4443) to perform lookup using SNI routing. Pulsar-client uses proxy-server url to create a TLS connection with proxy-server and uses broker-service url to pass it in SNI header. At the time of TLS handshake, Proxy-server does SNI-routing by extracting broker-service name value from SNI header, finds appropriate mapped broke
[pulsar] branch master updated (efe19e0 -> c802609)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git. from efe19e0 [Maven Cleanup] Remove managed-ledger and zk-utils test-jar dependencies when possible (#6513) add c802609 [pulsar-proxy] add rest-api to get connection and topic stats (#6473) No new revisions were added by this update. Summary of changes: .../impl/ManagedLedgerFactoryMBeanImpl.java| 2 +- .../mledger/impl/ManagedLedgerMBeanImpl.java | 2 +- .../org/apache/pulsar/broker/service/Consumer.java | 2 +- .../org/apache/pulsar/broker/service/Producer.java | 2 +- .../nonpersistent/NonPersistentDispatcher.java | 2 +- .../NonPersistentDispatcherMultipleConsumers.java | 2 +- ...onPersistentDispatcherSingleActiveConsumer.java | 2 +- .../nonpersistent/NonPersistentReplicator.java | 2 +- .../persistent/PersistentMessageExpiryMonitor.java | 2 +- .../service/persistent/PersistentReplicator.java | 2 +- .../java/org/apache/pulsar/common/stats}/Rate.java | 2 +- .../pulsar/proxy/server/DirectProxyHandler.java| 17 +- .../pulsar/proxy/server/ParserProxyHandler.java| 48 -- .../pulsar/proxy/server/ProxyConnection.java | 9 +- .../apache/pulsar/proxy/server/ProxyService.java | 35 .../pulsar/proxy/server/ProxyServiceStarter.java | 8 +- .../apache/pulsar/proxy/stats/ConnectionStats.java | 32 ++-- .../org/apache/pulsar/proxy/stats/ProxyStats.java | 97 +++ .../apache/pulsar/proxy/stats}/RestException.java | 25 +-- .../org/apache/pulsar/proxy/stats/TopicStats.java | 50 -- .../pulsar/proxy/server/ProxyIsAHttpProxyTest.java | 16 +- .../apache/pulsar/proxy/server/ProxyStatsTest.java | 191 + 22 files changed, 451 insertions(+), 99 deletions(-) rename {managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util => pulsar-common/src/main/java/org/apache/pulsar/common/stats}/Rate.java (98%) copy pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java => 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ConnectionStats.java (53%) create mode 100644 pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java copy {pulsar-common/src/main/java/org/apache/pulsar/common/util => pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats}/RestException.java (67%) copy pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java => pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java (54%) create mode 100644 pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java
[pulsar] branch master updated: [pulsar-broker] add support to configure max pending publish request per connection (#5742)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git The following commit(s) were added to refs/heads/master by this push: new a9d56ea [pulsar-broker] add support to configure max pending publish request per connection (#5742) a9d56ea is described below commit a9d56ea20ba938b31233edb87345e4a9f62b6041 Author: Rajan Dhabalia AuthorDate: Tue Mar 10 10:55:18 2020 -0700 [pulsar-broker] add support to configure max pending publish request per connection (#5742) --- conf/broker.conf | 4 conf/standalone.conf | 4 .../java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 ++ .../main/java/org/apache/pulsar/broker/service/ServerCnx.java | 10 ++ 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 5989880..08f370c 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -114,6 +114,10 @@ brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions # Topics that are inactive for longer than this value will be deleted brokerDeleteInactiveTopicsMaxInactiveDurationSeconds= +# Max pending publish requests per connection to avoid keeping large number of pending +# requests in memory. Default: 1000 +maxPendingPublishdRequestsPerConnection=1000; + # How frequently to proactively check and purge expired messages messageExpiryCheckIntervalInMinutes=5 diff --git a/conf/standalone.conf b/conf/standalone.conf index f351d8d..81bbd43 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -75,6 +75,10 @@ brokerDeleteInactiveTopicsEnabled=true # How often to check for inactive topics brokerDeleteInactiveTopicsFrequencySeconds=60 +# Max pending publish requests per connection to avoid keeping large number of pending +# requests in memory. 
Default: 1000 +maxPendingPublishdRequestsPerConnection=1000; + # How frequently to proactively check and purge expired messages messageExpiryCheckIntervalInMinutes=5 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 0d554f0..9d2d79d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -276,6 +276,12 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_POLICIES, +doc = "Max pending publish requests per connection to avoid keeping large number of pending " ++ "requests in memory. Default: 1000" +) +private int maxPendingPublishdRequestsPerConnection = 1000; +@FieldContext( +category = CATEGORY_POLICIES, doc = "How frequently to proactively check and purge expired messages" ) private int messageExpiryCheckIntervalInMinutes = 5; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1037952..b91bce8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -142,8 +142,8 @@ public class ServerCnx extends PulsarHandler { // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow // control done by a single producer might not be enough to prevent write spikes on the broker. 
-private static final int MaxPendingSendRequests = 1000; -private static final int ResumeReadsThreshold = MaxPendingSendRequests / 2; +private final int maxPendingSendRequests; +private final int resumeReadsThreshold; private int pendingSendRequest = 0; private final String replicatorPrefix; private String clientVersion = null; @@ -183,6 +183,8 @@ public class ServerCnx extends PulsarHandler { this.authenticateOriginalAuthData = service.pulsar().getConfiguration().isAuthenticateOriginalAuthData(); this.schemaValidationEnforced = pulsar.getConfiguration().isSchemaValidationEnforced(); this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize(); +this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection(); +this.resumeReadsThreshold = maxPendingSendRequests / 2; } @Override @@ -1759,7 +1761,7 @@ public class ServerCnx extends PulsarHandler { private void startSendOperation(Producer producer, int msgSize) { messagePublishBufferSize += msgSize; boolean isPublishRateExceeded = producer.ge