(pulsar) branch master updated: [fix] [broker] Fix Broker was failing to load stats-internal with broken schema ledger (#22845)

2024-06-05 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 326e9fa731a [fix] [broker] Fix Broker was failing to load 
stats-internal with broken schema ledger (#22845)
326e9fa731a is described below

commit 326e9fa731ae17304621ab915e36d52a9b28a7a0
Author: Rajan Dhabalia 
AuthorDate: Wed Jun 5 11:19:12 2024 -0700

[fix] [broker] Fix Broker was failing to load stats-internal with broken 
schema ledger (#22845)
---
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java   | 7 +++
 1 file changed, 7 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 18e69250c16..2165247b161 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -58,6 +58,8 @@ import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import lombok.Getter;
 import lombok.Value;
+import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
+import 
org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataServerException;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -2829,6 +2831,11 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 }).exceptionally(e -> {
 log.error("[{}] Failed to get ledger 
metadata for the schema ledger {}",
 topic, ledgerId, e);
+if ((e.getCause() instanceof 
BKNoSuchLedgerExistsOnMetadataServerException)
+|| (e.getCause() instanceof 
BKNoSuchLedgerExistsException)) {
+completableFuture.complete(null);
+return null;
+}
 completableFuture.completeExceptionally(e);
 return null;
 });



(pulsar) branch master updated: [fix][broker] Fix broken topic policy implementation compatibility with old pulsar version (#22535)

2024-04-19 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 59daac64c21 [fix][broker] Fix broken topic policy implementation 
compatibility with old pulsar version (#22535)
59daac64c21 is described below

commit 59daac64c210f539e733f883edad09d08333aa62
Author: Rajan Dhabalia 
AuthorDate: Fri Apr 19 10:30:55 2024 -0700

[fix][broker] Fix broken topic policy implementation compatibility with old 
pulsar version (#22535)
---
 .../pulsar/broker/service/AbstractTopic.java   | 52 +-
 ...kerInternalClientConfigurationOverrideTest.java | 42 -
 2 files changed, 72 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index e772486fcc6..44a4ca42cea 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -220,13 +220,16 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener
 
this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue(
 data.getBackLogQuotaMap() == null ? null : 
data.getBackLogQuotaMap().get(type.toString())));
-
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
-
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+
topicPolicies.getTopicMaxMessageSize().updateTopicValue(normalizeValue(data.getMaxMessageSize()));
+
topicPolicies.getMessageTTLInSeconds().updateTopicValue(normalizeValue(data.getMessageTTLInSeconds()));
 
topicPolicies.getPublishRate().updateTopicValue(PublishRate.normalize(data.getPublishRate()));
 
topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
 topicPolicies.getReplicatorDispatchRate().updateTopicValue(
@@ -268,15 +271,19 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters)));
 topicPolicies.getMaxUnackedMessagesOnConsumer()
-
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_consumer);
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_consumer));
 topicPolicies.getMaxUnackedMessagesOnSubscription()
-
.updateNamespaceValue(namespacePolicies.max_unacked_messages_per_subscription);
-
topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds);
-
topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic);
-
topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic);
-
topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic);
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_unacked_messages_per_subscription));
+topicPolicies.getMessageTTLInSeconds()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.message_ttl_in_seconds));
+topicPolicies.getMaxSubscriptionsPerTopic()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_subscriptions_per_topic));
+topicPolicies.getMaxProducersPerTopic()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_producers_per_topic));
+topicPolicies.getMaxConsumerPerTopic()
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_topic));
 topicPolicies.getMaxConsumersPerSubscription()
-
.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
+
.updateNamespaceValue(normalizeValue(namespacePolicies.max_consumers_per_subscription));
 
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
 
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
 
topicPolicies.getDeduplicationSnapshotIntervalSeconds().updateNamespaceValue(
@@ -312,6 +319,10 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener producer = pulsarClient.newProducer()
+.topic(topic).create();
+PersistentTopic topicRef = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic).get();
+
assertEquals(topicRef.topicPolicies.getMaxUnackedMessagesOnSubscription().get(),
+conf.getMaxUnackedMessagesPerSubscription

(pulsar) branch master updated (cea1a9ba9b5 -> b42d94121c0)

2024-04-11 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from cea1a9ba9b5 [fix][broker] Fix message drop record in producer stat 
(#22458)
 add b42d94121c0 [improve][broker] Recover susbcription creation on the 
broken schema ledger topic (#22469)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/service/ServerCnx.java|  4 +-
 .../service/schema/BookkeeperSchemaStorage.java|  2 +
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 76 ++
 3 files changed, 81 insertions(+), 1 deletion(-)



(pulsar) branch master updated: [improve][client] PIP-313 Support force unsubscribe using consumer api (#21687)

2023-12-18 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 631b13ad23d [improve][client] PIP-313 Support force unsubscribe using 
consumer api (#21687)
631b13ad23d is described below

commit 631b13ad23d7e48c6e82d38f97c23d129062cb7c
Author: Rajan Dhabalia 
AuthorDate: Mon Dec 18 21:23:18 2023 -0800

[improve][client] PIP-313 Support force unsubscribe using consumer api 
(#21687)

Co-authored-by: Jiwe Guo 
---
 .../org/apache/pulsar/broker/service/Consumer.java |  4 +--
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../apache/pulsar/broker/service/Subscription.java |  2 ++
 .../nonpersistent/NonPersistentSubscription.java   | 17 --
 .../service/persistent/PersistentSubscription.java | 20 ++--
 .../client/impl/BrokerClientIntegrationTest.java   | 37 ++
 .../org/apache/pulsar/client/api/Consumer.java | 25 +++
 .../pulsar/client/api/PulsarClientException.java   |  4 +++
 .../apache/pulsar/client/impl/ConsumerBase.java| 14 ++--
 .../apache/pulsar/client/impl/ConsumerImpl.java|  6 ++--
 .../client/impl/MultiTopicsConsumerImpl.java   |  4 +--
 .../apache/pulsar/common/protocol/Commands.java|  5 +--
 pulsar-common/src/main/proto/PulsarApi.proto   |  1 +
 13 files changed, 125 insertions(+), 16 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 5ec76d07feb..83dcd8d6c16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -421,8 +421,8 @@ public class Consumer {
 }
 }
 
-public void doUnsubscribe(final long requestId) {
-subscription.doUnsubscribe(this).thenAccept(v -> {
+public void doUnsubscribe(final long requestId, boolean force) {
+subscription.doUnsubscribe(this, force).thenAccept(v -> {
 log.info("Unsubscribed successfully from {}", subscription);
 cnx.removedConsumer(this);
 cnx.getCommandSender().sendSuccessResponse(requestId);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2baa55b80e7..9f2b98aeb40 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1958,7 +1958,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 CompletableFuture consumerFuture = 
consumers.get(unsubscribe.getConsumerId());
 
 if (consumerFuture != null && consumerFuture.isDone() && 
!consumerFuture.isCompletedExceptionally()) {
-
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId());
+
consumerFuture.getNow(null).doUnsubscribe(unsubscribe.getRequestId(), 
unsubscribe.isForce());
 } else {
 commandSender.sendErrorResponse(unsubscribe.getRequestId(), 
ServerError.MetadataError,
 "Consumer not found");
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 6805d197521..61107b7b0db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -75,6 +75,8 @@ public interface Subscription extends MessageExpirer {
 
 CompletableFuture doUnsubscribe(Consumer consumer);
 
+CompletableFuture doUnsubscribe(Consumer consumer, boolean 
forcefully);
+
 CompletableFuture clearBacklog();
 
 CompletableFuture skipMessages(int numMessagesToSkip);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 28ea9f39ac8..92aba6221da 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -429,11 +429,24 @@ public class NonPersistentSubscription extends 
AbstractSubscription implements S
  */
 @Override
 public CompletableFuture doUnsubscribe(Consumer consumer) {
+return doUnsubscribe(consumer, false);
+}
+
+/**
+ * Handle unsubscribe command from the client API Check with the 
dispatcher is this con

(pulsar-client-cpp) branch main updated: [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373)

2023-12-18 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
 new 25ea451  [PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP 
client (#373)
25ea451 is described below

commit 25ea451eb3a79d48966689ec64460ae03d5d57da
Author: Rajan Dhabalia 
AuthorDate: Mon Dec 18 19:36:12 2023 -0800

[PIP-60] [Proxy-Client] Support SNI routing for Pulsar CPP client (#373)

* [PIP-60] [Proxy-Server] Support SNI routing for Pulsar CPP client

* fix format

* fix const def

* Fix format check

-

Co-authored-by: Yunze Xu 
---
 include/pulsar/ClientConfiguration.h | 31 +++
 lib/ClientConfiguration.cc   | 16 
 lib/ClientConfigurationImpl.h|  2 ++
 lib/ClientConnection.cc  | 20 
 lib/ClientConnection.h   |  5 +
 tests/ConsumerTest.cc| 12 
 6 files changed, 82 insertions(+), 4 deletions(-)

diff --git a/include/pulsar/ClientConfiguration.h 
b/include/pulsar/ClientConfiguration.h
index 3d651e9..cc3c3ed 100644
--- a/include/pulsar/ClientConfiguration.h
+++ b/include/pulsar/ClientConfiguration.h
@@ -32,6 +32,10 @@ class PULSAR_PUBLIC ClientConfiguration {
 ~ClientConfiguration();
 ClientConfiguration(const ClientConfiguration&);
 ClientConfiguration& operator=(const ClientConfiguration&);
+enum ProxyProtocol
+{
+SNI = 0
+};
 
 /**
  * Configure a limit on the amount of memory that will be allocated by 
this client instance.
@@ -320,6 +324,33 @@ class PULSAR_PUBLIC ClientConfiguration {
  */
 ClientConfiguration& setConnectionTimeout(int timeoutMs);
 
+/**
+ * Set proxy-service url when client would like to connect to broker via 
proxy. Client must configure both
+ * proxyServiceUrl and appropriate proxyProtocol.
+ *
+ * Example: pulsar+ssl://ats-proxy.example.com:4443
+ *
+ * @param proxyServiceUrl proxy url to connect with broker
+ * @return
+ */
+ClientConfiguration& setProxyServiceUrl(const std::string& 
proxyServiceUrl);
+
+const std::string& getProxyServiceUrl() const;
+
+/**
+ * Set appropriate proxy-protocol along with proxy-service url. Currently 
Pulsar supports SNI proxy
+ * routing.
+ *
+ * SNI routing:
+ * 
https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
+ *
+ * @param proxyProtocol possible options (SNI)
+ * @return
+ */
+ClientConfiguration& setProxyProtocol(ProxyProtocol proxyProtocol);
+
+ProxyProtocol getProxyProtocol() const;
+
 /**
  * The getter associated with setConnectionTimeout().
  */
diff --git a/lib/ClientConfiguration.cc b/lib/ClientConfiguration.cc
index 63c0bf8..6e7c745 100644
--- a/lib/ClientConfiguration.cc
+++ b/lib/ClientConfiguration.cc
@@ -134,6 +134,22 @@ ClientConfiguration& 
ClientConfiguration::setConcurrentLookupRequest(int concurr
 return *this;
 }
 
+ClientConfiguration& ClientConfiguration::setProxyServiceUrl(const 
std::string& proxyServiceUrl) {
+impl_->proxyServiceUrl = proxyServiceUrl;
+return *this;
+}
+
+const std::string& ClientConfiguration::getProxyServiceUrl() const { return 
impl_->proxyServiceUrl; }
+
+ClientConfiguration& 
ClientConfiguration::setProxyProtocol(ClientConfiguration::ProxyProtocol 
proxyProtocol) {
+impl_->proxyProtocol = proxyProtocol;
+return *this;
+}
+
+ClientConfiguration::ProxyProtocol ClientConfiguration::getProxyProtocol() 
const {
+return impl_->proxyProtocol;
+}
+
 int ClientConfiguration::getConcurrentLookupRequest() const { return 
impl_->concurrentLookupRequest; }
 
 ClientConfiguration& ClientConfiguration::setMaxLookupRedirects(int 
maxLookupRedirects) {
diff --git a/lib/ClientConfigurationImpl.h b/lib/ClientConfigurationImpl.h
index 3458a05..b62b97c 100644
--- a/lib/ClientConfigurationImpl.h
+++ b/lib/ClientConfigurationImpl.h
@@ -46,6 +46,8 @@ struct ClientConfigurationImpl {
 std::string listenerName;
 int connectionTimeoutMs{10000};  // 10 seconds
 std::string description;
+std::string proxyServiceUrl;
+ClientConfiguration::ProxyProtocol proxyProtocol;
 
 std::unique_ptr takeLogger() { return 
std::move(loggerFactory); }
 };
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 97d8847..61aa7f7 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -209,7 +209,15 @@ ClientConnection::ClientConnection(const std::string& 
logicalAddress, const std:
 boost::asio::ssl::context ctx(executor_->getIOService(), 
boost::asio::ssl::context::tlsv1_client);
 #endif
 Url serv

(pulsar) branch master updated: [fix][broker] Fix failure while creating non-durable cursor with inactive managed-ledger (#21508)

2023-11-04 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 720d6d24327 [fix][broker] Fix failure while creating non-durable 
cursor with inactive managed-ledger (#21508)
720d6d24327 is described below

commit 720d6d24327a98d974bcf90eb248037572ab39e3
Author: Rajan Dhabalia 
AuthorDate: Sat Nov 4 09:40:31 2023 -0700

[fix][broker] Fix failure while creating non-durable cursor with inactive 
managed-ledger (#21508)
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  4 +++-
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  5 -
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 24 ++
 .../offload/jcloud/impl/MockManagedLedger.java |  4 ++--
 4 files changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index c7dd8ea9129..f91d9ec3f5a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -682,8 +682,10 @@ public interface ManagedLedger {
 /**
  * Check current inactive ledger (based on {@link 
ManagedLedgerConfig#getInactiveLedgerRollOverTimeMs()} and
  * roll over that ledger if inactive.
+ *
+ * @return true if ledger is considered for rolling over
  */
-void checkInactiveLedgerAndRollOver();
+boolean checkInactiveLedgerAndRollOver();
 
 /**
  * Check if managed ledger should cache backlog reads.
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e349bf50808..f3e3a9dc144 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -4455,7 +4455,7 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
 }
 
 @Override
-public void checkInactiveLedgerAndRollOver() {
+public boolean checkInactiveLedgerAndRollOver() {
 long currentTimeMs = System.currentTimeMillis();
 if (inactiveLedgerRollOverTimeMs > 0 && currentTimeMs > 
(lastAddEntryTimeMs + inactiveLedgerRollOverTimeMs)) {
 log.info("[{}] Closing inactive ledger, last-add entry {}", name, 
lastAddEntryTimeMs);
@@ -4476,10 +4476,13 @@ public class ManagedLedgerImpl implements 
ManagedLedger, CreateCallback {
 }
 
 ledgerClosed(lh);
+createLedgerAfterClosed();
 // we do not create ledger here, since topic is inactive 
for a long time.
 }, null);
+return true;
 }
 }
+return false;
 }
 
 
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 5bd6c299d99..6e04aafec0f 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -124,6 +124,7 @@ import 
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
@@ -4076,4 +4077,27 @@ public class ManagedLedgerTest extends 
MockedBookKeeperTestCase {
 });
 future.join();
 }
+
+@Test
+public void testNonDurableCursorCreateForInactiveLedger() throws Exception 
{
+String mlName = "testLedgerInfoMetaCorrectIfAddEntryTimeOut";
+BookKeeper spyBookKeeper = spy(bkc);
+ManagedLedgerFactoryImpl factory = new 
ManagedLedgerFactoryImpl(metadataStore, spyBookKeeper);
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setInactiveLedgerRollOverTime(10, TimeUnit.MILLISECONDS);
+ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, 
config);
+
+MutableBoolean isRolledOver = new MutableBoolean(false);
+retryStrategically((test) -> {
+if (isRolledOver.booleanValue()) {
+return true;
+   

(pulsar-adapters) branch master updated: Add Strom adapter back after removing from Apache Storm repo (#55)

2023-11-01 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
 new 5aeddbb  Add Strom adapter back after removing from Apache Storm repo 
(#55)
5aeddbb is described below

commit 5aeddbb21508fa70d36199f95dabebe17774384c
Author: Rajan Dhabalia 
AuthorDate: Wed Nov 1 10:24:57 2023 -0700

Add Strom adapter back after removing from Apache Storm repo (#55)

This reverts commit ab537c13061b06de2044cd43965de930d762fe8a.
---
 .asf.yaml  |   3 +-
 README.md  |   4 +-
 pom.xml|  19 +
 pulsar-storm/pom.xml   | 102 +
 .../apache/pulsar/storm/MessageToValuesMapper.java |  44 ++
 .../java/org/apache/pulsar/storm/PulsarBolt.java   | 207 +
 .../pulsar/storm/PulsarBoltConfiguration.java  |  57 +++
 .../java/org/apache/pulsar/storm/PulsarSpout.java  | 494 +
 .../pulsar/storm/PulsarSpoutConfiguration.java | 195 
 .../apache/pulsar/storm/PulsarSpoutConsumer.java   |  58 +++
 .../pulsar/storm/PulsarStormConfiguration.java |  90 
 .../java/org/apache/pulsar/storm/PulsarTuple.java  |  45 ++
 .../apache/pulsar/storm/SharedPulsarClient.java| 155 +++
 .../apache/pulsar/storm/TupleToMessageMapper.java  |  66 +++
 pulsar-storm/src/main/javadoc/overview.html|  29 ++
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 178 
 tests/pom.xml  |   1 +
 tests/pulsar-storm-test/pom.xml| 131 ++
 .../apache/pulsar/storm/MockOutputCollector.java   | 101 +
 .../pulsar/storm/MockSpoutOutputCollector.java |  80 
 .../org/apache/pulsar/storm/PulsarBoltTest.java| 236 ++
 .../org/apache/pulsar/storm/PulsarSpoutTest.java   | 349 +++
 .../java/org/apache/pulsar/storm/TestUtil.java |  35 ++
 .../apache/pulsar/storm/example/StormExample.java  | 166 +++
 24 files changed, 2841 insertions(+), 4 deletions(-)

diff --git a/.asf.yaml b/.asf.yaml
index c04e151..74589a2 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -27,6 +27,7 @@ github:
 - streaming
 - queuing
 - event-streaming
+- apache-storm
 - apache-spark
 - apache-kafka
   features:
@@ -47,4 +48,4 @@ github:
 notifications:
   commits:  commits@pulsar.apache.org
   issues:   commits@pulsar.apache.org
-  pullrequests: commits@pulsar.apache.org
+  pullrequests: commits@pulsar.apache.org
\ No newline at end of file
diff --git a/README.md b/README.md
index 33409df..ce62338 100644
--- a/README.md
+++ b/README.md
@@ -25,8 +25,6 @@ This repository is used for hosting all the adapters 
maintained and supported by
 
 [Apache Flink adapter](https://github.com/apache/flink-connector-pulsar) is 
supported and maintained by Apache Flink Community.
 
-[Apache Storm bolt and 
spout](https://github.com/apache/storm/tree/master/external/storm-pulsar) are 
supported by Apache Storm Community.
-
 ## Building
 
 In order to build this code you can simply use Maven
@@ -44,5 +42,5 @@ git checkout v2.11.0
 mvn clean install -DskipTests
 ```
 
-This is because this repository depends on test integration artifacts of the 
relative version on the main
+This is because this repository depends on test integration artifacts of the 
relative version on the main 
 Apache Pulsar codebase
diff --git a/pom.xml b/pom.xml
index 5f39f48..4f240d1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
   
 2.11.0
 2.7.2
+2.0.0
 0.8.1.1
 1.10.2
 1.2.17
@@ -139,6 +140,7 @@
   
 
   
+pulsar-storm
 pulsar-spark
 pulsar-client-kafka-compat
 pulsar-log4j2-appender
@@ -250,6 +252,22 @@
 
   
 
+  
+org.apache.storm
+storm-client
+${storm.version}
+  
+  
+org.apache.storm
+storm-server
+${storm.version}
+  
+  
+org.apache.storm
+storm-core
+${storm.version}
+  
+
   
 org.apache.kafka
 kafka_2.9.2
@@ -1068,3 +1086,4 @@
   
 
 
+
diff --git a/pulsar-storm/pom.xml b/pulsar-storm/pom.xml
new file mode 100644
index 0000000..a20649b
--- /dev/null
+++ b/pulsar-storm/pom.xml
@@ -0,0 +1,102 @@
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+org.apache.pulsar
+pulsar-adapters
+2.11.0-SNAPSHOT
+..
+  
+
+  pulsar-storm
+  Pulsar Storm adapter
+
+  
+
+
+  org.apache.pulsar
+  pulsar-common
+  test
+
+
+
+  org.apache.pulsar
+  pulsar-client
+
+
+
+  org.slf4j
+  slf4j-api
+
+
+
+  org.mockito
+

[pulsar] branch master updated: [fix][broker]Create v1/namespace with Policies (#21171)

2023-10-25 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 2b5c199053a [fix][broker]Create v1/namespace with Policies (#21171)
2b5c199053a is described below

commit 2b5c199053a5b2d7f849e6604d619bae9197a8c9
Author: vraulji567 <95091480+vraulji...@users.noreply.github.com>
AuthorDate: Wed Oct 25 22:16:00 2023 -0400

[fix][broker]Create v1/namespace with Policies (#21171)

Co-authored-by: vraulji 
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 11 
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 36 
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 15 -
 .../apache/pulsar/broker/admin/NamespacesTest.java | 64 ++
 .../client/admin/internal/NamespacesImpl.java  |  5 +-
 5 files changed, 112 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bd3690d3c74..7be1ebf3dd1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2656,4 +2656,15 @@ public abstract class NamespacesBase extends 
AdminResource {
 throw new RestException(e);
 }
 }
+
+protected Policies getDefaultPolicesIfNull(Policies policies) {
+if (policies == null) {
+policies = new Policies();
+}
+int defaultNumberOfBundles = 
config().getDefaultNumberOfNamespaceBundles();
+if (policies.bundles == null) {
+policies.bundles = getBundles(defaultNumberOfBundles);
+}
+return policies;
+}
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index b188750252a..9cf394e77f4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1722,5 +1722,41 @@ public class Namespaces extends NamespacesBase {
 internalEnableMigration(migrated);
 }
 
+@PUT
+@Path("/{property}/{cluster}/{namespace}/policy")
+@ApiOperation(value = "Creates a new namespace with the specified 
policies")
+@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+@ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist"),
+@ApiResponse(code = 409, message = "Namespace already exists"),
+@ApiResponse(code = 412, message = "Namespace name is not valid") 
})
+public void createNamespace(@Suspended AsyncResponse response,
+@PathParam("property") String property,
+@PathParam("cluster") String cluster,
+@PathParam("namespace") String namespace,
+@ApiParam(value = "Policies for the 
namespace") Policies policies) {
+validateNamespaceName(property, cluster, namespace);
+CompletableFuture ret;
+if (!namespaceName.isGlobal()) {
+// If the namespace is non global, make sure property has the 
access on the cluster. For global namespace,
+// same check is made at the time of setting replication.
+ret = validateClusterForTenantAsync(namespaceName.getTenant(), 
namespaceName.getCluster());
+} else {
+ret = CompletableFuture.completedFuture(null);
+}
+
+ret.thenApply(__ -> 
getDefaultPolicesIfNull(policies)).thenCompose(this::internalCreateNamespace)
+.thenAccept(__ -> 
response.resume(Response.noContent().build()))
+.exceptionally(ex -> {
+Throwable root = FutureUtil.unwrapCompletionException(ex);
+if (root instanceof 
MetadataStoreException.AlreadyExistsException) {
+response.resume(new RestException(Status.CONFLICT, 
"Namespace already exists"));
+} else {
+log.error("[{}] Failed to create namespace {}", 
clientAppId(), namespaceName, ex);
+resumeAsyncResponseExceptionally(response, ex);
+}
+return null;
+});
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/p

[pulsar] branch master updated: [feat] [broker] PIP-188 Fix cluster migration state store into local namespace policies (#21363)

2023-10-25 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3a9f99f03db [feat] [broker] PIP-188 Fix cluster migration state store 
into local namespace policies (#21363)
3a9f99f03db is described below

commit 3a9f99f03db175ab0622dbfc87cde9efab24276e
Author: Rajan Dhabalia 
AuthorDate: Wed Oct 25 14:26:20 2023 -0700

[feat] [broker] PIP-188 Fix cluster migration state store into local 
namespace policies (#21363)

Co-authored-by: Rajan Dhabalia 
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 50 +++---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 +-
 .../pulsar/broker/service/AbstractTopic.java   |  6 +--
 .../broker/service/ClusterMigrationTest.java   |  1 +
 .../pulsar/common/policies/data/Policies.java  |  5 ++-
 .../pulsar/common/policies/data/LocalPolicies.java |  1 +
 6 files changed, 36 insertions(+), 31 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index e9beab90b5f..1526ae18a90 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -57,6 +57,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.EntryFilters;
+import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -304,7 +305,9 @@ public abstract class AdminResource extends 
PulsarWebResource {
 // fetch bundles from LocalZK-policies
 BundlesData bundleData = 
pulsar().getNamespaceService().getNamespaceBundleFactory()
 .getBundles(namespaceName).getBundlesData();
+Optional<LocalPolicies> localPolicies = 
getLocalPolicies().getLocalPolicies(namespaceName);
 policies.bundles = bundleData != null ? bundleData : 
policies.bundles;
+policies.migrated = localPolicies.isPresent() ? 
localPolicies.get().migrated : false;
 if (policies.is_allow_auto_update_schema == null) {
 // the type changed from boolean to Boolean. return broker 
value here for keeping compatibility.
 policies.is_allow_auto_update_schema = 
pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
@@ -321,32 +324,31 @@ public abstract class AdminResource extends 
PulsarWebResource {
 }
 
 protected CompletableFuture<Policies> 
getNamespacePoliciesAsync(NamespaceName namespaceName) {
-return 
namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
-if (policies.isPresent()) {
-return pulsar()
-.getNamespaceService()
-.getNamespaceBundleFactory()
-.getBundlesAsync(namespaceName)
-.thenCompose(bundles -> {
-BundlesData bundleData = null;
-try {
-bundleData = bundles.getBundlesData();
-} catch (Exception e) {
-log.error("[{}] Failed to get namespace policies {}", 
clientAppId(), namespaceName, e);
-return FutureUtil.failedFuture(new RestException(e));
-}
-policies.get().bundles = bundleData != null ? bundleData : 
policies.get().bundles;
-if (policies.get().is_allow_auto_update_schema == null) {
-// the type changed from boolean to Boolean. return 
broker value here for keeping compatibility.
-policies.get().is_allow_auto_update_schema = 
pulsar().getConfig()
-.isAllowAutoUpdateSchemaEnabled();
+CompletableFuture<Policies> result = new CompletableFuture<>();
+namespaceResources().getPoliciesAsync(namespaceName)
+
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, 
localPolicies) -> {
+if (pl.isPresent()) {
+Policies policies = pl.get();
+if (localPolicies.isPresent()) {
+policies.bundles = localPolicies.get().bundles;
+policies.migrated = localPolicies.get().migrated;
+}
+if (policies.is_allow_a

[pulsar] branch master updated: [feat] [broker] PIP-188 Fix cluster migration state store into local metadatastore (#21359)

2023-10-25 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 04d1225fbb1 [feat] [broker] PIP-188 Fix cluster migration state store 
into local metadatastore  (#21359)
04d1225fbb1 is described below

commit 04d1225fbb1485333f44138e587aadce34ea1f0e
Author: Rajan Dhabalia 
AuthorDate: Tue Oct 24 23:59:09 2023 -0700

[feat] [broker] PIP-188 Fix cluster migration state store into local 
metadatastore  (#21359)

Co-authored-by: Rajan Dhabalia 
---
 .../pulsar/broker/resources/ClusterResources.java  |  40 ++-
 .../pulsar/broker/resources/PulsarResources.java   |   3 +-
 .../pulsar/broker/admin/impl/ClustersBase.java |  44 ++-
 .../pulsar/broker/service/AbstractTopic.java   |  31 +++--
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   2 +-
 .../apache/pulsar/broker/service/ServerCnx.java|   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../broker/service/ClusterMigrationTest.java   | 128 +
 .../org/apache/pulsar/client/admin/Clusters.java   |  43 ++-
 .../pulsar/common/policies/data/ClusterData.java   |  26 -
 .../common/policies/data/ClusterPolicies.java  |  61 ++
 .../pulsar/client/admin/internal/ClustersImpl.java |  16 ++-
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  13 ++-
 .../common/policies/data/ClusterDataImpl.java  |  31 +
 .../common/policies/data/ClusterPoliciesImpl.java  |  85 ++
 .../common/policies/data/ClusterDataImplTest.java  |   3 -
 18 files changed, 380 insertions(+), 154 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
index 843cec7b205..b0cc50edf1f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import lombok.Getter;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl;
 import org.apache.pulsar.common.policies.data.FailureDomainImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -39,10 +40,19 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
 
 @Getter
 private FailureDomainResources failureDomainResources;
-
-public ClusterResources(MetadataStore store, int operationTimeoutSec) {
-super(store, ClusterData.class, operationTimeoutSec);
-this.failureDomainResources = new FailureDomainResources(store, 
FailureDomainImpl.class, operationTimeoutSec);
+@Getter
+private ClusterPoliciesResources clusterPoliciesResources;
+
+public ClusterResources(MetadataStore localStore, MetadataStore 
configurationStore, int operationTimeoutSec) {
+super(configurationStore, ClusterData.class, operationTimeoutSec);
+this.failureDomainResources = new 
FailureDomainResources(configurationStore, FailureDomainImpl.class,
+operationTimeoutSec);
+if (localStore != null) {
+this.clusterPoliciesResources = new 
ClusterPoliciesResources(localStore, ClusterPoliciesImpl.class,
+operationTimeoutSec);
+} else {
+this.clusterPoliciesResources = null;
+}
 }
 
 public CompletableFuture> listAsync() {
@@ -216,4 +226,26 @@ public class ClusterResources extends 
BaseResources<ClusterData> {
 });
 }
 }
+
+public static class ClusterPoliciesResources extends 
BaseResources<ClusterPoliciesImpl> {
+public static final String LOCAL_POLICIES_PATH = "policies";
+
+public ClusterPoliciesResources(MetadataStore store, 
Class<ClusterPoliciesImpl> clazz,
+int operationTimeoutSec) {
+super(store, clazz, operationTimeoutSec);
+}
+
+public Optional<ClusterPoliciesImpl> getClusterPolicies(String 
clusterName) throws MetadataStoreException {
+return get(joinPath(BASE_CLUSTERS_PATH, clusterName, 
LOCAL_POLICIES_PATH));
+}
+
+public CompletableFuture<Optional<ClusterPoliciesImpl>> 
getClusterPoliciesAsync(String clusterName) {
+return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, 
LOCAL_POLICIES_PATH));
+}
+
+public CompletableFuture setPoliciesWithCreateAsync(String 
clusterName,
+Function, ClusterPoliciesImpl> 
createFunction) {
+return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, 
clusterName, 

[pulsar] branch master updated: [fix][broker] Allow broker deployment in heterogeneous hw config cluster without restricting nic speed detection (#21409)

2023-10-24 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 789d284d6a0 [fix][broker] Allow broker deployment in heterogeneous hw 
config cluster without restricting nic speed detection (#21409)
789d284d6a0 is described below

commit 789d284d6a073e61abac27da875f38df27c5217d
Author: Rajan Dhabalia 
AuthorDate: Tue Oct 24 16:14:44 2023 -0700

[fix][broker] Allow broker deployment in heterogeneous hw config cluster 
without restricting nic speed detection (#21409)
---
 .../src/main/java/org/apache/pulsar/broker/PulsarService.java| 9 -
 .../loadbalance/extensions/ExtensibleLoadManagerImplTest.java| 8 
 .../tests/integration/loadbalance/ExtensibleLoadManagerTest.java | 5 -
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e2950594047..9a202c28c2a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -83,7 +83,6 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor;
 import org.apache.pulsar.broker.intercept.BrokerInterceptors;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
-import org.apache.pulsar.broker.loadbalance.LinuxInfoUtils;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask;
 import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask;
@@ -733,14 +732,6 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 config.getDefaultRetentionTimeInMinutes() * 60));
 }
 
-if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty()
-&& config.isLoadBalancerEnabled()
-&& LinuxInfoUtils.isLinux()
-&& !LinuxInfoUtils.checkHasNicSpeeds()) {
-throw new IllegalStateException("Unable to read VM NIC speed. 
You must set "
-+ "[loadBalancerOverrideBrokerNicSpeedGbps] to 
override it when load balancer is enabled.");
-}
-
 localMetadataSynchronizer = 
StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
 ? new PulsarMetadataEventSynchronizer(this, 
config.getMetadataSyncEventTopic())
 : null;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 20ba9500cb1..c7cd55e183b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -256,6 +256,14 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
 TopicName topicName = TopicName.get(defaultTestNamespace + 
"/test-check-ownership");
 NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
 // 1. The bundle is never assigned.
+retryStrategically((test) -> {
+try {
+return 
!primaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get()
+&& 
!secondaryLoadManager.checkOwnershipAsync(Optional.empty(), bundle).get();
+} catch (Exception e) {
+return false;
+}
+}, 5, 200);
 assertFalse(primaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
 assertFalse(secondaryLoadManager.checkOwnershipAsync(Optional.empty(), 
bundle).get());
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
index 035d88c7be0..158827ca34d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.loadbalance;
 
 import static 
org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT;
+import static 
org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
 import 

[pulsar] branch master updated: [fix][broker] Fix race condition of replication cluster connection during migration topic (#21364)

2023-10-16 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 689976bf3ea [fix][broker] Fix race condition of replication cluster 
connection during migration topic (#21364)
689976bf3ea is described below

commit 689976bf3ea8dc4cff6f4fdcab54bdd4a8a73a7e
Author: Rajan Dhabalia 
AuthorDate: Mon Oct 16 14:05:13 2023 -0700

[fix][broker] Fix race condition of replication cluster connection during 
migration topic (#21364)

Co-authored-by: Rajan Dhabalia 
---
 .../apache/pulsar/broker/service/ServerCnx.java|  2 +-
 .../org/apache/pulsar/broker/service/Topic.java|  2 ++
 .../service/nonpersistent/NonPersistentTopic.java  |  5 +++
 .../broker/service/persistent/PersistentTopic.java | 37 --
 .../broker/service/ClusterMigrationTest.java   |  9 --
 5 files changed, 49 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index efbe3fcd7ec..95f139dc11e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1680,7 +1680,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 if (ex.getCause() instanceof 
BrokerServiceException.TopicMigratedException) {
 Optional<ClusterUrl> clusterURL = 
getMigratedClusterUrl(service.getPulsar(), topic.getName());
 if (clusterURL.isPresent()) {
-if (topic.isReplicationBacklogExist()) {
+if (!topic.shouldProducerMigrate()) {
 log.info("Topic {} is migrated but replication backlog 
exist: "
 + "producerId = {}, producerName = {}, 
{}", topicName,
 producerId, producerName, 
ex.getCause().getMessage());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 7657d77e129..c697639ff4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -234,6 +234,8 @@ public interface Topic {
 
 boolean isBrokerPublishRateExceeded();
 
+boolean shouldProducerMigrate();
+
 boolean isReplicationBacklogExist();
 
 void disableCnxAutoRead();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 54811da7238..76e9f261ca6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -235,6 +235,11 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 // No-op
 }
 
+@Override
+public boolean shouldProducerMigrate() {
+return true;
+}
+
 @Override
 public boolean isReplicationBacklogExist() {
 return false;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 1ed7bafc147..03ee0f06e2f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -660,8 +660,9 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 List> futures = new ArrayList<>();
 // send migration url metadata to producers before 
disconnecting them
 if (isMigrated()) {
-if (isReplicationBacklogExist()) {
-log.info("Topic {} is migrated but replication backlog 
exists. Closing producers.", topic);
+if (!shouldProducerMigrate()) {
+log.info("Topic {} is migrated but replication-backlog 
exists or "
++ "subs not created. Closing producers.", 
topic);
 } else {
 producers.forEach((__, producer) -> 
producer.topicMigrated(getMigratedClusterUrl()));
 }
@@ -2592,6 +2593,20 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 if (!clusterUrl.isPresent()

[pulsar] branch master updated: [fix][broker] Fix avoid creating new topic after migration is started (#21368)

2023-10-16 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e08841114ce [fix][broker] Fix avoid creating new topic after migration 
is started (#21368)
e08841114ce is described below

commit e08841114cece7e4dc64677505ab66271a2bace5
Author: Rajan Dhabalia 
AuthorDate: Mon Oct 16 11:26:29 2023 -0700

[fix][broker] Fix avoid creating new topic after migration is started 
(#21368)

Co-authored-by: Rajan Dhabalia 
---
 .../pulsar/broker/service/AbstractTopic.java   |  5 +++
 .../pulsar/broker/service/BrokerService.java   | 26 ++-
 .../apache/pulsar/broker/service/ServerCnx.java| 51 +++---
 .../broker/service/ClusterMigrationTest.java   | 43 +++---
 4 files changed, 114 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 3cb396d7a4b..a8f25f61a94 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -1352,6 +1352,11 @@ public abstract class AbstractTopic implements Topic, 
TopicPolicyListener 
isClusterMigrationEnabled(PulsarService pulsar,
+String topic) {
+return getMigratedClusterUrlAsync(pulsar, topic).thenApply(url -> 
url.isPresent());
+}
+
 public static CompletableFuture<Optional<ClusterUrl>> 
getMigratedClusterUrlAsync(PulsarService pulsar,

  String topic) {
 return 
pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d03a94a0563..b85f77cb2f5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -111,6 +111,7 @@ import 
org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
 import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
 import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
@@ -1521,6 +1522,12 @@ public class BrokerService implements Closeable {
 topicFuture.handle((persistentTopic, ex) -> {
 // release permit and process pending topic
 topicLoadSemaphore.release();
+// do not recreate topic if topic is already 
migrated and deleted by broker
+// so, avoid creating a new topic if migration is 
already started
+if (ex != null && (ex.getCause() instanceof 
TopicMigratedException)) {
+
topicFuture.completeExceptionally(ex.getCause());
+return null;
+}
 createPendingLoadTopic();
 return null;
 });
@@ -1632,7 +1639,10 @@ public class BrokerService implements Closeable {
 ? checkMaxTopicsPerNamespace(topicName, 1)
 : CompletableFuture.completedFuture(null);
 
-maxTopicsCheck.thenCompose(__ -> 
getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> {
+CompletableFuture isTopicAlreadyMigrated = 
checkTopicAlreadyMigrated(topicName);
+
+maxTopicsCheck.thenCompose(__ -> 
isTopicAlreadyMigrated).thenCompose(__ -> getManagedLedgerConfig(topicName))
+.thenAccept(managedLedgerConfig -> {
 if (isBrokerEntryMetadataEnabled() || 
isBrokerPayloadProcessorEnabled()) {
 // init managedLedger interceptor
 Set interceptors = new 
HashSet<>();
@@ -1760,6 +1770,20 @@ public class BrokerService implements Closeable {
 });
 }
 
+private CompletableFuture checkTopicAlreadyMigrated(TopicName 
topicName) {
+CompletableFuture result = new CompletableFuture<>();
+AbstractTopic.isClusterMigrationEnabled(pulsar, 

[pulsar] branch master updated: [fix][broker]Support to migrate topics from blue to green cluster per namespace (#21367)

2023-10-15 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a10564d875f [fix][broker]Support to migrate topics from  blue to green 
cluster per namespace (#21367)
a10564d875f is described below

commit a10564d875f42c32fda0178cfe7ad976d26fbfb1
Author: vraulji567 <95091480+vraulji...@users.noreply.github.com>
AuthorDate: Sun Oct 15 20:32:42 2023 -0400

[fix][broker]Support to migrate topics from  blue to green cluster per 
namespace (#21367)

Co-authored-by: Vishwadeepsinh Raulji 
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  14 +
 .../apache/pulsar/broker/admin/v1/Namespaces.java  |  13 +
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  12 +-
 .../pulsar/broker/service/AbstractTopic.java   |  22 +-
 .../org/apache/pulsar/broker/service/Consumer.java |   3 +-
 .../apache/pulsar/broker/service/ServerCnx.java|   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |  11 +-
 .../broker/service/persistent/PersistentTopic.java |   5 +-
 .../broker/service/ClusterMigrationTest.java   | 455 +
 .../org/apache/pulsar/client/admin/Namespaces.java |  27 +-
 .../pulsar/common/policies/data/Policies.java  |   2 +
 .../client/admin/internal/NamespacesImpl.java  |  11 +
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  11 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  16 +
 14 files changed, 579 insertions(+), 25 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 8ab1f4dc860..a8f1af1d34f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2642,4 +2642,18 @@ public abstract class NamespacesBase extends 
AdminResource {
 return null;
 });
 }
+
+protected void internalEnableMigration(boolean migrated) {
+validateSuperUserAccess();
+try {
+updatePolicies(namespaceName, policies -> {
+policies.isMigrated = migrated;
+return policies;
+});
+log.info("Successfully updated migration on namespace {}", 
namespaceName);
+} catch (Exception e) {
+log.error("Failed to update migration on namespace {}", 
namespaceName, e);
+throw new RestException(e);
+}
+}
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 234d7725113..b188750252a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -1709,5 +1709,18 @@ public class Namespaces extends NamespacesBase {
 internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
 }
 
+@POST
+@Path("/{property}/{cluster}/{namespace}/migration")
+@ApiOperation(hidden = true, value = "Update migration for all topics in a 
namespace")
+@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+@ApiResponse(code = 404, message = "Property or cluster or 
namespace doesn't exist") })
+public void enableMigration(@PathParam("property") String property,
+@PathParam("cluster") String cluster,
+@PathParam("namespace") String namespace,
+boolean migrated) {
+validateNamespaceName(property, cluster, namespace);
+internalEnableMigration(migrated);
+}
+
 private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index dfa040baec5..36df0f7e31a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -2779,7 +2779,17 @@ public class Namespaces extends NamespacesBase {
 });
 }
 
-
+@POST
+@Path("/{tenant}/{namespace}/migration")
+@ApiOperation(hidden = true, value = "Update migration for all topics in a 
namespace")
+@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+@ApiResponse(code = 404, me

[pulsar] branch master updated: [fix][broker]Delete subscription and disconnect replicators after topic migration (#21029)

2023-08-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new c1b0454614b [fix][broker]Delete subscription and disconnect 
replicators after topic migration (#21029)
c1b0454614b is described below

commit c1b0454614b7903913cb0311bdcacf2118893fc9
Author: vineeth1995 
AuthorDate: Tue Aug 22 09:52:30 2023 -0700

[fix][broker]Delete subscription and disconnect replicators after topic 
migration (#21029)

Co-authored-by: Vineeth Polamreddy 
---
 .../service/nonpersistent/NonPersistentTopic.java  | 21 ++
 .../broker/service/persistent/PersistentTopic.java | 47 +-
 .../broker/service/ClusterMigrationTest.java   |  9 +
 3 files changed, 66 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index c764283cb44..836e5655168 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -959,10 +959,31 @@ public class NonPersistentTopic extends AbstractTopic 
implements Topic, TopicPol
 consumer.topicMigrated(url);
 });
 });
+return disconnectReplicators().thenCompose(__ -> 
checkAndUnsubscribeSubscriptions());
 }
 return CompletableFuture.completedFuture(null);
 }
 
+private CompletableFuture checkAndUnsubscribeSubscriptions() {
+List> futures = new ArrayList<>();
+subscriptions.forEach((s, subscription) -> {
+if (subscription.getConsumers().isEmpty()) {
+futures.add(subscription.delete());
+}
+});
+
+return FutureUtil.waitForAll(futures);
+}
+
+private CompletableFuture disconnectReplicators() {
+List> futures = new ArrayList<>();
+ConcurrentOpenHashMap replicators = 
getReplicators();
+replicators.forEach((r, replicator) -> {
+futures.add(replicator.disconnect());
+});
+return FutureUtil.waitForAll(futures);
+}
+
 @Override
 public void checkGC() {
 if (!isDeleteWhileInactive()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 57a4989b4d3..f5679665d46 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -180,6 +180,7 @@ import org.apache.pulsar.utils.StatsOutputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class PersistentTopic extends AbstractTopic implements Topic, 
AddEntryCallback {
 
 // Managed ledger associated with the topic
@@ -2575,25 +2576,49 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 @Override
 public CompletableFuture checkClusterMigration() {
 Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();
-if (!isMigrated() && clusterUrl.isPresent()) {
-return ledger.asyncMigrate().thenApply(__ -> {
-subscriptions.forEach((name, sub) -> {
-if (sub.isSubsciptionMigrated()) {
-
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
-}
-});
-return null;
-});
-} else {
+if (!clusterUrl.isPresent()) {
 return CompletableFuture.completedFuture(null);
 }
+CompletableFuture migrated = !isMigrated() ? ledger.asyncMigrate() :
+CompletableFuture.completedFuture(null);
+return migrated.thenApply(__ -> {
+subscriptions.forEach((name, sub) -> {
+if (sub.isSubsciptionMigrated()) {
+
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
+}
+});
+return null;
+}).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ 
-> checkAndUnsubscribeSubscriptions());
+}
+
+private CompletableFuture checkAndUnsubscribeSubscriptions() {
+List> futures = new ArrayList<>();
+subscriptions.forEach((s, subscription) -> {
+if (subscription.getNumberOfEntriesInBacklog(true) == 0
+&& subscription.getConsumers().isEmpty()) {
+futures.

[pulsar] branch master updated: [fix][broker] Fix consumers are not redirected to migrated cluster (#20928)

2023-08-17 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4a9fec693b9 [fix][broker] Fix consumers are not redirected to migrated 
cluster (#20928)
4a9fec693b9 is described below

commit 4a9fec693b90fcb76aea9a98a94de31608b56bab
Author: vineeth1995 
AuthorDate: Thu Aug 17 22:58:35 2023 -0700

[fix][broker] Fix consumers are not redirected to migrated cluster (#20928)
---
 .../org/apache/pulsar/broker/service/Consumer.java  | 14 ++
 .../org/apache/pulsar/broker/service/ServerCnx.java |  7 +++
 .../apache/pulsar/broker/service/Subscription.java  |  3 +++
 .../nonpersistent/NonPersistentSubscription.java|  5 +
 .../service/persistent/PersistentSubscription.java  |  6 ++
 .../broker/service/persistent/PersistentTopic.java  | 10 --
 .../pulsar/broker/service/ClusterMigrationTest.java | 21 +
 7 files changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 176f033a6dc..ddca7ed5a89 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -818,6 +818,20 @@ public class Consumer {
 }
 }
 
+public boolean checkAndApplyTopicMigration() {
+if (subscription.isSubsciptionMigrated()) {
+Optional clusterUrl = 
AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar());
+if (clusterUrl.isPresent()) {
+ClusterUrl url = clusterUrl.get();
+
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, 
url.getBrokerServiceUrl(),
+url.getBrokerServiceUrlTls());
+// disconnect consumer after sending migrated cluster url
+disconnect();
+return true;
+}
+}
+return false;
+}
 /**
  * Checks if consumer-blocking on unAckedMessages is allowed for below 
conditions:
  * a. consumer must have Shared-subscription
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f3b355d6332..6b1af573a08 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1250,6 +1250,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 });
 })
 .thenAccept(consumer -> {
+if (consumer.checkAndApplyTopicMigration()) {
+log.info("[{}] Disconnecting consumer {} on 
migrated subscription on topic {} / {}",
+remoteAddress, consumerId, 
subscriptionName, topicName);
+consumers.remove(consumerId, consumerFuture);
+return;
+}
+
 if (consumerFuture.complete(consumer)) {
 log.info("[{}] Created subscription on topic 
{} / {}",
 remoteAddress, topicName, 
subscriptionName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 8ac855c1e7c..be079c2b4b5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -102,6 +102,8 @@ public interface Subscription extends MessageExpirer {
 
 CompletableFuture updateSubscriptionProperties(Map 
subscriptionProperties);
 
+boolean isSubsciptionMigrated();
+
 default void 
processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapshot snapshot) 
{
 // Default is no-op
 }
@@ -130,4 +132,5 @@ public interface Subscription extends MessageExpirer {
 static boolean isIndividualAckMode(SubType subType) {
 return SubType.Shared.equals(subType) || 
SubType.Key_Shared.equals(subType);
 }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 1048864ad64..7cd4d8984c8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscrip

[pulsar] branch master updated: [fix][broker] Broker failed to load v1 namespace resources cache (#20783)

2023-07-21 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new b8e6948f62d [fix][broker] Broker failed to load v1 namespace resources 
cache (#20783)
b8e6948f62d is described below

commit b8e6948f62d6ec2ca53b6a85fe2fd07d4dee6853
Author: Rajan Dhabalia 
AuthorDate: Fri Jul 21 15:41:50 2023 -0700

[fix][broker] Broker failed to load v1 namespace resources cache (#20783)
---
 .../pulsar/broker/resources/BaseResources.java | 36 ++
 .../broker/resources/NamespaceResources.java   |  2 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 14 +
 3 files changed, 51 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
index 42add4271f6..4011a482075 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java
@@ -20,11 +20,16 @@ package org.apache.pulsar.broker.resources;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -78,6 +83,37 @@ public class BaseResources {
 return cache.getChildren(path);
 }
 
+protected CompletableFuture> getChildrenRecursiveAsync(String 
path) {
+Set children = ConcurrentHashMap.newKeySet();
+CompletableFuture> result = new CompletableFuture<>();
+getChildrenRecursiveAsync(path, children, result, new 
AtomicInteger(1), path);
+return result;
+}
+
+private void getChildrenRecursiveAsync(String path, Set children, 
CompletableFuture> result,
+AtomicInteger totalResults, String parent) {
+cache.getChildren(path).thenAccept(childList -> {
+childList = childList != null ? childList : 
Collections.emptyList();
+if (totalResults.decrementAndGet() == 0 && childList.isEmpty()) {
+result.complete(new ArrayList<>(children));
+return;
+}
+if (childList.isEmpty()) {
+return;
+}
+// remove current node from children if current node is not leaf
+children.remove(parent);
+// childPrefix creates a path hierarchy if children has multi 
level path
+String childPrefix = path.equals(parent) ? "" : parent + "/";
+totalResults.addAndGet(childList.size());
+for (String child : childList) {
+children.add(childPrefix + child);
+String childPath = path + "/" + child;
+getChildrenRecursiveAsync(childPath, children, result, 
totalResults, child);
+}
+});
+}
+
 protected Optional get(String path) throws MetadataStoreException {
 try {
 return getAsync(path).get(operationTimeoutSec, TimeUnit.SECONDS);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 48f82596567..e5dd13c32eb 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -62,7 +62,7 @@ public class NamespaceResources extends 
BaseResources {
 }
 
 public CompletableFuture> listNamespacesAsync(String tenant) {
-return getChildrenAsync(joinPath(BASE_POLICIES_PATH, tenant));
+return getChildrenRecursiveAsync(joinPath(BASE_POLICIES_PATH, tenant));
 }
 
 public CompletableFuture getPoliciesReadOnlyAsync() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 83ed63bf0d9..a4f6bd4650f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.

[pulsar] branch master updated: [Fix][Doc] Update docs for Partition increase command (#20035)

2023-04-11 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1e9c7f52368 [Fix][Doc] Update docs for Partition increase command 
(#20035)
1e9c7f52368 is described below

commit 1e9c7f5236866d07105421c566241e3798f63097
Author: Apurva007 
AuthorDate: Tue Apr 11 20:07:45 2023 -0700

[Fix][Doc] Update docs for Partition increase command (#20035)

Co-authored-by: Apurva Telang 
---
 .../org/apache/pulsar/broker/admin/v1/PersistentTopics.java  |  4 ++--
 .../org/apache/pulsar/broker/admin/v2/PersistentTopics.java  |  4 ++--
 .../src/main/java/org/apache/pulsar/client/admin/Topics.java | 12 ++--
 .../org/apache/pulsar/admin/cli/CmdPersistentTopics.java |  2 +-
 .../src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java |  2 +-
 5 files changed, 12 insertions(+), 12 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 16f1b357dcd..e9bb1a40547 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -256,14 +256,14 @@ public class PersistentTopics extends 
PersistentTopicsBase {
 }
 
 /**
- * It updates number of partitions of an existing non-global partitioned 
topic. It requires partitioned-topic to be
+ * It updates number of partitions of an existing partitioned topic. It 
requires partitioned-topic to be
  * already exist and number of new partitions must be greater than 
existing number of partitions. Decrementing
  * number of partitions requires deletion of topic which is not supported.
  */
 @POST
 @Path("/{property}/{cluster}/{namespace}/{topic}/partitions")
 @ApiOperation(hidden = true, value = "Increment partitions of an existing 
partitioned topic.",
-notes = "It only increments partitions of existing non-global 
partitioned-topic")
+notes = "It increments partitions of existing partitioned-topic")
 @ApiResponses(value = {
 @ApiResponse(code = 204, message = "Update topic partition 
successful."),
 @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 477cc8b5712..eee363aeed8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -775,14 +775,14 @@ public class PersistentTopics extends 
PersistentTopicsBase {
 }
 
 /**
- * It updates number of partitions of an existing non-global partitioned 
topic. It requires partitioned-topic to be
+ * It updates number of partitions of an existing partitioned topic. It 
requires partitioned-topic to be
  * already exist and number of new partitions must be greater than 
existing number of partitions. Decrementing
  * number of partitions requires deletion of topic which is not supported.
  */
 @POST
 @Path("/{tenant}/{namespace}/{topic}/partitions")
 @ApiOperation(value = "Increment partitions of an existing partitioned 
topic.",
-notes = "It only increments partitions of existing non-global 
partitioned-topic")
+notes = "It increments partitions of existing partitioned-topic")
 @ApiResponses(value = {
 @ApiResponse(code = 204, message = "Update topic partition 
successful."),
 @ApiResponse(code = 307, message = "Current broker doesn't serve 
the namespace of this topic"),
diff --git 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0d5d14eb734..f599e2566bf 100644
--- 
a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -589,7 +589,7 @@ public interface Topics {
 CompletableFuture createMissedPartitionsAsync(String topic);
 
 /**
- * Update number of partitions of a non-global partitioned topic.
+ * Update number of partitions of a partitioned topic.
  * 
  * It requires partitioned-topic to be already exist and number of new 
partitions must be greater than existing
  * number of partitions. Decrementing number o

[pulsar] branch master updated (c3a92b213f0 -> 34b6e892696)

2023-04-10 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from c3a92b213f0 [fix] [ml] Fix uncompleted future when remove cursor 
(#20050)
 add 34b6e892696 [feat] [broker] PIP-188 support blue-green cluster 
migration [part-2] (#19605)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/service/Replicator.java   |   2 +
 .../apache/pulsar/broker/service/ServerCnx.java|  24 ++-
 .../org/apache/pulsar/broker/service/Topic.java|   2 +
 .../nonpersistent/NonPersistentReplicator.java |   2 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   5 +
 .../service/persistent/PersistentReplicator.java   |   4 +-
 .../broker/service/persistent/PersistentTopic.java |  21 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   8 +-
 .../broker/service/ClusterMigrationTest.java   | 229 ++---
 9 files changed, 253 insertions(+), 44 deletions(-)



[pulsar] branch master updated: [fix][cli] Fix Pulsar admin tool is ignoring tls-trust-cert path arg (#19696)

2023-03-04 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e75def80ff2 [fix][cli] Fix Pulsar admin tool is ignoring 
tls-trust-cert path arg (#19696)
e75def80ff2 is described below

commit e75def80ff2f366919b55cfd42f1dfab3f719b88
Author: Rajan Dhabalia 
AuthorDate: Sat Mar 4 08:46:50 2023 -0800

[fix][cli] Fix Pulsar admin tool is ignoring tls-trust-cert path arg 
(#19696)
---
 .../admin/internal/PulsarAdminBuilderImpl.java |  2 ++
 .../pulsar/admin/cli/PulsarAdminSupplier.java  |  5 +++-
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   | 12 ++--
 .../org/apache/pulsar/admin/cli/TestRunMain.java   | 35 ++
 4 files changed, 50 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
index 3e7ee472e46..009fa67fbaa 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.admin.internal;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
@@ -33,6 +34,7 @@ import 
org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 
 public class PulsarAdminBuilderImpl implements PulsarAdminBuilder {
 
+@Getter
 protected ClientConfigurationData conf;
 
 private ClassLoader clientBuilderClassLoader = null;
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java
index 6aa8d2b9c61..764dc9de5df 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminSupplier.java
@@ -53,7 +53,7 @@ public class PulsarAdminSupplier implements 
Supplier {
 }
 }
 
-private final PulsarAdminBuilder adminBuilder;
+protected final PulsarAdminBuilder adminBuilder;
 private RootParamsKey currentParamsKey;
 private PulsarAdmin admin;
 
@@ -103,6 +103,9 @@ public class PulsarAdminSupplier implements 
Supplier {
 if (isNotBlank(rootParams.tlsProvider)) {
 adminBuilder.sslProvider(rootParams.tlsProvider);
 }
+if (isNotBlank(rootParams.tlsTrustCertsFilePath)) {
+
adminBuilder.tlsTrustCertsFilePath(rootParams.tlsTrustCertsFilePath);
+}
 }
 
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index 7a7a4c4b444..ca0a8a055cf 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.common.util.ShutdownUtil;
 
 public class PulsarAdminTool {
 
-private static boolean allowSystemExit = true;
+protected static boolean allowSystemExit = true;
 
 private static int lastExitCode = Integer.MIN_VALUE;
 
@@ -54,7 +54,7 @@ public class PulsarAdminTool {
 protected JCommander jcommander;
 protected RootParams rootParams;
 private final Properties properties;
-private PulsarAdminSupplier pulsarAdminSupplier;
+protected PulsarAdminSupplier pulsarAdminSupplier;
 
 @Getter
 public static class RootParams {
@@ -277,11 +277,16 @@ public class PulsarAdminTool {
 }
 
 public static void main(String[] args) throws Exception {
+execute(args);
+}
+
+@VisibleForTesting
+public static PulsarAdminTool execute(String[] args) throws Exception {
 lastExitCode = 0;
 if (args.length == 0) {
 System.out.println("Usage: pulsar-admin CONF_FILE_PATH [options] 
[command] [command options]");
 exit(0);
-return;
+return null;
 }
 String configFile = args[0];
 Properties properties = new Properties();
@@ -299,6 +304,7 @@ public class PulsarAdminTool {
 } else {
 exit(1);
 }
+return tool;
 }
 
 private static void exit(int code) {
diff --git 
a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java
 
b/pulsar-client-tools/src/test/java/org/apache/pulsar

[pulsar] branch master updated: [improve][admin] Fix NPE in admin-CLI topic stats command (#18326)

2022-11-03 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 251d9fda0a1 [improve][admin] Fix NPE in admin-CLI topic stats command 
(#18326)
251d9fda0a1 is described below

commit 251d9fda0a1c77f589a69d906a1a014ebe2d7071
Author: vineeth1995 
AuthorDate: Thu Nov 3 14:42:43 2022 -0700

[improve][admin] Fix NPE in admin-CLI topic stats command (#18326)

Co-authored-by: Vineeth 
---
 .../src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 8e3d15ee406..5dbb3935ce9 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -347,7 +347,7 @@ public class CmdPersistentTopics extends CmdBase {
 @Override
 void run() throws PulsarAdminException {
 String persistentTopic = validatePersistentTopic(params);
-print(persistentTopics.getStats(persistentTopic, 
getPreciseBacklog));
+print(getPersistentTopics().getStats(persistentTopic, 
getPreciseBacklog));
 }
 }
 



[pulsar] branch master updated: [improve][broker] log sessionId with zk-session-watcher for debugging (#18291)

2022-11-01 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 987e0dacd31 [improve][broker] log sessionId with zk-session-watcher 
for debugging (#18291)
987e0dacd31 is described below

commit 987e0dacd31aeaba6a2f1a59ec5c55fb59bb7999
Author: Rajan Dhabalia 
AuthorDate: Tue Nov 1 16:23:17 2022 -0700

[improve][broker] log sessionId with zk-session-watcher for debugging 
(#18291)
---
 .../main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index 38d0ba32a36..a0247e22319 100644
--- 
a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ 
b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -149,8 +149,8 @@ public class ZKSessionWatcher implements AutoCloseable, 
Watcher {
 currentStatus = SessionEvent.SessionLost;
 sessionListener.accept(currentStatus);
 } else if (currentStatus != SessionEvent.SessionLost) {
-log.warn("ZooKeeper client is disconnected. Waiting to 
reconnect, time remaining = {} seconds",
-timeRemainingMillis / 1000.0);
+log.warn("[{}] ZooKeeper client is disconnected. Waiting to 
reconnect, time remaining = {} seconds",
+zk.getSessionId(), timeRemainingMillis / 1000.0);
 if (currentStatus == SessionEvent.SessionReestablished) {
 currentStatus = SessionEvent.ConnectionLost;
 sessionListener.accept(currentStatus);



[pulsar] branch master updated: [improve] [client] Add api to get producer/consumer stats for partition topic (#18212)

2022-11-01 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 26f9ffa497e [improve] [client] Add api to get producer/consumer stats 
for partition topic (#18212)
26f9ffa497e is described below

commit 26f9ffa497e16396ea4dbbcad3452b2a973f86ac
Author: Rajan Dhabalia 
AuthorDate: Tue Nov 1 12:57:11 2022 -0700

[improve] [client] Add api to get producer/consumer stats for partition 
topic (#18212)

* [improve] [client] Add api to get producer/consumer stats for partition 
topic

* introduce partition topic stats interface
---
 .../client/api/SimpleProducerConsumerStatTest.java | 61 +
 .../pulsar/client/api/MultiTopicConsumerStats.java | 39 +++
 .../client/api/PartitionedTopicProducerStats.java  | 40 +++
 .../apache/pulsar/client/api/ProducerStats.java|  1 -
 .../impl/MultiTopicConsumerStatsRecorderImpl.java  | 59 
 .../client/impl/MultiTopicsConsumerImpl.java   |  6 +-
 .../client/impl/PartitionedProducerImpl.java   |  9 ++-
 .../PartitionedTopicProducerStatsRecorderImpl.java | 79 ++
 .../pulsar/client/impl/ProducerStatsDisabled.java  |  1 +
 .../client/impl/ProducerStatsRecorderImpl.java | 17 +
 .../client/impl/ProducerStatsRecorderImplTest.java |  4 +-
 11 files changed, 292 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index ad849395045..bae891ad46a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -30,6 +31,7 @@ import com.google.gson.JsonObject;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -449,4 +451,63 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
 .until(() -> producer.getStats().getPendingQueueSize() == 
numMessages);
 assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
 }
+
+/**
+ * This test verifies partitioned topic stats for producer and consumer.
+ * @throws Exception
+ */
+@Test
+public void testPartitionTopicStats() throws Exception {
+log.info("-- Starting {} test --", methodName);
+
+String topicName = 
"persistent://my-property/my-ns/testPartitionTopicStats";
+int numPartitions = 10;
+admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+ConsumerBuilder consumerBuilder = 
pulsarClient.newConsumer().topic(topicName)
+.subscriptionName("my-subscriber-name");
+
+Consumer consumer = consumerBuilder.subscribe();
+
+ProducerBuilder producerBuilder = 
pulsarClient.newProducer().enableBatching(false).topic(topicName);
+
+Producer producer = producerBuilder.create();
+
+int numMessages = 20;
+for (int i = 0; i < numMessages; i++) {
+String message = "my-message-" + i;
+producer.send(message.getBytes());
+}
+
+Message msg = null;
+Set messageSet = new HashSet<>();
+for (int i = 0; i < numMessages; i++) {
+msg = consumer.receive(5, TimeUnit.SECONDS);
+String receivedMessage = new String(msg.getData());
+log.info("Received message: [{}]", receivedMessage);
+String expectedMessage = "my-message-" + i;
+testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+}
+// Acknowledge the consumption of all messages at once
+consumer.acknowledgeCumulative(msg);
+
+MultiTopicConsumerStats cStat = (MultiTopicConsumerStats) 
consumer.getStats();
+PartitionedTopicProducerStats pStat = (PartitionedTopicProducerStats) 
producer.getStats();
+retryStrategically((test) -> !pStat.getPartitionStats().isEmpty(), 5, 
100);
+retryStrategically((test) -> !cStat.getPartitionStats().isEmpty(), 5, 
100);
+Map prodStatsMap = pStat.getPartitionStats();
+Map consStatsMap = cStat.getPartitionStats();
+assertFalse(prodStatsMap.isE

[pulsar] branch master updated (5b452d1ce82 -> b0945d1d45d)

2022-10-19 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 5b452d1ce82 [improve][doc] Add code example for WebSocket API to use 
TLS with CLI tools (#15485)
 add b0945d1d45d [feat] [broker] PIP-188 support blue-green cluster 
migration [part-1] (#17962)

No new revisions were added by this update.

Summary of changes:
 conf/broker.conf   |   4 +
 .../apache/bookkeeper/mledger/ManagedLedger.java   |   7 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  33 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java |   7 +
 .../pulsar/broker/admin/impl/ClustersBase.java |  62 
 .../broker/service/AbstractBaseDispatcher.java |  13 +
 .../pulsar/broker/service/AbstractTopic.java   |  31 +-
 .../pulsar/broker/service/BrokerService.java   |  11 +
 .../broker/service/BrokerServiceException.java |  10 +
 .../org/apache/pulsar/broker/service/Consumer.java |  13 +
 .../org/apache/pulsar/broker/service/Producer.java |  11 +
 .../pulsar/broker/service/PulsarCommandSender.java |   3 +
 .../broker/service/PulsarCommandSenderImpl.java|  13 +
 .../apache/pulsar/broker/service/ServerCnx.java|  19 +-
 .../org/apache/pulsar/broker/service/Topic.java|   2 +
 .../service/nonpersistent/NonPersistentTopic.java  |  40 ++-
 .../service/persistent/CompactorSubscription.java  |   4 +-
 .../PersistentDispatcherMultipleConsumers.java |   2 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   2 +-
 ...istentStreamingDispatcherMultipleConsumers.java |   3 +-
 ...entStreamingDispatcherSingleActiveConsumer.java |   2 +-
 .../service/persistent/PersistentSubscription.java |   5 +-
 .../broker/service/persistent/PersistentTopic.java |  23 +-
 .../broker/intercept/CounterBrokerInterceptor.java |   2 +-
 .../broker/service/ClusterMigrationTest.java   | 329 +
 pulsar-client-admin-api/pom.xml|   1 +
 .../org/apache/pulsar/client/admin/Clusters.java   |  37 +++
 .../pulsar/common/policies/data/ClusterData.java   |  23 ++
 .../pulsar/client/admin/internal/ClustersImpl.java |  14 +
 .../pulsar/client/api/PulsarClientException.java   |  16 +
 .../org/apache/pulsar/admin/cli/CmdClusters.java   |  24 ++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  23 ++
 .../pulsar/client/impl/ConnectionHandler.java  |   7 +-
 .../apache/pulsar/client/impl/HandlerState.java|   9 +
 pulsar-common/pom.xml  |   5 +
 .../common/policies/data/ClusterDataImpl.java  |  27 +-
 .../apache/pulsar/common/protocol/Commands.java|  11 +
 .../pulsar/common/protocol/PulsarDecoder.java  |  10 +
 pulsar-common/src/main/proto/PulsarApi.proto   |  17 ++
 site2/docs/reference-pulsar-admin.md   |  18 ++
 .../offload/jcloud/impl/MockManagedLedger.java |  12 +
 41 files changed, 886 insertions(+), 19 deletions(-)
 create mode 100644 
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java



[pulsar] branch master updated: [improve][pulsar-testclient] Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI (#17862)

2022-10-03 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d1b7c6af37f [improve][pulsar-testclient] Add proxyServiceUrl and 
proxyProtocol as options for PerfTool CLI (#17862)
d1b7c6af37f is described below

commit d1b7c6af37fab50ac916e9e5b61c4345ad3643ad
Author: vineeth1995 
AuthorDate: Mon Oct 3 18:09:46 2022 -0700

[improve][pulsar-testclient] Add proxyServiceUrl and proxyProtocol as 
options for PerfTool CLI (#17862)

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

* Add proxyServiceUrl and proxyProtocol as options for PerfTool CLI

Co-authored-by: Vineeth 
---
 conf/client.conf   |   4 +
 .../apache/pulsar/testclient/PerfClientUtils.java  |   3 +-
 .../testclient/PerformanceBaseArguments.java   |  27 +
 .../pulsar/testclient/PerfClientUtilsTest.java |   5 +
 .../testclient/PerformanceBaseArgumentsTest.java   | 113 +
 .../src/test/resources/perf_client1.conf   |   2 +
 6 files changed, 153 insertions(+), 1 deletion(-)

diff --git a/conf/client.conf b/conf/client.conf
index ea1d339a09c..8a485e5676c 100644
--- a/conf/client.conf
+++ b/conf/client.conf
@@ -88,7 +88,11 @@ tlsKeyStorePassword=
 # When TLS authentication with KeyStore is used, available options can be 
SunJSSE, Conscrypt and so on.
 webserviceTlsProvider=
 
+#Proxy-server URL to which to connect
+proxyServiceUrl=
 
+#Proxy protocol to select type of routing at proxy
+proxyProtocol=
 
 # Pulsar Admin Custom Commands
 #customCommandFactoriesDirectory=commandFactories
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index 1ce5777fd32..a3552d31430 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -75,7 +75,8 @@ public class PerfClientUtils {
 .enableBusyWait(arguments.enableBusyWait)
 .listenerThreads(arguments.listenerThreads)
 .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath)
-.maxLookupRequests(arguments.maxLookupRequest);
+.maxLookupRequests(arguments.maxLookupRequest)
+.proxyServiceUrl(arguments.proxyServiceURL, 
arguments.proxyProtocol);
 
 if (isNotBlank(arguments.authPluginClassName)) {
 clientBuilder.authentication(arguments.authPluginClassName, 
arguments.authParams);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
index cff7e16e9ca..307af7cbb15 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceBaseArguments.java
@@ -19,10 +19,12 @@
 package org.apache.pulsar.testclient;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.testclient.PerfClientUtils.exit;
 import com.beust.jcommander.Parameter;
 import java.io.FileInputStream;
 import java.util.Properties;
 import lombok.SneakyThrows;
+import org.apache.pulsar.client.api.ProxyProtocol;
 
 
 public abstract class PerformanceBaseArguments {
@@ -85,6 +87,12 @@ public abstract class PerformanceBaseArguments {
 + "on each broker connection to prevent overloading a broker")
 public int maxLookupRequest = 5;
 
+@Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to 
which to connect.")
+String proxyServiceURL = null;
+
+@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol 
to select type of routing at proxy.")
+ProxyProtocol proxyProtocol = null;
+
 public abstract void fillArgumentsFromProperties(Properties prop);
 
 @SneakyThrows
@@ -133,6 +141,25 @@ public abstract class PerformanceBaseArguments {
 .getProperty("tlsEnableHostnameVerification", ""));
 
 }
+
+if (proxyServiceURL == null) {
+proxyServiceURL = prop.g

[pulsar-manager] branch master updated: Redirect notifications to the commits@ list instead of dev (#490)

2022-09-28 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git


The following commit(s) were added to refs/heads/master by this push:
 new 519b476  Redirect notifications to the commits@ list instead of dev 
(#490)
519b476 is described below

commit 519b47699872e4034618d3974af77619703e8d3a
Author: Matteo Merli 
AuthorDate: Wed Sep 28 12:21:41 2022 -0700

Redirect notifications to the commits@ list instead of dev (#490)
---
 .asf.yaml | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/.asf.yaml b/.asf.yaml
index 209aec5..5c8b9e9 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -45,4 +45,11 @@ github:
 # disable merge button:
 merge:   false
 # disable rebase button:
-rebase:  false
\ No newline at end of file
+rebase:  false
+
+notifications:
+  commits:  commits@pulsar.apache.org
+  issues:   commits@pulsar.apache.org
+  pullrequests: commits@pulsar.apache.org
+  discussions:  dev@pulsar.apache.org
+  jira_options: link label



[pulsar] branch master updated (518cdcd9c2c -> 6528a914350)

2022-09-26 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 518cdcd9c2c [fix][metrics]wrong metrics text generated when 
label_cluster specified (#17704)
 add 6528a914350 [Pulsar-init] Support cluster init using proxy url and 
protocol (#17844)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/pulsar/PulsarClusterMetadataSetup.java  | 13 +
 1 file changed, 13 insertions(+)



[pulsar-manager] branch master updated: Removing transitive dependencies for log4j (#488)

2022-09-15 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git


The following commit(s) were added to refs/heads/master by this push:
 new 3b2dd2f  Removing transitive dependencies for log4j (#488)
3b2dd2f is described below

commit 3b2dd2fa20008b0c15992cac2946ef59daed08f6
Author: gurleen-gks 
AuthorDate: Thu Sep 15 10:00:56 2022 -0700

Removing transitive dependencies for log4j (#488)

Co-authored-by: Gurleen Kaur 
---
 build.gradle | 1 +
 1 file changed, 1 insertion(+)

diff --git a/build.gradle b/build.gradle
index 84a5410..e442883 100644
--- a/build.gradle
+++ b/build.gradle
@@ -42,6 +42,7 @@ configurations {
 testPlugins {}
 all*.exclude group: 'org.bouncycastle', module: 'bc-fips'
 all*.exclude group: 'io.netty', module: 'netty-tcnative-classes'
+all*.exclude group: 'log4j', module: 'log4j'
 }
 
 dependencies {



[pulsar] branch master updated (f453e0a5b7a -> 5fce0ce9596)

2022-09-07 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from f453e0a5b7a remove unnecessary parameters(reusefuture) and related 
logic (#17378)
 add 5fce0ce9596 [improve][pulsar-testclient] Add disableBatching as CLI 
argument for pulsar-perf-producer (#17381)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/testclient/PerformanceProducer.java | 82 +-
 .../pulsar/testclient/PerformanceProducerTest.java | 26 +++
 2 files changed, 73 insertions(+), 35 deletions(-)



[pulsar-manager] branch master updated: Removing conflicting dependencies (#482)

2022-08-15 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git


The following commit(s) were added to refs/heads/master by this push:
 new 765c510  Removing conflicting dependencies (#482)
765c510 is described below

commit 765c5109e652558cbeb75ddcd564051bc791
Author: gurleen-gks 
AuthorDate: Mon Aug 15 15:56:13 2022 -0700

Removing conflicting dependencies (#482)

Co-authored-by: Gurleen Kaur 
---
 build.gradle | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/build.gradle b/build.gradle
index 723a6fb..84a5410 100644
--- a/build.gradle
+++ b/build.gradle
@@ -40,6 +40,8 @@ repositories {
 
 configurations {
 testPlugins {}
+all*.exclude group: 'org.bouncycastle', module: 'bc-fips'
+all*.exclude group: 'io.netty', module: 'netty-tcnative-classes'
 }
 
 dependencies {



[pulsar] branch master updated: merge conflicts (#16961)

2022-08-05 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 7a65f3ff907 merge conflicts (#16961)
7a65f3ff907 is described below

commit 7a65f3ff907385af4b115f300324e5ec60aa25ac
Author: vineeth1995 
AuthorDate: Fri Aug 5 14:32:27 2022 -0700

merge conflicts (#16961)

Co-authored-by: Vineeth 
---
 .../pulsar/client/cli/PulsarClientToolTest.java| 26 ++
 .../apache/pulsar/client/cli/PulsarClientTool.java | 11 +
 2 files changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 4a1589250ad..5cc5e50b96d 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.beust.jcommander.JCommander;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -269,6 +270,31 @@ public class PulsarClientToolTest extends BrokerTestBase {
 }
 }
 
+@Test(timeOut = 2)
+public void testArgs() throws Exception {
+PulsarClientTool pulsarClientTool = new PulsarClientTool(new 
Properties());
+final String url = "pulsar+ssl://localhost:6651";
+final String authPlugin = 
"org.apache.pulsar.client.impl.auth.AuthenticationTls";
+final String authParams = 
"tlsCertFile:pulsar-broker/src/test/resources/authentication/tls/client-cert.pem,"
 +
+
"tlsKeyFile:pulsar-broker/src/test/resources/authentication/tls/client-key.pem";
+final String tlsTrustCertsFilePath = 
"pulsar/pulsar-broker/src/test/resources/authentication/tls/cacert.pem";
+final String message = "test msg";
+final int numberOfMessages = 1;
+final String topicName = getTopicWithRandomSuffix("test-topic");
+
+String[] args = {"--url", url, 
+"--auth-plugin", authPlugin,
+"--auth-params", authParams,
+"--tlsTrustCertsFilePath", tlsTrustCertsFilePath,
+"produce", "-m", message,
+"-n", Integer.toString(numberOfMessages), topicName};
+pulsarClientTool.jcommander.parse(args);
+assertEquals(pulsarClientTool.rootParams.getTlsTrustCertsFilePath(), 
tlsTrustCertsFilePath);
+assertEquals(pulsarClientTool.rootParams.getAuthParams(), authParams);
+assertEquals(pulsarClientTool.rootParams.getAuthPluginClassName(), 
authPlugin);
+assertEquals(pulsarClientTool.rootParams.getServiceURL(), url);
+}
+
 private static String getTopicWithRandomSuffix(String localNameBase) {
 return String.format("persistent://prop/ns-abc/test/%s-%s", 
localNameBase, UUID.randomUUID().toString());
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 32770f3bd07..648505b7d75 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -72,15 +72,19 @@ public class PulsarClientTool {
 
 @Parameter(names = { "-h", "--help", }, help = true, description = 
"Show this help.")
 boolean help;
+
+@Parameter(names = { "--tlsTrustCertsFilePath" }, description = "File 
path to client trust certificates")
+String tlsTrustCertsFilePath;
 }
 
 protected RootParams rootParams;
 boolean tlsAllowInsecureConnection;
 boolean tlsEnableHostnameVerification;
-String tlsTrustCertsFilePath;
+
 String tlsKeyFilePath;
 String tlsCertificateFilePath;
 
+
 // for tls with keystore type config
 boolean useKeyStoreTls;
 String tlsTrustStoreType;
@@ -103,8 +107,6 @@ public class PulsarClientTool {
 
.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
 this.tlsEnableHostnameVerification = Boolean
 
.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
-this.tlsTrustCertsFilePath = 
properties.getPro

[pulsar] branch master updated: [PIP-136] Sync Pulsar metadata across multiple clouds (#16425)

2022-07-31 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a38f74dfefe [PIP-136] Sync Pulsar metadata across multiple clouds 
(#16425)
a38f74dfefe is described below

commit a38f74dfefec8620a16b384da428c6c4fedbdd70
Author: Rajan Dhabalia 
AuthorDate: Sun Jul 31 16:39:27 2022 -0700

[PIP-136] Sync Pulsar metadata across multiple clouds (#16425)

* [PIP-136] Sync Pulsar policies across multiple clouds

add unit test

add findbug

* address comments
---
 conf/broker.conf   |   6 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  17 ++
 .../org/apache/pulsar/broker/PulsarService.java|  35 +++-
 .../service/PulsarMetadataEventSynchronizer.java   | 199 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  30 +++-
 .../OwnerShipForCurrentServerTestBase.java |   4 +-
 .../broker/transaction/TransactionTestBase.java|   4 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |   4 +-
 pulsar-metadata/pom.xml|   1 +
 .../apache/pulsar/metadata/api/MetadataEvent.java  |  57 ++
 .../metadata/api/MetadataEventSynchronizer.java|  53 ++
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   6 +
 .../api/extended/MetadataStoreExtended.java|  10 ++
 .../metadata/impl/AbstractMetadataStore.java   | 137 --
 .../metadata/impl/LocalMemoryMetadataStore.java|  10 ++
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |  10 ++
 .../pulsar/metadata/impl/ZKMetadataStore.java  |   8 +-
 .../batching/AbstractBatchedMetadataStore.java |  11 ++
 .../src/main/resources/findbugsExclude.xml |  34 
 .../impl/LocalMemoryMetadataStoreTest.java | 196 
 ...est.java => MetadataEventSynchronizerTest.java} |   2 +-
 site2/docs/reference-configuration-broker.md   |  19 ++
 site2/docs/reference-configuration.md  |   2 +-
 23 files changed, 795 insertions(+), 60 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 7a0e32c95b3..5216034fdb8 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -29,6 +29,12 @@ metadataStoreUrl=
 # Configuration file path for metadata store. It's supported by 
RocksdbMetadataStore and EtcdMetadataStore for now
 metadataStoreConfigPath=
 
+# Event topic to sync metadata between separate pulsar clusters on different 
cloud platforms.
+metadataSyncEventTopic=
+
+# Event topic to sync configuration-metadata between separate pulsar clusters 
on different cloud platforms.
+configurationMetadataSyncEventTopic=
+
 # The metadata store URL for the configuration data. If empty, we fall back to 
use metadataStoreUrl
 configurationMetadataStoreUrl=
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 4fb680cf8b4..93c79e5398e 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -483,6 +483,23 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 )
 private String metadataStoreConfigPath = null;
 
+
+@FieldContext(
+dynamic = true,
+category = CATEGORY_SERVER,
+doc = "Event topic to sync metadata between separate pulsar "
++ "clusters on different cloud platforms."
+)
+private String metadataSyncEventTopic = null;
+
+@FieldContext(
+dynamic = true,
+category = CATEGORY_SERVER,
+doc = "Event topic to sync configuration-metadata between separate 
pulsar "
++ "clusters on different cloud platforms."
+)
+private String configurationMetadataSyncEventTopic = null;
+
 @FieldContext(
 dynamic = true,
 doc = "Factory class-name to create topic with custom workflow"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5c08eb1fc08..f0d9e68f380 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -101,6 +101,7 @@ import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
 import 
org.apache.pulsar.broker.ser

[pulsar] branch master updated: Support encryption in websocket proxy (#16234)

2022-07-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 52da03f08f4 Support encryption in websocket proxy (#16234)
52da03f08f4 is described below

commit 52da03f08f44aa841ba5510ecb83b8834ba8964c
Author: Rajan Dhabalia 
AuthorDate: Wed Jul 20 17:49:32 2022 -0700

Support encryption in websocket proxy (#16234)

fix doc
---
 .../configuration/PulsarConfigurationLoader.java   |  16 +-
 .../proxy/ProxyEncryptionPublishConsumeTest.java   | 249 +
 .../apache/pulsar/websocket/ConsumerHandler.java   |   3 +
 .../pulsar/websocket/CryptoKeyReaderFactory.java   |  29 +++
 .../apache/pulsar/websocket/ProducerHandler.java   |  10 +
 .../org/apache/pulsar/websocket/ReaderHandler.java |   3 +
 .../apache/pulsar/websocket/WebSocketService.java  |  19 ++
 .../service/WebSocketProxyConfiguration.java   |   5 +
 site2/docs/client-libraries-websocket.md   |   7 +
 9 files changed, 338 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
index 12bba1a7fa2..b0479d81588 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/common/configuration/PulsarConfigurationLoader.java
@@ -182,23 +182,33 @@ public class PulsarConfigurationLoader {
 final ServiceConfiguration convertedConf = 
ServiceConfiguration.class
 .getDeclaredConstructor().newInstance();
 Field[] confFields = conf.getClass().getDeclaredFields();
+Properties properties = new Properties();
 Arrays.stream(confFields).forEach(confField -> {
 try {
-Field convertedConfField = 
ServiceConfiguration.class.getDeclaredField(confField.getName());
 confField.setAccessible(true);
+Field convertedConfField = 
ServiceConfiguration.class.getDeclaredField(confField.getName());
 if (!Modifier.isStatic(convertedConfField.getModifiers())) 
{
 convertedConfField.setAccessible(true);
 convertedConfField.set(convertedConf, 
confField.get(conf));
 }
 } catch (NoSuchFieldException e) {
 if (!ignoreNonExistMember) {
-throw new IllegalArgumentException("Exception caused 
while converting configuration: "
-+ e.getMessage());
+throw new IllegalArgumentException(
+"Exception caused while converting 
configuration: " + e.getMessage());
+}
+// add unknown fields to properties
+try {
+if (confField.get(conf) != null) {
+properties.put(confField.getName(), 
confField.get(conf));
+}
+} catch (Exception ignoreException) {
+// should not happen
 }
 } catch (IllegalAccessException e) {
 throw new RuntimeException("Exception caused while 
converting configuration: " + e.getMessage());
 }
 });
+convertedConf.getProperties().putAll(properties);
 return convertedConf;
 } catch (InstantiationException | IllegalAccessException
 | InvocationTargetException | NoSuchMethodException e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
new file mode 100644
index 000..bbd2b3bd14f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WIT

[pulsar] branch master updated: [pulsar-broker] PIP-100 Support pluggable topic factory (#12235)

2022-07-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f38c5aa5b9 [pulsar-broker] PIP-100 Support pluggable topic factory 
(#12235)
6f38c5aa5b9 is described below

commit 6f38c5aa5b9838c3a2a5fea10e13dd3830947d9f
Author: Rajan Dhabalia 
AuthorDate: Wed Jul 20 13:01:44 2022 -0700

[pulsar-broker] PIP-100 Support pluggable topic factory (#12235)

add Closeable

fix test

close factory and handle exception
---
 conf/broker.conf   |  3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../pulsar/broker/service/BrokerService.java   | 43 -
 .../apache/pulsar/broker/service/TopicFactory.java | 31 ++
 .../broker/service/PersistentTopicE2ETest.java | 71 +++---
 .../common/naming/ServiceConfigurationTest.java|  1 -
 site2/docs/reference-configuration.md  |  2 +
 7 files changed, 146 insertions(+), 11 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d2e13d117e8..c19afe981ca 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -136,6 +136,9 @@ brokerShutdownTimeoutMs=6
 # Flag to skip broker shutdown when broker handles Out of memory error
 skipBrokerShutdownOnOOM=false
 
+# Factory class-name to create topic with custom workflow
+topicFactoryClassName=
+
 # Enable backlog quota check. Enforces action on topic when the quota is 
reached
 backlogQuotaCheckEnabled=true
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d2e68fb437c..ad4188d2882 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -479,6 +479,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 )
 private String metadataStoreConfigPath = null;
 
+@FieldContext(
+dynamic = true,
+doc = "Factory class-name to create topic with custom workflow"
+)
+private String topicFactoryClassName;
+
 @FieldContext(
 category = CATEGORY_POLICIES,
 doc = "Enable backlog quota check. Enforces actions on topic when the 
quota is reached"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e80d603a141..8e701ebf5bf 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -276,6 +276,7 @@ public class BrokerService implements Closeable {
 private final LongAdder pausedConnections = new LongAdder();
 private BrokerInterceptor interceptor;
 private ImmutableMap entryFilters;
+private TopicFactory topicFactory;
 
 private Set 
brokerEntryMetadataInterceptors;
 private Set brokerEntryPayloadProcessors;
@@ -346,6 +347,7 @@ public class BrokerService implements Closeable {
 this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration());
 this.blockedDispatchers =
 
ConcurrentOpenHashSet.newBuilder().build();
+this.topicFactory = createPersistentTopicFactory();
 // update dynamic configuration and register-listener
 updateConfigurationAndRegisterListeners();
 this.lookupRequestSemaphore = new AtomicReference(
@@ -1127,7 +1129,13 @@ public class BrokerService implements Closeable {
 new NotAllowedException("Broker is not unable to load 
non-persistent topic"));
 }
 final long topicCreateTimeMs = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
-NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, 
this);
+NonPersistentTopic nonPersistentTopic;
+try {
+nonPersistentTopic = newTopic(topic, null, this, 
NonPersistentTopic.class);
+} catch (Exception e) {
+log.warn("Failed to create topic {}", topic, e);
+return FutureUtil.failedFuture(e);
+}
 CompletableFuture isOwner = checkTopicNsOwnership(topic);
 isOwner.thenRun(() -> {
 nonPersistentTopic.initialize()
@@ -1412,7 +1420,7 @@ public class BrokerService implements Closeable {
 try {
 PersistentTopic persistentTopic = 
isSystemTopic(topic)
 ? new SystemTopic(topic, ledger, 
BrokerService.this)
-: new Persist

[pulsar] branch master updated: [pulsar-broker] Support caching to drain backlog consumers (#12258)

2022-07-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 586979152bf [pulsar-broker] Support caching to drain backlog consumers 
(#12258)
586979152bf is described below

commit 586979152bf22de5a1dff3e955dfcdf540257a08
Author: Rajan Dhabalia 
AuthorDate: Wed Jul 20 09:48:41 2022 -0700

[pulsar-broker] Support caching to drain backlog consumers (#12258)
---
 conf/standalone.conf   | 10 +++
 .../apache/bookkeeper/mledger/ManagedLedger.java   |  5 ++
 .../bookkeeper/mledger/ManagedLedgerConfig.java| 59 ++
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  9 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 59 --
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |  7 +-
 .../bookkeeper/mledger/impl/cache/EntryCache.java  |  6 +-
 .../mledger/impl/cache/RangeEntryCacheImpl.java| 16 ++--
 .../apache/bookkeeper/mledger/util/RangeCache.java |  4 +
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java | 24 +-
 .../pulsar/broker/service/BrokerService.java   |  6 ++
 .../apache/pulsar/broker/service/PulsarStats.java  |  3 +-
 .../org/apache/pulsar/broker/service/Topic.java|  2 +
 .../service/nonpersistent/NonPersistentTopic.java  |  5 ++
 .../broker/service/persistent/PersistentTopic.java |  9 +++
 .../client/api/MessageDispatchThrottlingTest.java  | 92 ++
 .../client/api/SimpleProducerConsumerTest.java |  2 +-
 .../offload/jcloud/impl/MockManagedLedger.java |  5 ++
 19 files changed, 306 insertions(+), 19 deletions(-)

diff --git a/conf/standalone.conf b/conf/standalone.conf
index f6bf8fe91c5..b3281c02737 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -752,6 +752,16 @@ managedLedgerAddEntryTimeoutSeconds=0
 # Of course, use a smaller value may degrade consumption throughput. Default 
is 10ms.
 managedLedgerNewEntriesCheckDelayInMillis=10
 
+# Minimum cursors that must be in backlog state to cache and reuse the read 
entries.
+# (Default =0 to disable backlog reach cache)
+managedLedgerMinimumBacklogCursorsForCaching=0
+
+# Minimum backlog entries for any cursor before start caching reads.
+managedLedgerMinimumBacklogEntriesForCaching=100
+
+# Maximum backlog entry difference to prevent caching entries that can't be 
reused.
+managedLedgerMaxBacklogBetweenCursorsForCaching=1
+
 # Use Open Range-Set to cache unacked messages
 managedLedgerUnackedRangesOpenCacheSetEnabled=true
 
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 7196a3b4c03..0ebbd514a52 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -667,4 +667,9 @@ public interface ManagedLedger {
  * roll over that ledger if inactive.
  */
 void checkInactiveLedgerAndRollOver();
+
+/**
+ * Check if managed ledger should cache backlog reads.
+ */
+void checkCursorsToCacheEntries();
 }
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index e628a253563..788732e763a 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -81,6 +81,9 @@ public class ManagedLedgerConfig {
 @Getter
 @Setter
 private boolean cacheEvictionByMarkDeletedPosition = false;
+private int minimumBacklogCursorsForCaching = 0;
+private int minimumBacklogEntriesForCaching = 1000;
+private int maxBacklogBetweenCursorsForCaching = 1000;
 
 public boolean isCreateIfMissing() {
 return createIfMissing;
@@ -683,4 +686,60 @@ public class ManagedLedgerConfig {
 this.inactiveLedgerRollOverTimeMs = (int) 
unit.toMillis(inactiveLedgerRollOverTimeMs);
 }
 
+/**
+ * Minimum cursors with backlog after which broker is allowed to cache 
read entries to reuse them for other cursors'
+ * backlog reads. (Default = 0, broker will not cache backlog reads)
+ *
+ * @return
+ */
+public int getMinimumBacklogCursorsForCaching() {
+return minimumBacklogCursorsForCaching;
+}
+
+/**
+ * Set Minimum cursors with backlog after which broker is allowed to cache 
read entries to reuse them for other
+ * cursors' backlog reads.
+ *
+ * @param minimumBacklogCursorsForCaching
+ */
+public void setMinimumBacklogCursorsForCaching(int 
minimumBacklogCursorsForCaching

[pulsar] branch master updated: Support mechanism to provide external zookeeper-server list to build global/configuration zookeeper (#15987)

2022-06-21 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new e976bec05fb Support mechanism to provide external zookeeper-server 
list to build global/configuration zookeeper (#15987)
e976bec05fb is described below

commit e976bec05fb6be9d51115bce52972f733bb0c10a
Author: Rajan Dhabalia 
AuthorDate: Tue Jun 21 15:59:05 2022 -0700

Support mechanism to provide external zookeeper-server list to build 
global/configuration zookeeper (#15987)
---
 docker/pulsar/scripts/generate-zookeeper-config.sh | 20 ++--
 1 file changed, 14 insertions(+), 6 deletions(-)

diff --git a/docker/pulsar/scripts/generate-zookeeper-config.sh 
b/docker/pulsar/scripts/generate-zookeeper-config.sh
index 53d7600f80e..2852742ef26 100755
--- a/docker/pulsar/scripts/generate-zookeeper-config.sh
+++ b/docker/pulsar/scripts/generate-zookeeper-config.sh
@@ -34,14 +34,22 @@ DOMAIN=`hostname -d`
 IDX=1
 for SERVER in $(echo $ZOOKEEPER_SERVERS | tr "," "\n")
 do
-echo "server.$IDX=$SERVER.$DOMAIN:2888:3888" >> $CONF_FILE
-
-if [ "$HOSTNAME" == "$SERVER" ]; then
-MY_ID=$IDX
-echo "Current server id $MY_ID"
+if [[ ! -z "$EXTERNAL_PROVIDED_SERVERS" && $(fgrep -ix 
$EXTERNAL_PROVIDED_SERVERS <<< "true") ]]; then
+   echo "server.$IDX=$SERVER" >> $CONF_FILE
+   # external zk-server connect string starts with hostname
+   if [[ "$SERVER" == "$HOSTNAME"* ]]; then
+ MY_ID=$IDX
+ echo "Current server id $MY_ID"
+   fi
+else
+   echo "server.$IDX=$SERVER.$DOMAIN:2888:3888" >> $CONF_FILE
+   if [ "$HOSTNAME" == "$SERVER" ]; then
+ MY_ID=$IDX
+ echo "Current server id $MY_ID"
+   fi
 fi
 
-   ((IDX++))
+((IDX++))
 done
 
 # For ZooKeeper container we need to initialize the ZK id



[pulsar] branch master updated: add doc for how to configure chunking for readers (#15211)

2022-04-23 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3823ddf89cc add doc for how to configure chunking for readers (#15211)
3823ddf89cc is described below

commit 3823ddf89cc22f14f5f994ea030bd365d58d28b3
Author: momo-jun <60642177+momo-...@users.noreply.github.com>
AuthorDate: Sun Apr 24 11:12:13 2022 +0800

add doc for how to configure chunking for readers (#15211)
---
 site2/docs/client-libraries-java.md | 16 
 1 file changed, 16 insertions(+)

diff --git a/site2/docs/client-libraries-java.md 
b/site2/docs/client-libraries-java.md
index e640b33216a..13205461f74 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -1123,6 +1123,22 @@ pulsarClient.newReader()
 Total hash range size is 65536, so the max end of the range should be less 
than or equal to 65535.
 
 
+### Configure chunking
+
+Configuring chunking for readers is similar to that for consumers. See 
[configure chunking for consumers](#configure-chunking) for more information.
+
+The following is an example of how to configure message chunking for a reader.
+
+```java
+Reader reader = pulsarClient.newReader()
+.topic(topicName)
+.startMessageId(MessageId.earliest)
+.maxPendingChunkedMessage(12)
+.autoAckOldestChunkedMessageOnQueueFull(true)
+.expireTimeOfIncompleteChunkedMessage(12, TimeUnit.MILLISECONDS)
+.create();
+```
+
 ## TableView
 
 The TableView interface serves an encapsulated access pattern, providing a 
continuously updated key-value map view of the compacted topic data. Messages 
without keys will be ignored.



[pulsar] branch master updated: [pulsar-client] Add message chunking configuration for reader (#15143)

2022-04-14 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d8923b8b20f [pulsar-client] Add message chunking configuration for 
reader (#15143)
d8923b8b20f is described below

commit d8923b8b20f4d041ec07eed55aa08099447c6a2b
Author: Rajan Dhabalia 
AuthorDate: Thu Apr 14 21:12:36 2022 -0700

[pulsar-client] Add message chunking configuration for reader (#15143)
---
 .../pulsar/client/impl/MessageChunkingTest.java| 36 -
 .../apache/pulsar/client/api/ReaderBuilder.java| 47 ++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  7 
 .../pulsar/client/impl/ReaderBuilderImpl.java  | 18 +
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  7 
 .../client/impl/conf/ReaderConfigurationData.java  |  8 
 6 files changed, 122 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 4d752029c0f..85d67c3de0d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.SizeUnit;
 import org.apache.pulsar.client.impl.MessageImpl.SchemaState;
 import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
@@ -217,6 +218,8 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 .isAckReceiptEnabled(ackReceiptEnabled)
 .ackTimeout(5, TimeUnit.SECONDS).subscribe();
 
+Reader reader = 
pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();
+
 ProducerBuilder producerBuilder = 
pulsarClient.newProducer().topic(topicName);
 
 Producer producer = 
producerBuilder.enableChunking(true).enableBatching(false).create();
@@ -232,6 +235,15 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
 Message msg = null;
 Set messageSet = Sets.newHashSet();
+for (int i = 0; i < totalMessages; i++) {
+msg = reader.readNext(5, TimeUnit.SECONDS);
+String receivedMessage = new String(msg.getData());
+log.info("Received message: [{}]", receivedMessage);
+String expectedMessage = publishedMessages.get(i);
+testMessageOrderAndDuplicates(messageSet, receivedMessage, 
expectedMessage);
+}
+
+messageSet.clear();
 for (int i = 0; i < totalMessages; i++) {
 msg = consumer.receive(5, TimeUnit.SECONDS);
 String receivedMessage = new String(msg.getData());
@@ -268,6 +280,7 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
 consumer.close();
 producer.close();
+reader.close();
 log.info("-- Exiting {} test --", methodName);
 
 }
@@ -384,6 +397,8 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 producer.cnx().registerProducer(producerId, producer); // registered 
spy ProducerImpl
 ConsumerImpl consumer = (ConsumerImpl) 
pulsarClient.newConsumer().topic(topicName)
 .subscriptionName("my-sub").subscribe();
+ReaderImpl reader = (ReaderImpl) 
pulsarClient.newReader().topic(topicName)
+.startMessageId(MessageId.earliest).create();
 
 TypedMessageBuilderImpl msg = 
(TypedMessageBuilderImpl) 
producer.newMessage().value("message-1".getBytes());
 ByteBuf payload = Unpooled.wrappedBuffer(msg.getContent());
@@ -397,17 +412,22 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 producer.processOpSendMsg(op);
 
 retryStrategically((test) -> {
-return consumer.chunkedMessagesMap.size() > 0;
+return reader.getConsumer().chunkedMessagesMap.size() > 0 && 
consumer.chunkedMessagesMap.size() > 0;
 }, 5, 500);
 assertEquals(consumer.chunkedMessagesMap.size(), 1);
+assertEquals(reader.getConsumer().chunkedMessagesMap.size(), 1);
 
 consumer.expireTimeOfIncompleteChunkedMessageMillis = 1;
+reader.getConsumer().expireTimeOfIncompleteChunkedMessageMillis = 1;
 Thread.sleep(10);
 consumer.removeExpireIncompleteChunkedMessages();
+reader.getConsumer().removeExpireIncompleteChunkedMessages();
 assertEquals(consumer.chunk

[pulsar-adapters] branch master updated: Auto update partition for pulsar kafka producer (#28)

2022-03-31 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
 new 0adca97  Auto update partition for pulsar kafka producer (#28)
0adca97 is described below

commit 0adca97f98025ce60ea7504431711b01a9b080cd
Author: Rajan Dhabalia 
AuthorDate: Thu Mar 31 10:19:19 2022 -0700

Auto update partition for pulsar kafka producer (#28)
---
 .../clients/producer/PulsarKafkaProducer.java  | 75 +-
 .../kafka/compat/PulsarProducerKafkaConfig.java|  1 +
 .../clients/producer/PulsarKafkaProducerTest.java  | 72 +++--
 3 files changed, 126 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index fca5da8..80ca9fd 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -30,7 +30,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -44,26 +46,31 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
-import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
-import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+
 public class PulsarKafkaProducer implements Producer {
 
+private static final Logger log = 
LoggerFactory.getLogger(PulsarKafkaProducer.class);
 private final PulsarClient client;
 final ProducerBuilder pulsarProducerBuilder;
 
@@ -74,6 +81,9 @@ public class PulsarKafkaProducer implements Producer {
 
 private final Partitioner partitioner;
 private volatile Cluster cluster = Cluster.empty();
+@Getter
+private final int autoUpdatePartitionDurationMs;
+private final ScheduledExecutorService executor;
 
 private List> interceptors;
 
@@ -176,6 +186,10 @@ public class PulsarKafkaProducer implements 
Producer {
 
 interceptors = (List) producerConfig.getConfiguredInstances(
 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
ProducerInterceptor.class);
+
+autoUpdatePartitionDurationMs = Integer.parseInt(
+
properties.getProperty(PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS_REFRESH_DURATION,
 "30"));
+executor = Executors.newSingleThreadScheduledExecutor();
 }
 
 @Override
@@ -272,14 +286,19 @@ public class PulsarKafkaProducer implements 
Producer {
 public void close() {
 close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
 partitioner.close();
+if (executor != null) {
+executor.shutdown();
+}
 }
 
 @Override
 public void close(long timeout, TimeUnit unit) {
-try {
-client.closeAsync().get(timeout, unit);
-} catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-

[pulsar] branch master updated (7c1f17a -> 0a91196)

2022-03-09 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 7c1f17a  Fix potential npe bug of #14384 (#14595)
 add 0a91196  [pulsar-broker] support client configurable message chunk 
size (#14382)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/client/impl/MessageChunkingTest.java | 18 +++---
 .../org/apache/pulsar/client/api/ProducerBuilder.java  | 10 ++
 .../apache/pulsar/client/impl/ProducerBuilderImpl.java |  6 ++
 .../org/apache/pulsar/client/impl/ProducerImpl.java|  8 +++-
 .../client/impl/conf/ProducerConfigurationData.java|  1 +
 site2/docs/client-libraries-java.md|  2 ++
 6 files changed, 41 insertions(+), 4 deletions(-)


[pulsar] branch master updated (e3c9684 -> df9a12d)

2022-02-16 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from e3c9684  [Doc] Move schema compatibility strategy cmd from topics to 
topicsPolicies (#14232)
 add df9a12d  [pulsar-broker] fix ack-hole and backlog for 
persistent-replicator (#14282)

No new revisions were added by this update.

Summary of changes:
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  7 ++-
 .../pulsar/broker/service/AbstractReplicator.java  |  4 ++
 .../service/persistent/PersistentReplicator.java   | 14 +
 .../pulsar/broker/service/ReplicatorTest.java  | 59 ++
 4 files changed, 83 insertions(+), 1 deletion(-)


[pulsar] branch master updated (792e264 -> edf4858)

2022-02-10 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 792e264  [pulsar-broker] Support roll-over ledgers for inactive topics 
(#13073)
 add edf4858  [pulsar-broker] support to get list of topics under a 
namespace bundle (#12632)

No new revisions were added by this update.

Summary of changes:
 .../broker/admin/impl/PersistentTopicsBase.java| 24 +---
 .../broker/admin/v1/NonPersistentTopics.java   |  7 ++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  7 ++-
 .../broker/admin/v2/NonPersistentTopics.java   |  8 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  7 ++-
 .../common/naming/NamespaceBundleFactory.java  |  7 ++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 35 +++
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  2 +-
 .../org/apache/pulsar/client/admin/Topics.java | 69 ++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 22 ++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  4 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java | 10 +++-
 .../org/apache/pulsar/admin/cli/TestCmdTopics.java |  5 ++
 13 files changed, 187 insertions(+), 20 deletions(-)


[pulsar] branch master updated: [pulsar-broker] load-balancer support disabling max-session for bundle split (#13108)

2021-12-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f7abada  [pulsar-broker] load-balancer support disabling max-session 
for bundle split (#13108)
f7abada is described below

commit f7abada1a5e1e79eee10376940187a6f7dc3cd79
Author: Rajan Dhabalia 
AuthorDate: Mon Dec 20 11:25:40 2021 -0800

[pulsar-broker] load-balancer support disabling max-session for bundle 
split (#13108)
---
 conf/broker.conf | 1 +
 .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 1 +
 .../apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java| 3 ++-
 .../apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java | 3 ++-
 .../java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java  | 5 +
 5 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 80eb86e..9b67442 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1074,6 +1074,7 @@ loadBalancerAutoUnloadSplitBundlesEnabled=true
 loadBalancerNamespaceBundleMaxTopics=1000
 
 # maximum sessions (producers + consumers) in a bundle, otherwise bundle split 
will be triggered
+# (disable threshold check with value -1)
 loadBalancerNamespaceBundleMaxSessions=1000
 
 # maximum msgRate (in + out) in a bundle, otherwise bundle split will be 
triggered
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 21c82b6..ba1db65 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1953,6 +1953,7 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 @FieldContext(
 category = CATEGORY_LOAD_BALANCER,
 doc = "maximum sessions (producers + consumers) in a bundle, otherwise 
bundle split will be triggered"
++ "(disable threshold check with value -1)"
 )
 private int loadBalancerNamespaceBundleMaxSessions = 1000;
 @FieldContext(
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index fa48618..751203c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -84,7 +84,8 @@ public class BundleSplitterTask implements 
BundleSplitStrategy {
 totalMessageRate = longTermData.totalMsgRate();
 totalMessageThroughput = longTermData.totalMsgThroughput();
 }
-if (stats.topics > maxBundleTopics || stats.consumerCount + 
stats.producerCount > maxBundleSessions
+if (stats.topics > maxBundleTopics || (maxBundleSessions > 0 
&& (stats.consumerCount
++ stats.producerCount > maxBundleSessions))
 || totalMessageRate > maxBundleMsgRate || 
totalMessageThroughput > maxBundleBandwidth) {
 final String namespace = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
 try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index e1829e6..017a040 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -1372,7 +1372,8 @@ public class SimpleLoadManagerImpl implements 
LoadManager, Consumer maxBundleTopics || totalSessions > 
maxBundleSessions || totalMsgRate > maxBundleMsgRate
+if (stats.topics > maxBundleTopics || (maxBundleSessions > 0
+&& totalSessions > maxBundleSessions) || totalMsgRate > 
maxBundleMsgRate
 || totalBandwidth > maxBundleBandwidth) {
 if (stats.topics <= 1) {
 log.info("Unable to split hot namespace bundle {} since 
there is only one topic.", bundleName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java
index 5a6b784..a64f283 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lo

[pulsar] branch master updated (dc884f8 -> f965fb8)

2021-12-01 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from dc884f8  Disable PulsarLedgerAuditorManagerTest when running on 
RocksDB backend (#13072)
 add f965fb8  [pulsar-broker] add uniform load shedder strategy to 
distribute traffic uniformly across brokers (#12902)

No new revisions were added by this update.

Summary of changes:
 conf/broker.conf   |  12 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  19 +++
 .../loadbalance/impl/UniformLoadShedder.java   | 169 +
 site2/docs/administration-load-balance.md  |  14 ++
 site2/docs/reference-configuration.md  |   2 +
 5 files changed, 216 insertions(+)
 create mode 100644 
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.java


[pulsar] branch master updated (b27a716 -> 19b1d1b)

2021-11-10 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from b27a716  Some dependency in integration tests scope should be test 
(#12696)
 add 19b1d1b  [pulsar-broker] Provide option to split bundle based on load 
(#12378)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 21 +--
 .../broker/loadbalance/ModularLoadManager.java |  9 +++
 .../loadbalance/impl/ModularLoadManagerImpl.java   |  5 +-
 .../common/naming/NamespaceBundleFactory.java  | 28 -
 .../broker/namespace/NamespaceServiceTest.java | 70 +++---
 .../pulsar/common/policies/data/Policies.java  |  5 +-
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 15 -
 7 files changed, 135 insertions(+), 18 deletions(-)


[pulsar] branch master updated: [pulsar-client] Fix pending queue-size stats for batch messages (#12704)

2021-11-09 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 9a3e7ec  [pulsar-client] Fix pending queue-size stats for batch 
messages (#12704)
9a3e7ec is described below

commit 9a3e7ecb326d045a10c3438f10bda63002d4603c
Author: Rajan Dhabalia 
AuthorDate: Tue Nov 9 23:16:16 2021 -0800

[pulsar-client] Fix pending queue-size stats for batch messages (#12704)
---
 .../client/api/SimpleProducerConsumerStatTest.java   | 16 
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 10 +-
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 695b9b9..40d34a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -33,9 +33,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.gson.Gson;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -46,6 +46,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 
@@ -78,6 +79,11 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
 return new Object[][] { { 0, 0 }, { 0, 2 }, { 1000, 0 }, { 1000, 2 } };
 }
 
+@DataProvider(name = "batchingEnabled")
+public Object[][] batchingEnabled() {
+return new Object[][] { { true }, { false } };
+}
+
 @Test(dataProvider = "batch_with_timeout")
 public void testSyncProducerAndConsumer(int batchMessageDelayMs, int 
ackTimeoutSec) throws Exception {
 log.info("-- Starting {} test --", methodName);
@@ -427,14 +433,14 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
 log.info("-- Exiting {} test --", methodName);
 }
 
-@Test
-public void testProducerPendingQueueSizeStats() throws Exception {
+@Test(dataProvider =  "batchingEnabled")
+public void testProducerPendingQueueSizeStats(boolean batchingEnabled) 
throws Exception {
 log.info("-- Starting {} test --", methodName);
 ProducerBuilder producerBuilder = pulsarClient.newProducer()
 .topic("persistent://my-property/tp1/my-ns/my-topic1");
 
 @Cleanup
-Producer producer = 
producerBuilder.enableBatching(false).create();
+Producer producer = 
producerBuilder.enableBatching(batchingEnabled).create();
 
 stopBroker();
 
@@ -443,6 +449,8 @@ public class SimpleProducerConsumerStatTest extends 
ProducerConsumerBase {
 String message = "my-message-" + i;
 producer.sendAsync(message.getBytes());
 }
+Awaitility.await().timeout(2, TimeUnit.MINUTES)
+.until(() -> producer.getStats().getPendingQueueSize() == 
numMessages);
 assertEquals(producer.getStats().getPendingQueueSize(), numMessages);
 }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 5e0579f..40c9b99 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.client.api.BatcherBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
@@ -1924,7 +1925,14 @@ public class ProducerImpl extends ProducerBase 
implements TimerTask, Conne
 }
 
 public int getPendingQueueSize() {
-return pendingMessages.size();
+if (!isBatchMessagingEnabled()) {
+return pendingMessages.size();
+}
+MutableInt size = new MutableInt(0);
+pendingMessages.forEach(op -> {
+si

[pulsar] branch master updated: Fix flaky test: PersistentTopicsTest::testPeekWithSubscriptionNameNotExist (#12703)

2021-11-09 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new cc68f56  Fix flaky test: 
PersistentTopicsTest::testPeekWithSubscriptionNameNotExist (#12703)
cc68f56 is described below

commit cc68f56c9c32ef4b1ea8595c688cd684eb0253f6
Author: Rajan Dhabalia 
AuthorDate: Tue Nov 9 14:44:48 2021 -0800

Fix flaky test: PersistentTopicsTest::testPeekWithSubscriptionNameNotExist 
(#12703)
---
 .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java  | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 0333a0b..2cb2d57 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -77,6 +77,7 @@ import 
org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.zookeeper.KeeperException;
@@ -668,18 +669,20 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
 @Test
 public void testPeekWithSubscriptionNameNotExist() throws Exception {
 final String topicName = "testTopic";
-final String topic = TopicName.get(
+final TopicName topic = TopicName.get(
 TopicDomain.persistent.value(),
 testTenant,
 testNamespace,
-topicName).toString();
+topicName);
 final String subscriptionName = "sub";
 
-((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, 3, 
true).get();
+RetentionPolicies retention = new RetentionPolicies(10,10);
+admin.namespaces().setRetention(topic.getNamespace(), retention);
+((TopicsImpl) 
admin.topics()).createPartitionedTopicAsync(topic.toString(), 3, true).get();
 
 final String partitionedTopic = topic + "-partition-0";
 
-Producer producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+Producer producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic.toString()).create();
 for (int i = 0; i < 100; ++i) {
 producer.send("test" + i);
 }


[pulsar] branch master updated (114185e -> ffccd46)

2021-11-09 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 114185e  [website] Update team page to add new committer (#12688)
 add ffccd46  [pulsar-client] add pending-queue size metrics to producer 
stats (#12674)

No new revisions were added by this update.

Summary of changes:
 .../client/api/SimpleProducerConsumerStatTest.java  | 21 +
 .../org/apache/pulsar/client/api/ProducerStats.java |  5 +
 .../pulsar/client/impl/ProducerStatsDisabled.java   |  5 +
 .../client/impl/ProducerStatsRecorderImpl.java  |  5 +
 4 files changed, 36 insertions(+)


[pulsar] branch master updated (8992712 -> 24b0f4f)

2021-11-08 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 8992712  [website][upgrade]feat: docs migration - 2.7.1 / Admin API 
(#12676)
 add 24b0f4f  [tools] fix TestRunMain test (#12675)

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java   | 5 +
 .../src/test/java/org/apache/pulsar/admin/cli/TestRunMain.java   | 4 +++-
 2 files changed, 8 insertions(+), 1 deletion(-)


[pulsar] branch master updated (7c219b1 -> 5523604)

2021-10-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 7c219b1  Add log error tracking for semaphore count leak (#12410)
 add 5523604  [pulsar-broker] Support configuration to rate-limit 
dispatching on batch message (#12294)

No new revisions were added by this update.

Summary of changes:
 conf/broker.conf   |  4 ++
 conf/standalone.conf   |  4 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../broker/service/AbstractBaseDispatcher.java | 11 ++-
 .../PersistentDispatcherMultipleConsumers.java | 11 +--
 .../PersistentDispatcherSingleActiveConsumer.java  |  7 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 10 +--
 .../client/api/MessageDispatchThrottlingTest.java  | 78 ++
 site2/docs/reference-configuration.md  |  2 +
 9 files changed, 119 insertions(+), 14 deletions(-)


[pulsar] branch master updated (cc70a1f -> 1ce016c)

2021-10-14 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from cc70a1f  [managedledger] NPE on OpAddEntry while ManagedLedger is 
closing (#12364)
 add 1ce016c  [pulsar-broker] support split largest bundle of the namespace 
(#12361)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 14 --
 .../common/naming/NamespaceBundleFactory.java  | 49 +++--
 .../broker/namespace/NamespaceServiceTest.java | 51 ++
 .../pulsar/common/policies/data/Policies.java  |  1 +
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  3 +-
 5 files changed, 101 insertions(+), 17 deletions(-)


[pulsar] branch master updated (8b55636 -> bfa2b29)

2021-10-06 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 8b55636  [Issue-11966][pulsar-proxy] set default http proxy request 
timeout (#11971)
 add bfa2b29  [pulsar-broker] Fix: handle failed partitions topic creation 
(#10374)

No new revisions were added by this update.

Summary of changes:
 .../broker/admin/impl/PersistentTopicsBase.java| 70 --
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  3 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  3 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 52 +---
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  3 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  3 +-
 .../org/apache/pulsar/client/admin/Topics.java | 40 -
 .../pulsar/client/admin/internal/TopicsImpl.java   | 19 +-
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  2 +-
 .../org/apache/pulsar/admin/cli/CmdTopics.java |  6 +-
 10 files changed, 151 insertions(+), 50 deletions(-)


[pulsar] branch master updated (da5bac9 -> 4011aa0)

2021-10-02 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from da5bac9  Update PIP GitHub Issue Template (#12176)
 add 4011aa0  [pulsar-admin] add option to get precise backlog on v1 topic 
(#8927)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java | 6 --
 .../java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java| 5 +++--
 .../test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java  | 2 +-
 .../main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java  | 5 -
 4 files changed, 12 insertions(+), 6 deletions(-)


[pulsar] branch master updated: [pulsar-broker] handle NPE when check active consumer in stats (#12214)

2021-09-28 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 2c6807a  [pulsar-broker] handle NPE when check active consumer in 
stats (#12214)
2c6807a is described below

commit 2c6807a578d5fab0711dde5531f15b924cbb3982
Author: Rajan Dhabalia 
AuthorDate: Tue Sep 28 11:37:03 2021 -0700

[pulsar-broker] handle NPE when check active consumer in stats (#12214)
---
 .../service/persistent/PersistentDispatcherSingleActiveConsumer.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index c4bea81..3c22242 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -558,10 +558,10 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
 
 @Override
 public boolean checkAndUnblockIfStuck() {
-if (cursor.checkAndUpdateReadPositionChanged()) {
+Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
+if (consumer == null || cursor.checkAndUpdateReadPositionChanged()) {
 return false;
 }
-Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
 int totalAvailablePermits = consumer.getAvailablePermits();
 // consider dispatch is stuck if : dispatcher has backlog, 
available-permits and there is no pending read
 if (totalAvailablePermits > 0 && !havePendingRead && 
cursor.getNumberOfEntriesInBacklog(false) > 0) {


[pulsar] branch master updated (4e60de6 -> f154de7)

2021-08-23 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 4e60de6  [Issue 11632][C++] Turning on more compiler warnings, and 
enforcing warnings as errors (#11668)
 add f154de7  [pulsar-client] clean up MultiTopicsConsumerImpl reference on 
consumer creation failure (#11754)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/client/impl/MultiTopicsConsumerImpl.java   | 15 ---
 .../apache/pulsar/client/impl/UnAckedMessageTracker.java  |  1 +
 2 files changed, 13 insertions(+), 3 deletions(-)


[pulsar] branch master updated: [pulsar-client] Fix: set and return topic name on message api (#11743)

2021-08-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 0f8aef2  [pulsar-client] Fix: set and return topic name on message api 
(#11743)
0f8aef2 is described below

commit 0f8aef2b9494cde8e8adc9d97d89a65d73ae8c35
Author: Rajan Dhabalia 
AuthorDate: Sun Aug 22 13:34:34 2021 -0700

[pulsar-client] Fix: set and return topic name on message api (#11743)
---
 .../api/PartitionedProducerConsumerTest.java   | 40 ++
 .../pulsar/client/impl/ProducerSemaphoreTest.java  |  6 ++--
 .../pulsar/client/impl/MessageBuilderImpl.java |  2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  5 +--
 .../client/impl/TypedMessageBuilderImpl.java   |  2 +-
 .../apache/pulsar/client/impl/MessageImplTest.java | 30 
 .../org/apache/pulsar/client/impl/MessageTest.java |  8 ++---
 .../client/impl/schema/AutoConsumeSchemaTest.java  |  2 +-
 8 files changed, 68 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 3e9b194..543c172 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -903,6 +903,46 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
 log.info("-- Exiting {} test --", methodName);
 }
 
+@Test
+public void testCustomPartitionedProducer() throws Exception {
+PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+TopicName topicName = null;
+Producer producer = null;
+try {
+log.info("-- Starting {} test --", methodName);
+
+int numPartitions = 4;
+topicName = TopicName
+
.get("persistent://my-property/my-ns/my-partitionedtopic1-" + 
System.currentTimeMillis());
+
+admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+RouterWithTopicName router = new RouterWithTopicName();
+producer = pulsarClient.newProducer().topic(topicName.toString())
+.messageRouter(router)
+.create();
+for (int i = 0; i < 1; i++) {
+String message = "my-message-" + i;
+
producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
+}
+assertEquals(router.topicName, topicName.toString());
+} finally {
+producer.close();
+pulsarClient.close();
+admin.topics().deletePartitionedTopic(topicName.toString());
+log.info("-- Exiting {} test --", methodName);
+}
+}
+
+private static class RouterWithTopicName implements MessageRouter {
+static String topicName = null;
+
+@Override
+public int choosePartition(Message msg, TopicMetadata metadata) {
+topicName = msg.getTopicName();
+return 2;
+}
+}
 
 private static class AlwaysTwoMessageRouter implements MessageRouter {
 @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index f46509e..c719cbd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -88,7 +88,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 for (int i = 0; i < messages / 2; i++) {
 MessageMetadata metadata = new MessageMetadata()
 .setNumMessagesInBatch(10);
-MessageImpl msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES);
+MessageImpl msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
 futures.add(producer.sendAsync(msg));
 }
 
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages/2);
@@ -147,7 +147,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
 MessageMetadata metadata = new MessageMetadata()
 .setNumMessagesInBatch(10);
 
-MessageImpl msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES);
+MessageImpl msg = MessageImpl.create(met

[pulsar] branch master updated: PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725)

2021-08-21 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d8e6b7  PIP 83 : Pulsar Reader: Message consumption with pooled 
buffer (#11725)
8d8e6b7 is described below

commit 8d8e6b751ee0dc99306a1c61c22a8d75b5927811
Author: Rajan Dhabalia 
AuthorDate: Sat Aug 21 00:50:17 2021 -0700

PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725)

* PIP 83 : Pulsar Reader: Message consumption with pooled buffer
---
 .../client/impl/BrokerClientIntegrationTest.java   | 56 ++
 .../apache/pulsar/client/api/ReaderBuilder.java| 10 
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  1 +
 .../pulsar/client/impl/ReaderBuilderImpl.java  |  7 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  1 +
 .../client/impl/conf/ReaderConfigurationData.java  |  2 +
 6 files changed, 76 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index fb3c30b..a111dd8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -953,4 +954,59 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
 consumer.close();
 producer.close();
 }
+
+/**
+ * It validates pooled message consumption for batch and non-batch 
messages.
+ * 
+ * @throws Exception
+ */
+@Test(dataProvider = "booleanFlagProvider")
+public void testConsumerWithPooledMessagesWithReader(boolean 
isBatchingEnabled) throws Exception {
+log.info("-- Starting {} test --", methodName);
+
+@Cleanup
+PulsarClient newPulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+final String topic = 
"persistent://my-property/my-ns/testConsumerWithPooledMessages" + 
isBatchingEnabled;
+
+@Cleanup
+Reader reader = 
newPulsarClient.newReader(Schema.BYTEBUFFER).topic(topic).poolMessages(true)
+.startMessageId(MessageId.latest).create();
+
+@Cleanup
+Producer producer = 
newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
+
+final int numMessages = 100;
+for (int i = 0; i < numMessages; i++) {
+producer.newMessage().value(("value-" + i).getBytes(UTF_8))
+.eventTime((i + 1) * 100L).sendAsync();
+}
+producer.flush();
+
+// Reuse pre-allocated pooled buffer to process every message
+byte[] val = null;
+int size = 0;
+for (int i = 0; i < numMessages; i++) {
+Message msg = reader.readNext();
+ByteBuffer value;
+try {
+value = msg.getValue();
+int capacity = value.remaining();
+// expand the size of buffer if needed
+if (capacity > size) {
+val = new byte[capacity];
+size = capacity;
+}
+// read message into pooled buffer
+value.get(val, 0, capacity);
+// process the message
+assertEquals(("value-" + i), new String(val, 0, capacity));
+assertTrue(value.isDirect());
+} finally {
+msg.release();
+}
+}
+reader.close();
+producer.close();
+}
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index a84208b..4186df7 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -280,4 +280,14 @@ public interface ReaderBuilder extends Cloneable {
  * @return the reader builder instance
  */
 ReaderBuilder keyHashRange(Range... ranges);
+
+/**
+ * Enable pooling of messages and the underlying data buffers.
+ * 
+ * When pooling is enabled, the application is responsible

[pulsar] branch master updated: [server] Allow broker to start with default backlogquota in bytes (#11671)

2021-08-17 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 80171a7  [server] Allow broker to start with default backlogquota in 
bytes (#11671)
80171a7 is described below

commit 80171a733ab4799f912a8935f03c19554b9ca3b1
Author: Rajan Dhabalia 
AuthorDate: Mon Aug 16 23:24:02 2021 -0700

[server] Allow broker to start with default backlogquota in bytes (#11671)

* [server] Allow broker to start with default backlogquota in bytes

* remove sysout from test

* fix test
---
 conf/broker.conf   |  5 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  9 -
 .../pulsar/broker/service/BacklogQuotaManager.java |  5 ++-
 .../org/apache/pulsar/broker/ConfigHelper.java |  5 ++-
 .../broker/service/BacklogQuotaManagerTest.java| 46 ++
 .../common/naming/ServiceConfigurationTest.java|  8 ++--
 .../policies/data/impl/BacklogQuotaImpl.java   |  2 +-
 site2/docs/reference-configuration.md  |  4 +-
 8 files changed, 73 insertions(+), 11 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index e4c15c7..404d8d7 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -125,9 +125,12 @@ backlogQuotaCheckEnabled=true
 # How often to check for topics that have reached the quota
 backlogQuotaCheckIntervalInSeconds=60
 
-# Default per-topic backlog quota limit, less than 0 means no limitation. 
default is -1.
+# Deprecated - Use backlogQuotaDefaultLimitBytes instead.
 backlogQuotaDefaultLimitGB=-1
 
+# Default per-topic backlog quota limit, less than 0 means no limitation. 
default is -1.
+backlogQuotaDefaultLimitBytes=-1
+
 # Default per-topic backlog quota time limit in second, less than 0 means no 
limitation. default is -1.
 backlogQuotaDefaultLimitSecond=-1
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index aaec952..e98a5ce 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -334,12 +334,19 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 )
 private int backlogQuotaCheckIntervalInSeconds = 60;
 
+@Deprecated
+@FieldContext(
+category = CATEGORY_POLICIES,
+doc = "@deprecated - Use backlogQuotaDefaultLimitByte instead.\""
+)
+private double backlogQuotaDefaultLimitGB = -1;
+
 @FieldContext(
 category = CATEGORY_POLICIES,
 doc = "Default per-topic backlog quota limit by size, less than 0 
means no limitation. default is -1."
 + " Increase it if you want to allow larger msg backlog"
 )
-private long backlogQuotaDefaultLimitGB = -1;
+private long backlogQuotaDefaultLimitBytes = -1;
 
 @FieldContext(
 category = CATEGORY_POLICIES,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index bbb3ddf..f9ccb74 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -53,9 +53,10 @@ public class BacklogQuotaManager {
 
 public BacklogQuotaManager(PulsarService pulsar) {
 this.isTopicLevelPoliciesEnable = 
pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
+double backlogQuotaGB = 
pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB();
 this.defaultQuota = BacklogQuotaImpl.builder()
-
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB()
-* BacklogQuotaImpl.BYTES_IN_GIGABYTE)
+.limitSize(backlogQuotaGB > 0 ? (long) backlogQuotaGB * 
BacklogQuotaImpl.BYTES_IN_GIGABYTE
+: 
pulsar.getConfiguration().getBacklogQuotaDefaultLimitBytes())
 
.limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
 
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
 .build();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
index b929636..ca8231a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java
@@ -38,8 +38,11 @@ public class ConfigHelper {
 }
 
 public static BacklogQuota

[pulsar-adapters] branch master updated: [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)

2021-07-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-adapters.git


The following commit(s) were added to refs/heads/master by this push:
 new feaa450  [pulsar-kafka] Support encryption for pulsar-kafka 
producer/consumer (#26)
feaa450 is described below

commit feaa4504d567a5310e0a3d54003e721696491993
Author: Rajan Dhabalia 
AuthorDate: Tue Jul 20 13:05:37 2021 -0700

[pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)

* [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer

* add properties param for getEncryptionKey method
---
 .../clients/producer/PulsarKafkaProducer.java  |   2 +-
 .../kafka/compat/CryptoKeyReaderFactory.java   |  51 ++
 .../kafka/compat/PulsarConsumerKafkaConfig.java|  12 +++
 .../kafka/compat/PulsarProducerKafkaConfig.java|  17 
 .../producer/PulsarCliebtKafkaConfigTest.java  | 111 +
 .../clients/producer/PulsarKafkaProducerTest.java  |   1 -
 6 files changed, 192 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index f6da38e..fca5da8 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -65,7 +65,7 @@ import org.slf4j.LoggerFactory;
 public class PulsarKafkaProducer implements Producer {
 
 private final PulsarClient client;
-private final ProducerBuilder pulsarProducerBuilder;
+final ProducerBuilder pulsarProducerBuilder;
 
 private final ConcurrentMap> producers = new 
ConcurrentHashMap<>();
 
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java
new file mode 100644
index 000..649c297
--- /dev/null
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/CryptoKeyReaderFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.pulsar.client.api.CryptoKeyReader;
+
+/**
+ * Factory class to create {@link CryptoKeyReader} by using configuration 
stored in Producer/Consumer properties.
+ *
+ */
+public interface CryptoKeyReaderFactory {
+
+/**
+ * Create CryptoKeyReader object for Producer/Consumer.
+ * 
+ * @param properties
+ *properties provided by user to create CryptoKeyReader based 
on configuration params
+ * @return CryptoKeyReader
+ */
+CryptoKeyReader create(Properties properties);
+
+/**
+ * Encryption keys for {@link CryptoKeyReader} while enabling encryption 
at producer.
+ *
+ *  @param properties
+ *properties provided by user to get encryption-keys based on 
configuration params
+ * @return Set of encryption keys
+ */
+default Set getEncryptionKey(Properties properties) {
+return null;
+}
+}
diff --git 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index 09a9806..40858b1 100644
--- 
a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -22,6 +22,7 @@ imp

[pulsar] branch master updated (3fcfb22 -> eb4d8aa)

2021-07-16 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 3fcfb22  [Transaction] Fix the transaction marker does not deleted as 
expect. (#11126)
 add eb4d8aa  Support new topic format for broker admin healthcheck 
endpoint. (#11268)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/admin/impl/BrokersBase.java  | 83 ++
 .../pulsar/broker/namespace/NamespaceService.java  | 35 -
 .../apache/pulsar/broker/SLAMonitoringTest.java|  2 +-
 .../broker/admin/AdminApiHealthCheckTest.java  | 83 ++
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  4 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java|  4 +-
 .../org/apache/pulsar/client/admin/Brokers.java| 15 
 .../apache/pulsar/common/naming/TopicVersion.java  |  5 ++
 .../pulsar/client/admin/internal/BrokersImpl.java  | 20 +-
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  4 +-
 .../org/apache/pulsar/admin/cli/CmdBrokers.java|  6 +-
 11 files changed, 220 insertions(+), 41 deletions(-)
 create mode 100644 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
 copy 
pulsar-broker/src/main/java/org/apache/pulsar/common/naming/package-info.java 
=> 
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/naming/TopicVersion.java
 (94%)


[pulsar] branch master updated (2f47c32 -> 3550f2e)

2021-05-11 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 2f47c32  Allow to disable producer max queue size (#9650)
 add 3550f2e  [pulsar-broker] Dispatch messages to consumer with permits 
(#10417)

No new revisions were added by this update.

Summary of changes:
 .../PersistentDispatcherMultipleConsumers.java | 25 +++---
 .../pulsar/client/api/ConsumerRedeliveryTest.java  | 58 ++
 .../apache/pulsar/client/impl/ConsumerBase.java|  4 ++
 3 files changed, 81 insertions(+), 6 deletions(-)


[pulsar] branch master updated (345cd33 -> 61639a2)

2021-04-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 345cd33  [Kinesis]Fix kinesis sink can not retry to send messages 
(#10420)
 add 61639a2  [pulsar-broker] Fix: Topic loading fails without any error 
when replicator init fails (#10432)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/service/persistent/PersistentTopic.java  | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)


[pulsar] branch master updated: PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)

2021-04-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new ef06691  PIP 83 : Pulsar client: Message consumption with pooled 
buffer (#10184)
ef06691 is described below

commit ef06691531002c5d7cdbbdafc5494914ee8e0765
Author: Rajan Dhabalia 
AuthorDate: Tue Apr 20 16:26:28 2021 -0700

PIP 83 : Pulsar client: Message consumption with pooled buffer (#10184)

fix api, buffer-access, duplicate code
---
 .../client/api/SimpleProducerConsumerTest.java |   2 +-
 .../client/impl/BrokerClientIntegrationTest.java   | 107 -
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  10 +
 .../java/org/apache/pulsar/client/api/Message.java |  15 ++
 .../java/org/apache/pulsar/client/api/Schema.java  |  17 ++
 .../client/internal/DefaultImplementation.java |  20 ++
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  34 ++-
 .../apache/pulsar/client/impl/ConsumerBase.java|  13 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java|   5 +
 .../apache/pulsar/client/impl/ConsumerImpl.java|  25 +-
 .../client/impl/ConsumerStatsRecorderImpl.java |   2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java | 256 ++---
 .../apache/pulsar/client/impl/MessagesImpl.java|   4 +-
 .../client/impl/MultiTopicsConsumerImpl.java   |   7 +-
 .../pulsar/client/impl/TopicMessageImpl.java   |  10 +
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   1 +
 .../impl/conf/ConsumerConfigurationData.java   |   2 +
 .../pulsar/client/impl/schema/AbstractSchema.java  |   3 +-
 .../client/impl/schema/ByteBufferSchema.java   |  13 +-
 .../pulsar/client/impl/schema/BytesSchema.java |   2 +-
 .../client/impl/schema/LocalDateTimeSchema.java|   4 +-
 .../pulsar/client/impl/schema/StringSchema.java|   2 +-
 .../pulsar/testclient/PerformanceConsumer.java |  25 +-
 23 files changed, 445 insertions(+), 134 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index abcb7ed..1123562 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -4029,4 +4029,4 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 blockedMessageLatch.countDown();
 log.info("-- Exiting {} test --", methodName);
 }
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index c4d03ad..f7ce13b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.UUID.randomUUID;
 import static 
org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import static org.mockito.Mockito.any;
@@ -30,12 +31,14 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import io.netty.buffer.ByteBuf;
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
@@ -55,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.Cleanup;
-
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
@@ -132,6 +134,11 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
 return new Object[][] { { SubscriptionType.Shared }, { 
SubscriptionType.Failover } };
 }
 
+@DataProvider(name = "booleanFlagProvider")
+public Object[][] booleanFlagProvider() {
+return new Object[][] { { true }, { false } };
+}
+
 /**
  * Verifies unload namespace-bundle doesn't close shared connection used 
by other namespace-bundle.
  *
@@ -918,4 +925,98 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
 private static final class TestMessageObject{
 

[pulsar] branch master updated (4c434ad -> 4ac5469)

2021-04-14 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 4c434ad  Add current ip address, long hostname and short hostname to 
/etc/hosts (#10233)
 add 4ac5469  Add Enrico Olivelli as committer in the team page (#10234)

No new revisions were added by this update.

Summary of changes:
 site2/website/data/team.js | 5 +
 1 file changed, 5 insertions(+)


[pulsar] branch master updated (4673963 -> a004dfe)

2021-03-25 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 4673963  [pulsar-auth] Allow serializable stream-provider field into 
AuthenticationTls (#10020)
 add a004dfe  [pulsar-websocket] Allow websocket to consume and pass 
message to client without decryption (#10026)

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/pulsar/websocket/ConsumerHandler.java  | 14 ++
 .../java/org/apache/pulsar/websocket/ReaderHandler.java|  9 +
 2 files changed, 19 insertions(+), 4 deletions(-)


[pulsar] branch master updated (77c7e9c -> 4673963)

2021-03-25 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 77c7e9c  [Docs][Geo Replication] Copy PR 9857 to older versioned docs 
(#10029)
 add 4673963  [pulsar-auth] Allow serializable stream-provider field into 
AuthenticationTls (#10020)

No new revisions were added by this update.

Summary of changes:
 distribution/server/licenses/LICENSE-Spotbugs.txt  | 502 +
 distribution/server/src/assemble/NOTICE.bin.txt|   3 +
 pom.xml|   8 +
 pulsar-client/pom.xml  |   7 +
 .../pulsar/client/impl/auth/AuthenticationTls.java |  19 +-
 .../client/impl/auth/AuthenticationTlsTest.java|  94 
 6 files changed, 632 insertions(+), 1 deletion(-)
 create mode 100644 distribution/server/licenses/LICENSE-Spotbugs.txt
 create mode 100644 
pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/AuthenticationTlsTest.java


[pulsar] branch master updated (058236c -> dd32435)

2021-03-19 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 058236c  Upgrade to Apache Avro 1.10.2 (#9898)
 add dd32435  [pulsar-discovery] Replace MetadataStore with ZooKeeper in 
discoveryservice (#9967)

No new revisions were added by this update.

Summary of changes:
 ...ources.java => LoadManagerReportResources.java} |   8 +-
 .../pulsar/broker/resources/PulsarResources.java   |   2 +
 .../pulsar/client/api/BrokerServiceLookupTest.java |  43 ++---
 .../service/web/DiscoveryServiceWebTest.java   |  47 +++--
 .../discovery/service/BrokerDiscoveryProvider.java |  21 ++-
 .../pulsar/discovery/service/DiscoveryService.java |  17 +-
 .../pulsar/discovery/service/ServerConnection.java |   6 +-
 .../service/web/DiscoveryServiceServlet.java   |  40 +++--
 .../service/web/MetadataStoreCacheLoader.java  | 100 +--
 .../service/web/ZookeeperCacheLoader.java  | 190 -
 .../discovery/service/BaseDiscoveryTestSetup.java  |  40 ++---
 .../discovery/service/DiscoveryServiceTest.java|  29 +---
 .../discovery/service/web/BaseZKStarterTest.java   |  33 ++--
 .../service/web/DiscoveryServiceWebTest.java   | 120 +++--
 ...Test.java => MetadataStoreCacheLoaderTest.java} |  23 ++-
 15 files changed, 238 insertions(+), 481 deletions(-)
 copy 
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/{LocalPoliciesResources.java
 => LoadManagerReportResources.java} (73%)
 copy 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 => 
pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/MetadataStoreCacheLoader.java
 (51%)
 delete mode 100644 
pulsar-discovery-service/src/main/java/org/apache/pulsar/discovery/service/web/ZookeeperCacheLoader.java
 rename 
pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/web/{ZookeeperCacheLoaderTest.java
 => MetadataStoreCacheLoaderTest.java} (83%)


[pulsar] branch master updated (1fab5aa -> 9a6ae06)

2021-02-20 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 1fab5aa  [pulsar-broker] Authorization service uses metadata-store api 
(#9586)
 add 9a6ae06  [pulsar-broker] Remove global-zk reference from 
Pulsar-service (#9648)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/pulsar/broker/PulsarService.java|  8 +--
 .../apache/pulsar/broker/admin/AdminResource.java  | 62 +++---
 .../pulsar/broker/namespace/NamespaceService.java  |  3 +-
 .../pulsar/broker/service/BrokerService.java   | 31 ---
 .../broker/service/persistent/PersistentTopic.java | 23 
 .../broker/admin/AdminApiGetLastMessageIdTest.java |  1 -
 .../apache/pulsar/broker/admin/AdminApiTest2.java  |  2 -
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  2 -
 .../apache/pulsar/broker/admin/NamespacesTest.java |  1 -
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  3 --
 .../pulsar/broker/admin/v1/V1_AdminApiTest2.java   |  2 -
 .../broker/loadbalance/LoadBalancerTest.java   | 25 +
 .../loadbalance/SimpleLoadManagerImplTest.java | 18 ---
 .../apache/pulsar/broker/web/WebServiceTest.java   |  3 +-
 .../api/AuthenticatedProducerConsumerTest.java |  2 -
 .../pulsar/client/api/PartitionCreationTest.java   |  7 +--
 .../functions/worker/PulsarWorkerService.java  | 39 +-
 .../pulsar/functions/worker/WorkerService.java |  6 +--
 .../service/WorkerServiceWithClassLoader.java  |  5 +-
 19 files changed, 78 insertions(+), 165 deletions(-)



[pulsar] branch master updated: [pulsar-broker] topic resources use metadata-store api (#9485)

2021-02-18 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 0f9e211  [pulsar-broker] topic resources use metadata-store api (#9485)
0f9e211 is described below

commit 0f9e211a85d0e5db4b7f96a9f8e402ad814c552d
Author: Rajan Dhabalia 
AuthorDate: Thu Feb 18 10:35:41 2021 -0800

[pulsar-broker] topic resources use metadata-store api (#9485)

* [pulsar-broker] topics resources use metadata-store api

[pulsar-broker] MockZK: Handle zk-children watch notification

fix test

fix sync function initialization

fix tests

fix zk-create

add timeout

* address comments
---
 .../org/apache/pulsar/broker/PulsarService.java|   3 +-
 .../apache/pulsar/broker/admin/AdminResource.java  | 126 ++
 .../pulsar/broker/admin/impl/BaseResources.java|  22 ++-
 .../pulsar/broker/admin/impl/ClusterResources.java |  11 +-
 .../admin/impl/DynamicConfigurationResources.java  |   5 +-
 .../broker/admin/impl/NamespaceResources.java  |  30 +++-
 .../broker/admin/impl/PersistentTopicsBase.java| 189 -
 .../pulsar/broker/admin/impl/PulsarResources.java  |  12 +-
 .../pulsar/broker/admin/impl/TenantResources.java  |   4 +-
 .../pulsar/broker/service/BrokerService.java   |  13 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   4 +
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java|   2 +-
 .../client/impl/BrokerClientIntegrationTest.java   |  14 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |   6 +-
 .../pulsar/client/cli/PulsarClientToolTest.java|  22 +--
 .../runtime/thread/ThreadRuntimeFactory.java   |   1 +
 .../cache/impl/JSONMetadataSerdeSimpleType.java|   2 -
 .../metadata/impl/AbstractMetadataStore.java   |   2 +
 .../metadata/impl/LocalMemoryMetadataStore.java|   1 +
 .../pulsar/metadata/impl/ZKMetadataStore.java  |   3 +
 .../apache/pulsar/metadata/MetadataStoreTest.java  |   1 +
 .../pulsar/zookeeper/LocalBookkeeperEnsemble.java  |   6 +-
 22 files changed, 221 insertions(+), 258 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aedd965..2f0e712 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -485,7 +485,8 @@ public class PulsarService implements AutoCloseable {
 coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
 
 configurationMetadataStore = createConfigurationMetadataStore();
-pulsarResources = new PulsarResources(localMetadataStore, 
configurationMetadataStore);
+pulsarResources = new PulsarResources(localMetadataStore, 
configurationMetadataStore,
+config.getZooKeeperOperationTimeoutSeconds());
 
 orderedExecutor = OrderedExecutor.newBuilder()
 .numThreads(config.getNumOrderedExecutorThreads())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1228351..59af6ca 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -71,6 +71,9 @@ import 
org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyExistsException;
+import 
org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
@@ -81,7 +84,6 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
-import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,14 +135,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
 CreateMode.PERSISTENT, callback, null);
 }
 
-protected boolean zkPathExists(String path) throws KeeperException, 
InterruptedException {
-Stat stat = globalZk().exists(path, false);
-if (null != stat) {
-return true;
-}
-return false;
-}
-
 protected void

[pulsar] branch master updated (8d0c36e -> fcb2bb6)

2021-02-10 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 8d0c36e  [C++] Removed usages of boost::regex (#9533)
 add fcb2bb6  Issue 9279: Pulsar Admin: add command to list bookies (#9283)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |   5 +
 .../org/apache/pulsar/broker/admin/v2/Bookies.java |  30 ++
 .../apache/pulsar/broker/admin/BookiesApiTest.java |   6 ++
 .../org/apache/pulsar/client/admin/Bookies.java|  11 ++
 .../pulsar/client/admin/internal/BookiesImpl.java  |  34 ++
 .../pulsar/admin/cli/PulsarAdminToolTest.java  |  29 ++---
 .../org/apache/pulsar/admin/cli/CmdBookies.java|  10 ++
 .../{BookieInfo.java => BookiesClusterInfo.java}   |  17 ++-
 .../data/{BookieInfo.java => RawBookieInfo.java}   |   7 +-
 .../bookkeeper/client/PulsarMockBookKeeper.java| 117 +++--
 .../bookkeeper/client/PulsarMockLedgerHandle.java  |   5 +-
 11 files changed, 227 insertions(+), 44 deletions(-)
 copy 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/{BookieInfo.java
 => BookiesClusterInfo.java} (82%)
 copy 
pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/{BookieInfo.java
 => RawBookieInfo.java} (91%)



[pulsar] branch master updated (efb2089 -> 7fd3218)

2021-02-08 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from efb2089  [Issue9507][testclient] add --batch-index-ack for the 
pulsar-perf (#9521)
 add 7fd3218  [pulsar-broker] namespace resources use metadata-store api 
(#9351)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/admin/AdminResource.java  |   38 +-
 .../pulsar/broker/admin/impl/BaseResources.java|2 +-
 .../broker/admin/impl/NamespaceResources.java  |   17 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 1380 +---
 .../pulsar/broker/admin/impl/PulsarResources.java  |2 +-
 .../pulsar/broker/web/PulsarWebResource.java   |   76 ++
 .../apache/pulsar/broker/admin/NamespacesTest.java |   21 +-
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java|7 +
 .../apache/pulsar/metadata/MetadataCacheTest.java  |   35 +-
 9 files changed, 453 insertions(+), 1125 deletions(-)



[pulsar] branch master updated: [pulsar-broker] Fix: handle topic loading failure due to broken schema ledger (#9212)

2021-02-06 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d5d6f6  [pulsar-broker] Fix: handle topic loading failure due to 
broken schema ledger (#9212)
3d5d6f6 is described below

commit 3d5d6f6a1681cf842a2e3334d412fa449c553fd0
Author: Rajan Dhabalia 
AuthorDate: Sat Feb 6 20:58:32 2021 -0800

[pulsar-broker] Fix: handle topic loading failure due to broken schema 
ledger (#9212)

add more error log

fix list assignment
---
 .../service/schema/BookkeeperSchemaStorage.java| 52 ++--
 .../schema/DefaultSchemaRegistryService.java   |  5 ++
 .../broker/service/schema/SchemaRegistry.java  |  2 +
 .../service/schema/SchemaRegistryServiceImpl.java  | 71 ++
 .../service/schema/exceptions/SchemaException.java | 10 +++
 ...hemaRegistryServiceWithSchemaDataValidator.java |  7 ++-
 .../broker/service/schema/ClientGetSchemaTest.java | 67 
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 16 +++--
 .../common/protocol/schema/SchemaStorage.java  |  2 +
 9 files changed, 206 insertions(+), 26 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 475e6f9..3c29da8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -37,7 +37,9 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import javax.validation.constraints.NotNull;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -47,6 +49,7 @@ import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
 import org.apache.pulsar.common.protocol.schema.SchemaStorage;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.protocol.schema.StoredSchema;
@@ -129,7 +132,7 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 @Override
 public CompletableFuture>> 
getAll(String key) {
 CompletableFuture>> result = new 
CompletableFuture<>();
-getSchemaLocator(getSchemaPath(key)).thenAccept(locator -> {
+getLocator(key).thenAccept(locator -> {
 if (log.isDebugEnabled()) {
 log.debug("[{}] Get all schemas - locator: {}", key, locator);
 }
@@ -154,9 +157,42 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 return result;
 }
 
+private CompletableFuture<Optional<LocatorEntry>> getLocator(String key) {
+return getSchemaLocator(getSchemaPath(key));
+}
+
+public void clearLocatorCache(String key) {
+localZkCache.invalidate(getSchemaPath(key));
+}
+
+@VisibleForTesting
+List<Long> getSchemaLedgerList(String key) throws IOException {
+Optional<LocatorEntry> locatorEntry = null;
+try {
+locatorEntry = getLocator(key).get();
+} catch (Exception e) {
+log.warn("Failed to get list of schema-storage ledger for {}", key,
+(e instanceof ExecutionException ? e.getCause() : e));
+throw new IOException("Failed to get schema ledger for" + key);
+}
+LocatorEntry entry = locatorEntry.orElse(null);
+return entry != null ? entry.locator.getIndexList().stream().map(i -> 
i.getPosition().getLedgerId())
+.collect(Collectors.toList()) : null;
+}
+
+@VisibleForTesting
+BookKeeper getBookKeeper() {
+return bookKeeper;
+}
+
+@Override
+public CompletableFuture delete(String key, boolean 
forcefully) {
+return deleteSchema(key, forcefully).thenApply(LongSchemaVersion::new);
+}
+
 @Override
 public CompletableFuture delete(String key) {
-return deleteSchema(key).thenApply(LongSchemaVersion::new);
+return delete(key, false);
 }
 
 @NotNull
@@ -350,9 +386,10 @@ public class BookkeeperSchemaStorage implements 
SchemaStorage {
 }
 
 @NotNull
-private CompletableFuture deleteSchema(String schemaId) {
-return getSchema(schemaId).thenCompose(sc

[pulsar] branch master updated (683ee5f -> 19dec2c)

2021-02-05 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 683ee5f  PIP-45: Added session events to metadata store (#9273)
 add 19dec2c  [pulsar-broker] MockZK: Handle zk-children watch notification 
(#9473)

No new revisions were added by this update.

Summary of changes:
 .../bookkeeper/mledger/impl/MetaStoreImplTest.java | 44 +-
 .../java/org/apache/zookeeper/MockZooKeeper.java   |  2 +
 2 files changed, 44 insertions(+), 2 deletions(-)



[pulsar] branch master updated: [pulsar-broker] broker resources use metadata-store api (#9346)

2021-01-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new fa66a24  [pulsar-broker] broker resources use metadata-store api 
(#9346)
fa66a24 is described below

commit fa66a2410e3266ad5b01f00f96504e84fe6084cd
Author: Rajan Dhabalia 
AuthorDate: Fri Jan 29 19:03:36 2021 -0800

[pulsar-broker] broker resources use metadata-store api (#9346)

* [pulsar-broker] broker resources use metadata-store api

fix test

* fix api
---
 .../org/apache/pulsar/broker/PulsarService.java|  2 +-
 .../apache/pulsar/broker/admin/AdminResource.java  | 27 --
 .../pulsar/broker/admin/impl/BaseResources.java| 25 +
 .../pulsar/broker/admin/impl/BrokersBase.java  | 61 ++
 .../pulsar/broker/admin/impl/ClustersBase.java |  8 +--
 ...ces.java => DynamicConfigurationResources.java} | 18 +++
 .../pulsar/broker/admin/impl/PulsarResources.java  |  4 +-
 .../pulsar/broker/web/PulsarWebResource.java   | 29 ++
 .../org/apache/pulsar/broker/admin/AdminTest.java  |  8 ---
 9 files changed, 77 insertions(+), 105 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3e03297..fc8f309 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -484,7 +484,7 @@ public class PulsarService implements AutoCloseable {
 coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
 
 configurationMetadataStore = createConfigurationMetadataStore();
-pulsarResources = new PulsarResources(configurationMetadataStore);
+pulsarResources = new PulsarResources(localMetadataStore, 
configurationMetadataStore);
 
 orderedExecutor = OrderedExecutor.newBuilder()
 .numThreads(config.getNumOrderedExecutorThreads())
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1728c88..453c39b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -23,8 +23,6 @@ import static org.apache.pulsar.common.util.Codec.decode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
-import java.net.MalformedURLException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -39,7 +37,6 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.UriBuilder;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -417,30 +414,6 @@ public abstract class AdminResource extends 
PulsarWebResource {
 }
 }
 
-/**
- * Redirect the call to the specified broker.
- *
- * @param broker
- *Broker name
- * @throws MalformedURLException
- * In case the redirect happens
- */
-protected void validateBrokerName(String broker) throws 
MalformedURLException {
-String brokerUrl = String.format("http://%s", broker);
-String brokerUrlTls = String.format("https://%s", broker);
-if (!brokerUrl.equals(pulsar().getSafeWebServiceAddress())
-&& !brokerUrlTls.equals(pulsar().getWebServiceAddressTls())) {
-String[] parts = broker.split(":");
-checkArgument(parts.length == 2, String.format("Invalid broker url 
%s", broker));
-String host = parts[0];
-int port = Integer.parseInt(parts[1]);
-
-URI redirect = 
UriBuilder.fromUri(uri.getRequestUri()).host(host).port(port).build();
-log.debug("[{}] Redirecting the rest call to {}: broker={}", 
clientAppId(), redirect, broker);
-throw new 
WebApplicationException(Response.temporaryRedirect(redirect).build());
-}
-}
-
 protected Policies getNamespacePolicies(NamespaceName namespaceName) {
 try {
 final String namespace = namespaceName.toString();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BaseResources.java
index 07cd9c4..dfbad73 100644
--- 
a/pulsar-

[pulsar] branch master updated: [pulsar-broker] cluster resources use metadata-store api (#9338)

2021-01-28 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 0edcaa0  [pulsar-broker] cluster resources use metadata-store api 
(#9338)
0edcaa0 is described below

commit 0edcaa09150521a2a7e189de43d004ed799db2ee
Author: Rajan Dhabalia 
AuthorDate: Thu Jan 28 12:20:10 2021 -0800

[pulsar-broker] cluster resources use metadata-store api (#9338)

* [pulsar-broker] Make tenant rest-api async and use metadata-store api

fix tests

fix intermittent test failure

* [pulsar-broker] cluster resources use metadata-store api

* fix test
---
 .../org/apache/pulsar/broker/PulsarService.java|  18 +-
 .../apache/pulsar/broker/admin/AdminResource.java  |   6 +-
 .../pulsar/broker/admin/impl/BaseResources.java| 152 ++
 .../pulsar/broker/admin/impl/ClusterResources.java |  51 
 .../pulsar/broker/admin/impl/ClustersBase.java | 204 +-
 .../broker/admin/impl/NamespaceResources.java  |  51 
 .../pulsar/broker/admin/impl/PulsarResources.java  |  37 +++
 .../pulsar/broker/admin/impl/TenantResources.java  |  28 ++
 .../pulsar/broker/admin/impl/TenantsBase.java  | 307 -
 .../pulsar/broker/web/PulsarWebResource.java   | 157 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  | 238 +++-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   1 +
 .../OwnerShipForCurrentServerTestBase.java |   2 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |   8 +-
 .../broker/transaction/TransactionTestBase.java|   2 +-
 .../apache/pulsar/broker/web/WebServiceTest.java   |  23 +-
 .../metadata/api/MetadataStoreException.java   |   4 +
 .../metadata/cache/impl/MetadataCacheImpl.java |  10 +-
 .../metadata/impl/AbstractMetadataStore.java   |   7 +
 .../MockedZooKeeperClientFactoryImpl.java  |   2 +-
 21 files changed, 958 insertions(+), 358 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index a017d28..3e03297 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -69,6 +69,7 @@ import 
org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.ZookeeperSessionExpiredHandlers;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.PulsarResources;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -220,6 +221,8 @@ public class PulsarService implements AutoCloseable {
 private MetadataStoreExtended localMetadataStore;
 private CoordinationService coordinationService;
 
+private MetadataStoreExtended configurationMetadataStore;
+private PulsarResources pulsarResources;
 
 public enum State {
 Init, Started, Closed
@@ -280,6 +283,14 @@ public class PulsarService implements AutoCloseable {
 new DefaultThreadFactory("zk-cache-callback"));
 }
 
+public MetadataStoreExtended createConfigurationMetadataStore() throws 
MetadataStoreException {
+return 
MetadataStoreExtended.create(config.getConfigurationStoreServers(),
+MetadataStoreConfig.builder()
+.sessionTimeoutMillis((int) 
config.getZooKeeperSessionTimeoutMillis())
+.allowReadOnlyOperations(false)
+.build());
+}
+
 /**
  * Close the current pulsar service. All resources are released.
  */
@@ -396,6 +407,9 @@ public class PulsarService implements AutoCloseable {
 if (localMetadataStore != null) {
 localMetadataStore.close();
 }
+if (configurationMetadataStore != null) {
+configurationMetadataStore.close();
+}
 
 state = State.Closed;
 isClosedCondition.signalAll();
@@ -467,9 +481,11 @@ public class PulsarService implements AutoCloseable {
 }
 
 localMetadataStore = createLocalMetadataStore();
-
 coordinationService = new 
CoordinationServiceImpl(localMetadataStore);
 
+configurationMetadataStore = createConfigurationMetadataStore();
+pulsarResources = new PulsarResources(configurationMetadataStore);
+
 orderedExecutor = OrderedExecutor.newBuilder()
 .numThreads(config.getNumOrderedExecu

[pulsar] branch master updated (e58a906 -> ab9c77a)

2020-12-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from e58a906  Add azure offloader to website (#9018)
 add ab9c77a  [pulsar-broker] validate namespace isolation policy regex 
before updating (#8804)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 29 ++
 .../policies/data/NamespaceIsolationData.java  | 21 +++-
 2 files changed, 49 insertions(+), 1 deletion(-)



[pulsar] branch master updated: [pulsar-client] Handle NPE while receiving ack for closed producer (#8979)

2020-12-16 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d41036  [pulsar-client] Handle NPE while receiving ack for closed 
producer (#8979)
1d41036 is described below

commit 1d410365860c342a09c2aacad9b2f78e737a5dea
Author: Rajan Dhabalia 
AuthorDate: Wed Dec 16 14:35:18 2020 -0800

[pulsar-client] Handle NPE while receiving ack for closed producer (#8979)
---
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 212292e..3c66065 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -376,7 +376,15 @@ public class ClientCnx extends PulsarHandler {
 ledgerId, entryId);
 }
 
-producers.get(producerId).ackReceived(this, sequenceId, 
highestSequenceId, ledgerId, entryId);
+ProducerImpl producer = producers.get(producerId);
+if (producer != null) {
+producer.ackReceived(this, sequenceId, highestSequenceId, 
ledgerId, entryId);
+} else {
+if (log.isDebugEnabled()) {
+log.debug("Producer is {} already closed, ignore published 
message [{}-{}]", producerId, ledgerId,
+entryId);
+}
+}
 }
 
 @Override



[pulsar] branch master updated: [pulsar-broker] capture stats with precise backlog (#8928)

2020-12-14 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a7f692f  [pulsar-broker] capture stats with precise backlog (#8928)
a7f692f is described below

commit a7f692f981ddf2f86a69809867797f7269196884
Author: Rajan Dhabalia 
AuthorDate: Mon Dec 14 14:25:11 2020 -0800

[pulsar-broker] capture stats with precise backlog (#8928)

Co-authored-by: Sijie Guo 
---
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index f957532..07af718 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1537,7 +1537,7 @@ public class PersistentTopic extends AbstractTopic
 
 // Populate subscription specific stats here
 topicStatsStream.writePair("msgBacklog",
-subscription.getNumberOfEntriesInBacklog(false));
+subscription.getNumberOfEntriesInBacklog(true));
 topicStatsStream.writePair("msgRateExpired", 
subscription.getExpiredMessageRate());
 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
 topicStatsStream.writePair("msgThroughputOut", 
subMsgThroughputOut);



[pulsar] branch master updated (a145858 -> 7a8c6ba)

2020-11-06 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from a145858  [docs] Re-org topics in Admin API section (#8375)
 add 7a8c6ba  [pulsar-broker] Fix: bookie-isolation placement-policy was 
not configuring rackaware policy (#8461)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/BookKeeperClientFactoryImpl.java  |  4 
 .../apache/pulsar/broker/ManagedLedgerClientFactory.java   |  1 -
 .../pulsar/broker/service/BrokerBookieIsolationTest.java   | 14 --
 3 files changed, 16 insertions(+), 3 deletions(-)



[pulsar] branch master updated (505dcd5 -> aa16497)

2020-11-04 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 505dcd5  Clear the recently joined consumers when there are only one 
consumer under the Key_Shared subscription. (#8427)
 add aa16497  [pulsar-broker] get list of bundles under a namespace (#8450)

No new revisions were added by this update.

Summary of changes:
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  1 -
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 11 +++
 .../org/apache/pulsar/client/admin/Namespaces.java | 29 ++
 .../client/admin/internal/NamespacesImpl.java  | 35 ++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 13 
 5 files changed, 88 insertions(+), 1 deletion(-)



[pulsar] branch master updated (a9e2f7e -> 9d74007)

2020-10-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from a9e2f7e  [pulsar-broker] Keep max-concurrent http web-request 
configurable (#7250)
 add 9d74007  fix: intermittent test failure due to invalid cache (#8405)

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/pulsar/broker/admin/v2/Bookies.java | 1 +
 1 file changed, 1 insertion(+)



[pulsar] branch master updated (da61b56 -> a9e2f7e)

2020-10-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from da61b56  [pulsar-broker] add broker config to enforce producer to 
publish encrypted message (#8055)
 add a9e2f7e  [pulsar-broker] Keep max-concurrent http web-request 
configurable (#7250)

No new revisions were added by this update.

Summary of changes:
 conf/broker.conf | 3 +++
 conf/standalone.conf | 3 +++
 .../src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 3 +++
 .../src/main/java/org/apache/pulsar/broker/web/WebService.java   | 5 +++--
 4 files changed, 12 insertions(+), 2 deletions(-)



[pulsar] branch master updated (f96bc63 -> da61b56)

2020-10-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from f96bc63  Fix deadlock that occurred during topic ownership check 
(#8406)
 add da61b56  [pulsar-broker] add broker config to enforce producer to 
publish encrypted message (#8055)

No new revisions were added by this update.

Summary of changes:
 conf/broker.conf   |  3 ++
 conf/standalone.conf   |  3 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  5 
 .../apache/pulsar/broker/service/ServerCnx.java|  4 ++-
 .../pulsar/broker/service/ServerCnxTest.java   | 35 ++
 5 files changed, 49 insertions(+), 1 deletion(-)



[pulsar] branch master updated (06c9f57 -> d16b7f5)

2020-10-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 06c9f57  update helm deploy (#8404)
 add d16b7f5  [pulsar-cli] Fix properties name (#7249)

No new revisions were added by this update.

Summary of changes:
 .../src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[pulsar] branch master updated: [pulsar-broker] Fix: Tenant-admin should able to lookup topic (#8353)

2020-10-29 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4c7f83b  [pulsar-broker] Fix: Tenant-admin should able to lookup topic 
(#8353)
4c7f83b is described below

commit 4c7f83b93b454b4f74dbc418a519ceb6dc58296b
Author: Rajan Dhabalia 
AuthorDate: Thu Oct 29 00:30:56 2020 -0700

[pulsar-broker] Fix: Tenant-admin should able to lookup topic (#8353)

Co-authored-by: Sijie Guo 
---
 .../java/org/apache/pulsar/broker/lookup/TopicLookupBase.java  | 10 +-
 .../pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java  |  3 ++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index cd140d5..19ddce75 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -65,7 +65,7 @@ public class TopicLookupBase extends PulsarWebResource {
 
 try {
 validateClusterOwnership(topicName.getCluster());
-checkConnect(topicName);
+validateAdminAndClientPermission(topicName);
 validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
 } catch (WebApplicationException we) {
 // Validation checks failed
@@ -127,6 +127,14 @@ public class TopicLookupBase extends PulsarWebResource {
 });
 }
 
+private void validateAdminAndClientPermission(TopicName topic) throws 
RestException, Exception {
+try {
+validateAdminAccessForTenant(topic.getTenant());
+} catch (Exception e) {
+checkConnect(topic);
+}
+}
+
 protected String internalGetNamespaceBundle(TopicName topicName) {
 validateSuperUserAccess();
 try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
index 26cb3c1..47a8921 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/BrokerAdminClientTlsAuthTest.java
@@ -135,7 +135,6 @@ public class BrokerAdminClientTlsAuthTest extends 
MockedPulsarServiceBaseTest {
 try (PulsarAdmin admin = buildAdminClient("admin")) {
 Policies policies = new Policies();
 policies.bundles = new BundlesData(4);
-policies.auth_policies.namespace_auth.put("admin", 
ImmutableSet.of(AuthAction.produce, AuthAction.consume));
 policies.replication_clusters = ImmutableSet.of("test");
 admin.namespaces().createNamespace("tenant/ns", policies);
 try {
@@ -144,6 +143,8 @@ public class BrokerAdminClientTlsAuthTest extends 
MockedPulsarServiceBaseTest {
 ex.printStackTrace();
 fail("Should not have thrown an exception");
 }
+String topicName = String.format("persistent://%s/t1", 
"tenant/ns");
+admin.lookups().lookupTopic(topicName);
 }
 
 }



[pulsar] branch master updated (647d3c2 -> 7f9b7cf)

2020-10-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from 647d3c2  PLSR-1240 upgrade GRPC to 1.31 to avoid deadlock (#8351)
 add 7f9b7cf  [pulsar-broker] configure namespace anti-affinity in local 
policies (#8349)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 75 +-
 .../broker/loadbalance/impl/LoadManagerShared.java | 18 +++---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 12 ++--
 .../pulsar/common/policies/data/LocalPolicies.java |  6 +-
 .../pulsar/common/policies/data/Policies.java  |  6 +-
 5 files changed, 67 insertions(+), 50 deletions(-)



[pulsar] branch master updated (a081bae -> 54bdf2e)

2020-10-22 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from a081bae  [website] Update the Event page (#8116)
 add 54bdf2e  [pulsar-broker-admin] Fix: split bundle overwrites 
local-policies (#8313)

No new revisions were added by this update.

Summary of changes:
 .../pulsar/broker/namespace/NamespaceService.java  |  4 ++--
 .../namespace/NamespaceCreateBundlesTest.java  | 25 ++
 2 files changed, 27 insertions(+), 2 deletions(-)



[pulsar] branch master updated: [pulsar-broker-admin] Support replication dispatch-rate limiting for v1-namespace api (#8314)

2020-10-21 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new faebd13  [pulsar-broker-admin] Support replication dispatch-rate 
limiting for v1-namespace api (#8314)
faebd13 is described below

commit faebd13639a8e41e36d48bcd0756de2c6ab40411
Author: Rajan Dhabalia 
AuthorDate: Wed Oct 21 10:32:03 2020 -0700

[pulsar-broker-admin] Support replication dispatch-rate limiting for 
v1-namespace api (#8314)
---
 .../apache/pulsar/broker/admin/v1/Namespaces.java  | 24 ++
 1 file changed, 24 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index 3fc076b..035c6f8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin.v1;
 import com.google.common.collect.Lists;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
@@ -637,6 +638,29 @@ public class Namespaces extends NamespacesBase {
 return internalGetSubscriptionDispatchRate();
 }
 
+@POST
+@Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate")
+@ApiOperation(value = "Set replicator dispatch-rate throttling for all 
topics of the namespace")
+@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission") })
+public void setReplicatorDispatchRate(@PathParam("tenant") String tenant,
+@PathParam("cluster") String cluster, @PathParam("namespace") 
String namespace,
+@ApiParam(value = "Replicator dispatch rate for all topics of the 
specified namespace") DispatchRate dispatchRate) {
+validateNamespaceName(tenant, cluster, namespace);
+internalSetReplicatorDispatchRate(dispatchRate);
+}
+
+@GET
+@Path("/{tenant}/{cluster}/{namespace}/replicatorDispatchRate")
+@ApiOperation(value = "Get replicator dispatch-rate configured for the 
namespace, -1 represents not configured yet")
+@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+@ApiResponse(code = 404, message = "Namespace does not exist") })
+public DispatchRate getReplicatorDispatchRate(@PathParam("tenant") String 
tenant,
+@PathParam("cluster") 
String cluster,
+@PathParam("namespace") 
String namespace) {
+validateNamespaceName(tenant, cluster, namespace);
+return internalGetReplicatorDispatchRate();
+}
+
 @GET
 @Path("/{property}/{cluster}/{namespace}/backlogQuotaMap")
 @ApiOperation(hidden = true, value = "Get backlog quota map on a 
namespace.")



[pulsar] branch master updated: Always use SNI for TLS enabled Pulsar Java client. (#8117)

2020-09-24 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new f2933f7  Always use SNI for TLS enabled Pulsar Java client. (#8117)
f2933f7 is described below

commit f2933f7da4850814d92fac0e54c5314c51c8fc32
Author: Rolf Arne Corneliussen 
AuthorDate: Thu Sep 24 22:37:51 2020 +0200

Always use SNI for TLS enabled Pulsar Java client. (#8117)

Co-authored-by: Rolf Arne Corneliussen 

---
 .../org/apache/pulsar/client/api/TlsSniTest.java   | 66 ++
 .../apache/pulsar/client/impl/ConnectionPool.java  | 56 +++---
 .../client/impl/PulsarChannelInitializer.java  | 58 +--
 .../util/keystoretls/KeyStoreSSLContext.java   | 10 +++-
 4 files changed, 124 insertions(+), 66 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
new file mode 100644
index 000..fc8c242
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsSniTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.testng.annotations.Test;
+
+import lombok.Cleanup;
+
+public class TlsSniTest extends TlsProducerConsumerBase {
+
+/**
+ * Verify that using an IP-address in the broker service URL will work 
with using the SNI capabilities
+ * of the client. If we try to create an {@link javax.net.ssl.SSLEngine} 
with a peer host that is an
+ * IP address, the peer host is ignored, see for example
+ * {@link io.netty.handler.ssl.ReferenceCountedOpenSslEngine}.
+ *
+ */
+@Test
+public void testIpAddressInBrokerServiceUrl() throws Exception {
+String topicName = "persistent://my-property/use/my-ns/my-topic1";
+
+URI brokerServiceUrlTls = new URI(pulsar.getBrokerServiceUrlTls());
+
+String brokerServiceIpAddressUrl = String.format("pulsar+ssl://%s:%d",
+
InetAddress.getByName(brokerServiceUrlTls.getHost()).getHostAddress(),
+brokerServiceUrlTls.getPort());
+
+ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(brokerServiceIpAddressUrl)
+
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).allowTlsInsecureConnection(false)
+.enableTlsHostnameVerification(false)
+.operationTimeout(1000, TimeUnit.MILLISECONDS);
+Map authParams = new HashMap<>();
+authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+clientBuilder.authentication(AuthenticationTls.class.getName(), 
authParams);
+
+@Cleanup
+PulsarClient pulsarClient = clientBuilder.build();
+// should be able to create producer successfully
+pulsarClient.newProducer().topic(topicName).create();
+}
+}
+
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 32d7195..6203990 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -90,7 +90,7 @@ public class ConnectionPool implements Closeable {
 bootstrap.option(ChannelOption.ALLOCATOR, 
PulsarByteBufAllocator.DEFAULT);
 
 try {
-channelInitializerHandler = new PulsarChannelInitializer(conf, 
clientCnxSupplier, isSniProxy);
+channelInitializerHandler = new PulsarChannelInitializer(conf, 
clientCnxSupplier);
 bootstrap.handler(channelInitializerHandler);
 } catch (Exception

[pulsar] branch master updated (fefaf52 -> a0877dd)

2020-09-23 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from fefaf52  Add ability to specify EnvironmentBasedSecretsProvider in 
LocalRunner (#8098)
 add a0877dd  [pulsar-broker] add configuration to set number of channels 
per bookie (#7910)

No new revisions were added by this update.

Summary of changes:
 conf/broker.conf| 3 +++
 conf/standalone.conf| 3 +++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java| 6 ++
 .../java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java  | 2 +-
 site2/docs/reference-configuration.md   | 1 +
 5 files changed, 14 insertions(+), 1 deletion(-)



[pulsar] branch master updated: [pulsar-client] sni-proxy protocol should pass sni-host address without resolving (#8062)

2020-09-21 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a7e30d8  [pulsar-client] sni-proxy protocol should pass sni-host 
address without resolving (#8062)
a7e30d8 is described below

commit a7e30d8150f626e75d8a17980e6e1019de659a5d
Author: Rajan Dhabalia 
AuthorDate: Mon Sep 21 15:39:09 2020 -0700

[pulsar-client] sni-proxy protocol should pass sni-host address without 
resolving (#8062)

make sni-proxy connection creation thread-safe

remove unused pair

initialize channel explicitly when sni-proxy is configured

initialize channel in io thread

fix channel var
---
 .../pulsar/client/api/ProxyProtocolTest.java   |   3 +-
 .../apache/pulsar/client/impl/ConnectionPool.java  | 105 +++--
 .../client/impl/PulsarChannelInitializer.java  |  41 
 3 files changed, 79 insertions(+), 70 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
index 964ebd9..d264b83 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProxyProtocolTest.java
@@ -40,7 +40,7 @@ public class ProxyProtocolTest extends 
TlsProducerConsumerBase {
 
 // Client should try to connect to proxy and pass broker-url as SNI 
header
 String proxyUrl = pulsar.getBrokerServiceUrlTls();
-String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
+String brokerServiceUrl = "pulsar+ssl://unresolvable-address:6651";
 String topicName = "persistent://my-property/use/my-ns/my-topic1";
 
 ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(brokerServiceUrl)
@@ -53,7 +53,6 @@ public class ProxyProtocolTest extends 
TlsProducerConsumerBase {
 
 @Cleanup
 PulsarClient pulsarClient = clientBuilder.build();
-
 // should be able to create producer successfully
 pulsarClient.newProducer().topic(topicName).create();
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 316145e..32d7195 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.impl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
@@ -37,7 +36,6 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.net.UnknownHostException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -66,6 +64,7 @@ public class ConnectionPool implements Closeable {
 private final ClientConfigurationData clientConfig;
 private final EventLoopGroup eventLoopGroup;
 private final int maxConnectionsPerHosts;
+private final boolean isSniProxy;
 
 protected final DnsNameResolver dnsResolver;
 
@@ -78,6 +77,8 @@ public class ConnectionPool implements Closeable {
 this.eventLoopGroup = eventLoopGroup;
 this.clientConfig = conf;
 this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
+this.isSniProxy = clientConfig.isUseTls() && 
clientConfig.getProxyProtocol() != null
+&& StringUtils.isNotBlank(clientConfig.getProxyServiceUrl());
 
 pool = new ConcurrentHashMap<>();
 bootstrap = new Bootstrap();
@@ -89,7 +90,7 @@ public class ConnectionPool implements Closeable {
 bootstrap.option(ChannelOption.ALLOCATOR, 
PulsarByteBufAllocator.DEFAULT);
 
 try {
-channelInitializerHandler = new PulsarChannelInitializer(conf, 
clientCnxSupplier);
+channelInitializerHandler = new PulsarChannelInitializer(conf, 
clientCnxSupplier, isSniProxy);
 bootstrap.handler(channelInitializerHandler);
 } catch (Exception e) {
 log.error("Failed to create channel initializer");
@@ -224,18 +225,24 @@ public class ConnectionPool implements Closeable {
  * Resolve DNS asynchronously and attempt to connect to any IP address 
returned by DNS server
  */
 private CompletableFuture createConnection(InetSocketAddress 
unresolvedAddress) {
-String hostname = unresolvedAddress.getHostString();
-int port = unresolvedAddress.getPort();
+int port;
+CompletableFuture> resolvedAddress = nu

[pulsar] branch master updated: [pulsar-client] support input-stream for trustStore cert (#7442)

2020-09-19 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new d3c0ccb  [pulsar-client] support input-stream for trustStore cert 
(#7442)
d3c0ccb is described below

commit d3c0ccb17f282e8063173a50ee825efa280d4f92
Author: Rajan Dhabalia 
AuthorDate: Sat Sep 19 01:13:38 2020 -0700

[pulsar-client] support input-stream for trustStore cert (#7442)

* [pulsar-client] support input-stream for trustStore cert

remove file closing

fix check-style

* fix flaky test
---
 .../pulsar/client/api/TlsProducerConsumerTest.java | 23 +++---
 .../admin/internal/http/AsyncHttpConnector.java| 14 +
 .../client/api/AuthenticationDataProvider.java | 10 +++
 .../org/apache/pulsar/client/impl/HttpClient.java  | 13 
 .../client/impl/PulsarChannelInitializer.java  | 27 ++---
 .../client/impl/auth/AuthenticationDataTls.java| 15 --
 .../pulsar/client/impl/auth/AuthenticationTls.java | 13 ++--
 .../util/NettyClientSslContextRefresher.java   | 12 ++--
 .../apache/pulsar/common/util/SecurityUtility.java | 35 +-
 9 files changed, 122 insertions(+), 40 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
index 9f1eac8..614e75e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TlsProducerConsumerTest.java
@@ -142,16 +142,18 @@ public class TlsProducerConsumerTest extends 
TlsProducerConsumerBase {
 log.info("-- Starting {} test --", methodName);
 String topicName = "persistent://my-property/use/my-ns/my-topic1";
 ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
-
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+.enableTls(true).allowTlsInsecureConnection(false)
 .operationTimeout(1000, TimeUnit.MILLISECONDS);
 AtomicInteger index = new AtomicInteger(0);
 
 ByteArrayInputStream certStream = 
createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
 ByteArrayInputStream keyStream = 
createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+ByteArrayInputStream trustStoreStream = 
createByteInputStream(TLS_TRUST_CERT_FILE_PATH);
 
 Supplier certProvider = () -> getStream(index, 
certStream);
 Supplier keyProvider = () -> getStream(index, 
keyStream);
-AuthenticationTls auth = new AuthenticationTls(certProvider, 
keyProvider);
+Supplier trustStoreProvider = () -> 
getStream(index, trustStoreStream);
+AuthenticationTls auth = new AuthenticationTls(certProvider, 
keyProvider, trustStoreProvider);
 clientBuilder.authentication(auth);
 @Cleanup
 PulsarClient pulsarClient = clientBuilder.build();
@@ -196,16 +198,20 @@ public class TlsProducerConsumerTest extends 
TlsProducerConsumerBase {
 public void testTlsCertsFromDynamicStreamExpiredAndRenewCert() throws 
Exception {
 log.info("-- Starting {} test --", methodName);
 ClientBuilder clientBuilder = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrlTls())
-
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
+.enableTls(true).allowTlsInsecureConnection(false)
 .operationTimeout(1000, TimeUnit.MILLISECONDS);
 AtomicInteger certIndex = new AtomicInteger(1);
 AtomicInteger keyIndex = new AtomicInteger(0);
+AtomicInteger trustStoreIndex = new AtomicInteger(1);
 ByteArrayInputStream certStream = 
createByteInputStream(TLS_CLIENT_CERT_FILE_PATH);
 ByteArrayInputStream keyStream = 
createByteInputStream(TLS_CLIENT_KEY_FILE_PATH);
+ByteArrayInputStream trustStoreStream = 
createByteInputStream(TLS_TRUST_CERT_FILE_PATH);
 Supplier certProvider = () -> 
getStream(certIndex, certStream,
 keyStream/* invalid cert file */);
 Supplier keyProvider = () -> getStream(keyIndex, 
keyStream);
-AuthenticationTls auth = new AuthenticationTls(certProvider, 
keyProvider);
+Supplier trustStoreProvider = () -> 
getStream(trustStoreIndex, trustStoreStream,
+keyStream/* invalid cert file */);
+AuthenticationTls auth = new AuthenticationTls(certProvider, 
keyProvider, trustStoreProvider);
 clientBuilder.authentication(auth);
 @Cleanup
 PulsarClient pulsarClient = clientBu

[pulsar] branch master updated: [pulsar-test] shutdown pulsar-client to cleanup forcefully (#7501)

2020-07-10 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 9f9db24e [pulsar-test] shutdown pulsar-client to cleanup forcefully 
(#7501)
9f9db24e is described below

commit 9f9db24e37398a0b66c8af324b34674afeb63066
Author: Rajan Dhabalia 
AuthorDate: Fri Jul 10 10:29:46 2020 -0700

[pulsar-test] shutdown pulsar-client to cleanup forcefully (#7501)
---
 .../java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index f808b96..394cd9c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -192,7 +192,7 @@ public abstract class MockedPulsarServiceBaseTest {
 admin = null;
 }
 if (pulsarClient != null) {
-pulsarClient.close();
+pulsarClient.shutdown();
 pulsarClient = null;
 }
 if (pulsar != null) {



[pulsar] branch master updated: PIP 37: [pulsar-client] support large message size (#4400)

2020-06-02 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 870a637  PIP 37: [pulsar-client] support large message size (#4400)
870a637 is described below

commit 870a637b4906862a611e418341dd926e21458f08
Author: Rajan Dhabalia 
AuthorDate: Tue Jun 2 00:39:55 2020 -0700

PIP 37: [pulsar-client] support large message size (#4400)

* PIP 37: [pulsar-client] support large message size

fix producer

fix ref counts

add timeouts

add validation

fix recycling

fix stats

review

fix test

fix test

fix send message and expiry-consumer-config

fix schema test

fix chunk properties

* fix test
---
 .../broker/admin/impl/PersistentTopicsBase.java|   4 +
 .../broker/service/AbstractBaseDispatcher.java |   3 +
 .../org/apache/pulsar/broker/service/Consumer.java |   7 +-
 .../org/apache/pulsar/broker/service/Producer.java |  43 ++-
 .../pulsar/broker/service/SendMessageInfo.java |   2 +
 .../apache/pulsar/broker/service/ServerCnx.java|   5 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |   4 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   4 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   4 +-
 .../PersistentDispatcherMultipleConsumers.java |   3 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   5 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   7 +-
 .../service/persistent/PersistentSubscription.java |   1 +
 .../broker/service/persistent/PersistentTopic.java |  12 +
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |   2 +
 .../pulsar/client/impl/MessageChunkingTest.java| 379 +
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  45 +++
 .../apache/pulsar/client/api/ProducerBuilder.java  |  27 ++
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  21 +-
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  13 +-
 .../apache/pulsar/client/impl/ConsumerBase.java|   3 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java|  18 +
 .../apache/pulsar/client/impl/ConsumerImpl.java| 254 --
 .../client/impl/MultiTopicsConsumerImpl.java   |   5 +-
 .../pulsar/client/impl/NegativeAcksTracker.java|   2 +
 .../PersistentAcknowledgmentsGroupingTracker.java  |  73 +++-
 .../pulsar/client/impl/ProducerBuilderImpl.java|   9 +
 .../apache/pulsar/client/impl/ProducerImpl.java| 237 -
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  14 +
 .../impl/conf/ConsumerConfigurationData.java   |   7 +
 .../impl/conf/ProducerConfigurationData.java   |   1 +
 .../impl/AcknowledgementsGroupingTrackerTest.java  |   2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 322 +
 .../pulsar/common/policies/data/ConsumerStats.java |   3 +
 .../policies/data/NonPersistentPublisherStats.java |   2 +-
 .../common/policies/data/PublisherStats.java   |   3 +
 .../common/policies/data/SubscriptionStats.java|   3 +
 .../pulsar/common/policies/data/TopicStats.java|   3 +
 .../apache/pulsar/common/protocol/Commands.java|   6 +
 pulsar-common/src/main/proto/PulsarApi.proto   |   5 +
 .../pulsar/testclient/PerformanceConsumer.java |  21 +-
 .../pulsar/testclient/PerformanceProducer.java |  13 +-
 42 files changed, 1450 insertions(+), 147 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index a8ab7fd..99f49a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1931,6 +1931,10 @@ public class PersistentTopicsBase extends AdminResource {
 if (metadata.hasNullValue()) {
 responseBuilder.header("X-Pulsar-null-value", 
metadata.hasNullValue());
 }
+if (metadata.getNumChunksFromMsg() > 0) {
+responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", 
Integer.toString(metadata.getNumChunksFromMsg()));
+responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", 
Integer.toString(metadata.getChunkId()));
+}
 
 // Decode if needed
 CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 849d626..7985f30 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+

[pulsar.wiki] branch master updated: ats example

2020-03-31 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new 25335e9  ats example
25335e9 is described below

commit 25335e9948eb67346c3e6f1bb66770d8a6f36325
Author: Rajan Dhabalia 
AuthorDate: Tue Mar 31 19:56:50 2020 -0700

ats example
---
 PIP-60:-Support-Proxy-server-with-SNI-routing.md | 75 +++-
 1 file changed, 74 insertions(+), 1 deletion(-)

diff --git a/PIP-60:-Support-Proxy-server-with-SNI-routing.md 
b/PIP-60:-Support-Proxy-server-with-SNI-routing.md
index d266f10..d1f7f94 100644
--- a/PIP-60:-Support-Proxy-server-with-SNI-routing.md
+++ b/PIP-60:-Support-Proxy-server-with-SNI-routing.md
@@ -59,4 +59,77 @@ We can also use proxy-server in geo-replication to create a 
proxy between two br
 --broker-url-secure pulsar+ssl://my-dmz-broker.com:6651 \
 --proxy-url pulsar+ssl://my-dmz-proxy.com:4443 \
 --proxy-protocol SNI
-```
\ No newline at end of file
+```
+
+## Example
+This section shows SNI-routing using ATS-proxy server. This section shows how 
to configure ATS proxy-server and pulsar-client to create TCP tunnel between 
client and broker.
+
+![image](https://user-images.githubusercontent.com/2898254/78093926-21eabd80-7388-11ea-8982-4a4d644a2b39.png)
+[Figure 3: Pulsar SNI 
Routing with ATS proxy server]
+
+In this example, Pulsar broker cluster is behind the ATS proxy. Pulsar brokers 
can’t be accessed by any host except ATS proxy. So, if client wants to connect 
to pulsar-cluster then client can use ATS-proxy server to create a TCP tunnel 
with brokers. Pulsar client can use SNI-routing proxy protocol to connect to 
ATS-proxy and asks ATS-proxy to create TCP tunnel with a target broker.
+
+This example shows how we can configure ATS-proxy so that, when the client passes 
the target broker name in the SNI header, the ATS-proxy server can find the 
appropriate broker-url based on the configured sni-mapping and forward the request to 
that target broker by creating a TCP tunnel with that broker.
+
+### ATS Configuration
+In order to enable SNI routing into ATS proxy, we need to manage 2 
configuration files:
+1. 
[ssl_server_name.config](https://docs.trafficserver.apache.org/en/8.0.x/admin-guide/files/ssl_server_name.yaml.en.html)
+
+This file is used to configure aspects of TLS connection handling for both 
inbound and outbound connections. The configuration is driven by the SNI values 
provided by the inbound connection. So, this file manages mapping between 
hostname which comes into SNI header and actual broker-url where request needs 
to be forwarded for that host.   
+ssl_server_name.config
+
+```
+server_config = {
+  {
+ fqdn = 'pulsar-broker-vip',
+ # Forward to Pulsar broker which is listening on 6651
+ tunnel_route = 'pulsar-broker-vip:6651'
+  },
+  {
+ fqdn = 'pulsar-broker1',
+ # Forward to Pulsar broker-1 which is listening on 6651
+ tunnel_route = 'pulsar-broker1:6651'
+  },
+  {
+ fqdn = 'pulsar-broker2',
+ # Forward to Pulsar broker-2 which is listening on 6651
+ tunnel_route = 'pulsar-broker2:6651'
+  },
+}
+```
+
+2. [records.config](https://docs.trafficserver.apache.org/en/8.0.x/admin-guide/files/records.config.en.html)
+
+The records.config file (located in /usr/local/etc/trafficserver/) is a list 
of configurable variables used by the Traffic Server software. One of the 
requirements of SNI routing in ATS is that it only works over TLS. Therefore, 
Pulsar brokers and the ATS proxy-server should have TLS enabled. We will define the TLS 
configuration for the ATS-proxy server in the records.config file.
+
+```
+CONFIG proxy.config.http.connect_ports STRING 4443 6651
+# ats-proxy cert file
+CONFIG proxy.config.ssl.client.cert.path STRING /ats-cert.pem
+# ats-proxy key file
+CONFIG proxy.config.ssl.client.cert.filename STRING /ats-key.pem
+# ssl-port on which ats will listen
+CONFIG proxy.config.http.server_ports STRING 4443:ssl 4080
+```
+Once `ssl_server_name.config` and `records.config` are configured, the ATS-proxy 
server is ready to handle SNI routing and can create a TCP tunnel between client 
and broker.
+
+### Pulsar-client Configuration
+Now the ATS proxy server is configured and ready to handle SNI routing and create 
a TCP tunnel between client and broker. With this PIP, pulsar-client supports SNI 
routing by connecting to the proxy and sending the target broker url in the SNI header. 
Pulsar-client handles SNI routing internally, and the entire connection handling is 
abstracted from the user. The user only has to provide the following proxy configuration 
initially, when creating a pulsar-client that uses the SNI routing protocol.
+```
+String brokerServiceUrl = "pulsar+ssl://pulsar-broker-vip:6651/";
+String proxyUrl = "pulsar+ssl://ats-proxy:443";
+ClientBuilder clientBuilder = PulsarClient.builder

[pulsar.wiki] branch master updated: add pulsar-admin cli example

2020-03-25 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new 7cc4cb8  add pulsar-admin cli example
7cc4cb8 is described below

commit 7cc4cb81c35d4b9e2a7aa7ad0b4a5233bca911a5
Author: Rajan Dhabalia 
AuthorDate: Wed Mar 25 12:32:44 2020 -0700

add pulsar-admin cli example
---
 PIP-60:-Support-Proxy-server-with-SNI-routing.md | 10 +-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/PIP-60:-Support-Proxy-server-with-SNI-routing.md 
b/PIP-60:-Support-Proxy-server-with-SNI-routing.md
index fc27f0a..d266f10 100644
--- a/PIP-60:-Support-Proxy-server-with-SNI-routing.md
+++ b/PIP-60:-Support-Proxy-server-with-SNI-routing.md
@@ -2,6 +2,7 @@
 
  * **Status**: Discussion
  * **Author**: [Rajan Dhabalia](https://github.com/rdhabalia)
+ * **Pull Request**: [#6566](https://github.com/apache/pulsar/pull/6566)
 
 ## Motivation
 
@@ -51,4 +52,11 @@ ProxyProtocol proxyProtocol; //eg: enum: ProxyProtocol.SNI
 ```
 
  Geo-replication
-We can also use proxy-server in geo-replication to create a proxy between two 
brokers. Therefore, we need similar configurations as pulsar-client into 
cluster metadata as well. If cluster metadata has a proxy configured then other 
clusters will connect to this pulsar cluster via proxy. 
\ No newline at end of file
+We can also use proxy-server in geo-replication to create a proxy between two 
brokers. Therefore, we need similar configurations as pulsar-client into 
cluster metadata as well. If cluster metadata has a proxy configured then other 
clusters will connect to this pulsar cluster via proxy. We can configure 
cluster-metadata with proxy-uri and protocol  using admin api/cli.
+```
+./pulsar-admin clusters create my-DMZ-cluster \
+--url http://my-dmz-broker.com:8080 \
+--broker-url-secure pulsar+ssl://my-dmz-broker.com:6651 \
+--proxy-url pulsar+ssl://my-dmz-proxy.com:4443 \
+--proxy-protocol SNI
+```
\ No newline at end of file



[pulsar.wiki] branch master updated: add PIP-60

2020-03-19 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new 38b6539  add PIP-60
38b6539 is described below

commit 38b6539edf2d432dde5017b5d13e9287e863a9ce
Author: Rajan Dhabalia 
AuthorDate: Thu Mar 19 16:48:31 2020 -0700

add PIP-60
---
 Home.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/Home.md b/Home.md
index b34b672..672e0ab 100644
--- a/Home.md
+++ b/Home.md
@@ -7,6 +7,7 @@ We encourage to document any big change or feature or any 
addition to public use
 
 
 ### Proposed
+* [[PIP 60: Support Proxy server with SNI routing]]
 * [[PIP 59: gPRC Protocol Handler]]
 * [[PIP 58 : Support Consumers  Set Custom Retry Delay]]
 * [[PIP 57: Improve Broker's Zookeeper Session Timeout Handling]]



[pulsar.wiki] branch master updated: initial draft

2020-03-19 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
 new 721dd0c  initial draft
721dd0c is described below

commit 721dd0c76327f9a6d28829c56b08c9f2cd6d5086
Author: Rajan Dhabalia 
AuthorDate: Thu Mar 19 16:47:35 2020 -0700

initial draft
---
 PIP-60:-Support-Proxy-server-with-SNI-routing.md | 54 
 1 file changed, 54 insertions(+)

diff --git a/PIP-60:-Support-Proxy-server-with-SNI-routing.md 
b/PIP-60:-Support-Proxy-server-with-SNI-routing.md
new file mode 100644
index 000..fc27f0a
--- /dev/null
+++ b/PIP-60:-Support-Proxy-server-with-SNI-routing.md
@@ -0,0 +1,54 @@
+# PIP-60 Support Proxy-server with SNI routing
+
+ * **Status**: Discussion
+ * **Author**: [Rajan Dhabalia](https://github.com/rdhabalia)
+
+## Motivation
+
+A proxy server is a go‑between or intermediary server that forwards requests 
from multiple clients to different servers across the Internet.  The proxy 
server can act as a “traffic cop,” in both forward and reverse proxy scenarios, 
and adds various benefits in your system such as load balancing, performance, 
security, auto-scaling, etc. There are many proxy servers already 
available in the market which are fast, scalable and, more importantly, cover 
various essential security asp [...]
+
+## SNI Routing
+
+[TLS Extension Definition](https://tools.ietf.org/html/rfc6066) adds support 
of Server Name Indication [SNI](https://tools.ietf.org/html/rfc6066#page-6) and 
it has been available since January 2011. In SNI routing, the proxy server 
examines TLS connection on an initial handshake and extracts the "SNI" value. 
+Proxy-server creates a connection with a destination host defined into SNI and 
forms a TLS tunnel between client and destination host.
+Figure 1: shows the layer-4 routing network activity diagram between client 
and application server via proxy-server (eg: ATS). In the figure, the client 
initiates TLS connection with the proxy-server by passing hostname of 
application-server in the SNI header. The proxy server examines SNI header, 
parses hostname of the target application server and creates an outbound 
connection with that application server host. Once, proxy server successfully 
completes TLS handshake with the applicati [...]
+
+
+![image](https://user-images.githubusercontent.com/2898254/76898266-9b47c380-6852-11ea-8969-55783cda5ed0.png)
+[Figure 1: Layer 4 
routing]
+
+## Pulsar and Proxy with SNI routing
+
+Pulsar client creates TCP connection with the broker and exchanges binary data 
over TCP channel. Pulsar also supports TLS to create a secure connection 
between a client and the broker. As we have discussed in figure-1, the proxy 
server performs SNI routing on TLS connection and this mechanism can be 
perfectly fit into pulsar to introduce proxy between client and broker, and use 
proxy-server for both forward-proxy and reverse-proxy use cases. 
+In order to access a specific topic, the pulsar-client first finds out the 
target broker which owns the topic and then creates a connection channel with 
that target broker to publish/consume messages on that topic. This section 
explains how pulsar-client uses proxy-server to find a target broker for a 
specific topic and create a TCP channel with that target broker to access the 
topic.
+Pulsar client connects to the broker in two phases: lookup and connect.  
+In lookup phase, a client does a lookup with the broker-service URL / VIP to 
connect with any broker in the cluster, send lookup request for a topic and 
receive the response with the name of the target broker that owns the topic. In 
the second phase, the client uses the lookup response which has the name of the 
target broker and connects with that target broker to publish/consume messages. 
+
+Figure 2 shows SNI routing for both the phases and how proxy-server creates a 
TCP tunnel between client and broker. In SNI Routing, pulsar-client always 
tries to create a connection with proxy-server and pass the target 
broker/service URL as part of the SNI header. 
+ 
+
+![image](https://user-images.githubusercontent.com/2898254/76898221-879c5d00-6852-11ea-97ed-155e78f7dead.png)
+[Figure 2: 
Pulsar-client and broker connection using SNI Routing]
+
+**Lookup phase**
+
+Figure 2 shows that pulsar-client uses broker-service url (pulsar-vip) and 
proxy-server url (proxy-server:4443) to perform lookup using SNI routing. 
Pulsar-client uses proxy-server url to create a TLS connection with 
proxy-server and uses broker-service url to pass it in SNI header. At the time 
of TLS handshake, Proxy-server does SNI-routing by extracting broker-service 
name value from SNI header, finds appropriate mapped broke

[pulsar] branch master updated (efe19e0 -> c802609)

2020-03-11 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


from efe19e0  [Maven Cleanup] Remove managed-ledger and zk-utils test-jar 
dependencies when possible (#6513)
 add c802609  [pulsar-proxy] add rest-api to get connection and topic stats 
(#6473)

No new revisions were added by this update.

Summary of changes:
 .../impl/ManagedLedgerFactoryMBeanImpl.java|   2 +-
 .../mledger/impl/ManagedLedgerMBeanImpl.java   |   2 +-
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   2 +-
 .../nonpersistent/NonPersistentDispatcher.java |   2 +-
 .../NonPersistentDispatcherMultipleConsumers.java  |   2 +-
 ...onPersistentDispatcherSingleActiveConsumer.java |   2 +-
 .../nonpersistent/NonPersistentReplicator.java |   2 +-
 .../persistent/PersistentMessageExpiryMonitor.java |   2 +-
 .../service/persistent/PersistentReplicator.java   |   2 +-
 .../java/org/apache/pulsar/common/stats}/Rate.java |   2 +-
 .../pulsar/proxy/server/DirectProxyHandler.java|  17 +-
 .../pulsar/proxy/server/ParserProxyHandler.java|  48 --
 .../pulsar/proxy/server/ProxyConnection.java   |   9 +-
 .../apache/pulsar/proxy/server/ProxyService.java   |  35 
 .../pulsar/proxy/server/ProxyServiceStarter.java   |   8 +-
 .../apache/pulsar/proxy/stats/ConnectionStats.java |  32 ++--
 .../org/apache/pulsar/proxy/stats/ProxyStats.java  |  97 +++
 .../apache/pulsar/proxy/stats}/RestException.java  |  25 +--
 .../org/apache/pulsar/proxy/stats/TopicStats.java  |  50 --
 .../pulsar/proxy/server/ProxyIsAHttpProxyTest.java |  16 +-
 .../apache/pulsar/proxy/server/ProxyStatsTest.java | 191 +
 22 files changed, 451 insertions(+), 99 deletions(-)
 rename {managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util => 
pulsar-common/src/main/java/org/apache/pulsar/common/stats}/Rate.java (98%)
 copy 
pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
 => 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ConnectionStats.java 
(53%)
 create mode 100644 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/ProxyStats.java
 copy {pulsar-common/src/main/java/org/apache/pulsar/common/util => 
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats}/RestException.java 
(67%)
 copy 
pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionState.java
 => pulsar-proxy/src/main/java/org/apache/pulsar/proxy/stats/TopicStats.java 
(54%)
 create mode 100644 
pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyStatsTest.java



[pulsar] branch master updated: [pulsar-broker] add support to configure max pending publish request per connection (#5742)

2020-03-10 Thread rdhabalia
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new a9d56ea  [pulsar-broker] add support to configure max pending publish 
request per connection (#5742)
a9d56ea is described below

commit a9d56ea20ba938b31233edb87345e4a9f62b6041
Author: Rajan Dhabalia 
AuthorDate: Tue Mar 10 10:55:18 2020 -0700

[pulsar-broker] add support to configure max pending publish request per 
connection (#5742)
---
 conf/broker.conf   |  4 
 conf/standalone.conf   |  4 
 .../java/org/apache/pulsar/broker/ServiceConfiguration.java|  6 ++
 .../main/java/org/apache/pulsar/broker/service/ServerCnx.java  | 10 ++
 4 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 5989880..08f370c 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -114,6 +114,10 @@ brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions
 # Topics that are inactive for longer than this value will be deleted
 brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=
 
+# Max pending publish requests per connection to avoid keeping large number of 
pending 
+# requests in memory. Default: 1000
+maxPendingPublishdRequestsPerConnection=1000;
+
 # How frequently to proactively check and purge expired messages
 messageExpiryCheckIntervalInMinutes=5
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index f351d8d..81bbd43 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -75,6 +75,10 @@ brokerDeleteInactiveTopicsEnabled=true
 # How often to check for inactive topics
 brokerDeleteInactiveTopicsFrequencySeconds=60
 
+# Max pending publish requests per connection to avoid keeping large number of 
pending 
+# requests in memory. Default: 1000
+maxPendingPublishdRequestsPerConnection=1000;
+
 # How frequently to proactively check and purge expired messages
 messageExpiryCheckIntervalInMinutes=5
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 0d554f0..9d2d79d 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -276,6 +276,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
 @FieldContext(
 category = CATEGORY_POLICIES,
+doc = "Max pending publish requests per connection to avoid keeping 
large number of pending "
++ "requests in memory. Default: 1000"
+)
+private int maxPendingPublishdRequestsPerConnection = 1000;
+@FieldContext(
+category = CATEGORY_POLICIES,
 doc = "How frequently to proactively check and purge expired messages"
 )
 private int messageExpiryCheckIntervalInMinutes = 5;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1037952..b91bce8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -142,8 +142,8 @@ public class ServerCnx extends PulsarHandler {
 
 // Max number of pending requests per connections. If multiple producers 
are sharing the same connection the flow
 // control done by a single producer might not be enough to prevent write 
spikes on the broker.
-private static final int MaxPendingSendRequests = 1000;
-private static final int ResumeReadsThreshold = MaxPendingSendRequests / 2;
+private final int maxPendingSendRequests;
+private final int resumeReadsThreshold;
 private int pendingSendRequest = 0;
 private final String replicatorPrefix;
 private String clientVersion = null;
@@ -183,6 +183,8 @@ public class ServerCnx extends PulsarHandler {
 this.authenticateOriginalAuthData = 
service.pulsar().getConfiguration().isAuthenticateOriginalAuthData();
 this.schemaValidationEnforced = 
pulsar.getConfiguration().isSchemaValidationEnforced();
 this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
+this.maxPendingSendRequests = 
pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
+this.resumeReadsThreshold = maxPendingSendRequests / 2;
 }
 
 @Override
@@ -1759,7 +1761,7 @@ public class ServerCnx extends PulsarHandler {
 private void startSendOperation(Producer producer, int msgSize) {
 messagePublishBufferSize += msgSize;
 boolean isPublishRateExceeded = 
producer.ge

  1   2   >