(pulsar) branch branch-3.0 updated: [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)

2024-06-27 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new ab8dba3370f [improve] [broker] PIP-356 Support Geo-Replication starts 
at earliest position (#22856)
ab8dba3370f is described below

commit ab8dba3370f0ce02bcef2bc6ae5295ffa874a7df
Author: fengyubiao 
AuthorDate: Wed Jun 19 22:29:17 2024 +0800

[improve] [broker] PIP-356 Support Geo-Replication starts at earliest 
position (#22856)

(cherry picked from commit 5fc0eafab9ea2a4ece7b87218404489c270b64e6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 ++
 .../broker/service/persistent/PersistentTopic.java |   9 +-
 .../broker/service/OneWayReplicatorTest.java   | 103 -
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  52 +++
 4 files changed, 167 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ff33d9103ab..0f33be168cf 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1329,6 +1329,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 doc = "Max number of snapshot to be cached per subscription.")
 private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
 
+@FieldContext(
+category = CATEGORY_SERVER,
+dynamic = true,
+doc = "The position that replication task start at, it can be set 
to earliest or latest (default).")
+private String replicationStartAt = "latest";
+
 @FieldContext(
 category = CATEGORY_SERVER,
 dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index df16883b5c8..3eb7648614f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1905,7 +1905,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 final CompletableFuture future = new CompletableFuture<>();
 
 String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
-ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+final InitialPosition initialPosition;
+if (MessageId.earliest.toString()
+
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt()))
 {
+initialPosition = InitialPosition.Earliest;
+} else {
+initialPosition = InitialPosition.Latest;
+}
+ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() 
{
 @Override
 public void openCursorComplete(ManagedCursor cursor, Object ctx) {
 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 682ec55ae40..f0f699a7e16 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -34,6 +35,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -68,11 +70,12 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.naming.TopicName;
 import 

(pulsar) branch branch-3.3 updated: [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)

2024-06-27 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 4399b2743cd [improve] [broker] PIP-356 Support Geo-Replication starts 
at earliest position (#22856)
4399b2743cd is described below

commit 4399b2743cdc01070a871b32f7bc02ac736e9c80
Author: fengyubiao 
AuthorDate: Wed Jun 19 22:29:17 2024 +0800

[improve] [broker] PIP-356 Support Geo-Replication starts at earliest 
position (#22856)

(cherry picked from commit 5fc0eafab9ea2a4ece7b87218404489c270b64e6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   6 ++
 .../broker/service/persistent/PersistentTopic.java |   9 +-
 .../broker/service/OneWayReplicatorTest.java   | 103 -
 .../service/OneWayReplicatorUsingGlobalZKTest.java |  52 +++
 4 files changed, 167 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efe1856509..11d1d663f42 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1345,6 +1345,12 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 doc = "Max number of snapshot to be cached per subscription.")
 private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
 
+@FieldContext(
+category = CATEGORY_SERVER,
+dynamic = true,
+doc = "The position that replication task start at, it can be set 
to earliest or latest (default).")
+private String replicationStartAt = "latest";
+
 @FieldContext(
 category = CATEGORY_SERVER,
 dynamic = true,
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3331dcb53b3..dbbb0b07ce3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2062,7 +2062,14 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
 final CompletableFuture future = new CompletableFuture<>();
 
 String name = PersistentReplicator.getReplicatorName(replicatorPrefix, 
remoteCluster);
-ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+final InitialPosition initialPosition;
+if (MessageId.earliest.toString()
+
.equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt()))
 {
+initialPosition = InitialPosition.Earliest;
+} else {
+initialPosition = InitialPosition.Latest;
+}
+ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() 
{
 @Override
 public void openCursorComplete(ManagedCursor cursor, Object ctx) {
 String localCluster = 
brokerService.pulsar().getConfiguration().getClusterName();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b751d269d1f..5eb1385d85b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -36,6 +37,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.UUID;
@@ -71,11 +73,12 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.naming.TopicName;
 import 

(pulsar) 03/03: [fix] [broker] response not-found error if topic does not exist when calling getPartitionedTopicMetadata (#22838)

2024-06-27 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72705984a9cd5bad9ceb083258c37a7ec83c6987
Author: fengyubiao 
AuthorDate: Mon Jun 17 23:39:08 2024 +0800

[fix] [broker] response not-found error if topic does not exist when 
calling getPartitionedTopicMetadata (#22838)

(cherry picked from commit 9aed73653e1f706e3517072cce4a352d0838f8d7)
---
 .../broker/admin/impl/PersistentTopicsBase.java|  21 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  16 +-
 .../pulsar/broker/lookup/TopicLookupBase.java  |  22 +-
 .../pulsar/broker/namespace/NamespaceService.java  | 101 +++-
 .../pulsar/broker/namespace/TopicExistsInfo.java   |  82 +++
 .../pulsar/broker/service/BrokerService.java   | 117 ++--
 .../apache/pulsar/broker/service/ServerCnx.java|  81 +--
 .../admin/GetPartitionMetadataMultiBrokerTest.java | 222 
 .../broker/admin/GetPartitionMetadataTest.java | 608 +++--
 .../org/apache/pulsar/broker/admin/TopicsTest.java |  13 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |  19 +-
 .../broker/namespace/NamespaceServiceTest.java |   7 +-
 .../apache/pulsar/broker/service/TopicGCTest.java  |   2 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java|  37 +-
 14 files changed, 899 insertions(+), 449 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f9471b03c5f..887591c0feb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -559,13 +559,13 @@ public class PersistentTopicsBase extends AdminResource {
 // is a non-partitioned topic so we shouldn't check if 
the topic exists.
 return 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
 .thenCompose(brokerAllowAutoTopicCreation -> {
-if (checkAllowAutoCreation) {
+if (checkAllowAutoCreation && 
brokerAllowAutoTopicCreation) {
 // Whether it exists or not, auto create a 
non-partitioned topic by client.
 return 
CompletableFuture.completedFuture(metadata);
 } else {
 // If it does not exist, response a Not Found 
error.
 // Otherwise, response a non-partitioned 
metadata.
-return 
internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+return 
internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata);
 }
 });
 }
@@ -713,6 +713,17 @@ public class PersistentTopicsBase extends AdminResource {
 
 protected CompletableFuture internalCheckTopicExists(TopicName 
topicName) {
 return pulsar().getNamespaceService().checkTopicExists(topicName)
+.thenAccept(info -> {
+boolean exists = info.isExists();
+info.recycle();
+if (!exists) {
+throw new RestException(Status.NOT_FOUND, 
getTopicNotFoundErrorMessage(topicName.toString()));
+}
+});
+}
+
+protected CompletableFuture 
internalCheckNonPartitionedTopicExists(TopicName topicName) {
+return 
pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
 .thenAccept(exist -> {
 if (!exist) {
 throw new RestException(Status.NOT_FOUND, 
getTopicNotFoundErrorMessage(topicName.toString()));
@@ -5336,8 +5347,10 @@ public class PersistentTopicsBase extends AdminResource {
 "Only persistent topic can be set as shadow 
topic"));
 }
 
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
-.thenAccept(isExists -> {
-if (!isExists) {
+.thenAccept(info -> {
+boolean exists = info.isExists();
+info.recycle();
+if (!exists) {
 throw new 
RestException(Status.PRECONDITION_FAILED,
 "Shadow topic [" + shadowTopic + "] 
not exists.");
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
 

(pulsar) branch branch-3.3 updated (7d81ee9d7de -> 72705984a9c)

2024-06-27 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


from 7d81ee9d7de [fix][broker] Ensure that PulsarService is ready for 
serving incoming requests (#22977)
 new a927ef43935 [fix] [client] PIP-344 Do not create partitioned metadata 
when calling pulsarClient.getPartitionsForTopic(topicName) (#22206)
 new b5fdeff5444 [improve] [client] PIP-344 support feature flag 
supportsGetPartitionedMetadataWithoutAutoCreation (#22773)
 new 72705984a9c [fix] [broker] response not-found error if topic does not 
exist when calling getPartitionedTopicMetadata (#22838)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../broker/admin/impl/PersistentTopicsBase.java|  43 +-
 .../broker/admin/v2/NonPersistentTopics.java   |  16 +-
 .../pulsar/broker/lookup/TopicLookupBase.java  |  22 +-
 .../pulsar/broker/namespace/NamespaceService.java  | 101 +++-
 .../pulsar/broker/namespace/TopicExistsInfo.java   |  82 
 .../pulsar/broker/service/BrokerService.java   | 117 ++---
 .../apache/pulsar/broker/service/ServerCnx.java|  83 +++-
 .../admin/GetPartitionMetadataMultiBrokerTest.java | 222 +
 .../broker/admin/GetPartitionMetadataTest.java | 523 +
 .../pulsar/broker/admin/TopicAutoCreationTest.java |   3 +-
 .../org/apache/pulsar/broker/admin/TopicsTest.java |  13 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |  19 +-
 .../broker/namespace/NamespaceServiceTest.java |   7 +-
 .../BrokerServiceAutoTopicCreationTest.java|   4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |  10 +-
 .../service/BrokerServiceThrottlingTest.java   |   2 +-
 .../pulsar/broker/service/ServerCnxTest.java   |   2 +-
 .../apache/pulsar/broker/service/TopicGCTest.java  |   2 +
 .../buffer/TransactionLowWaterMarkTest.java|   4 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |   2 +-
 .../apache/pulsar/client/impl/ClientCnxTest.java   |  44 ++
 .../org/apache/pulsar/client/api/PulsarClient.java |  23 +-
 .../client/impl/BinaryProtoLookupService.java  |  19 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   5 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java|  37 +-
 .../pulsar/client/impl/HttpLookupService.java  |   6 +-
 .../apache/pulsar/client/impl/LookupService.java   |  27 +-
 .../client/impl/MultiTopicsConsumerImpl.java   |   2 +-
 .../pulsar/client/impl/PulsarClientImpl.java   |  32 +-
 .../TransactionCoordinatorClientImpl.java  |   3 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  12 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   |   3 +-
 .../apache/pulsar/common/protocol/Commands.java|   8 +-
 .../org/apache/pulsar/common/util/FutureUtil.java  |   4 +-
 pulsar-common/src/main/proto/PulsarApi.proto   |   2 +
 .../pulsar/proxy/server/LookupProxyHandler.java|   3 +-
 36 files changed, 1307 insertions(+), 200 deletions(-)
 create mode 100644 
pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
 create mode 100644 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
 create mode 100644 
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java



(pulsar) 01/03: [fix] [client] PIP-344 Do not create partitioned metadata when calling pulsarClient.getPartitionsForTopic(topicName) (#22206)

2024-06-27 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a927ef43935798d573c23910af58f5f1dd1e916a
Author: fengyubiao 
AuthorDate: Thu May 23 21:15:16 2024 +0800

[fix] [client] PIP-344 Do not create partitioned metadata when calling 
pulsarClient.getPartitionsForTopic(topicName) (#22206)

(cherry picked from commit 4e5c0bcc2b44c33a966287b86c2c235be249dc51)
---
 .../broker/admin/impl/PersistentTopicsBase.java|  26 +-
 .../apache/pulsar/broker/service/ServerCnx.java| 110 +++--
 .../broker/admin/GetPartitionMetadataTest.java | 473 +
 .../pulsar/broker/admin/TopicAutoCreationTest.java |   3 +-
 .../BrokerServiceAutoTopicCreationTest.java|   4 +-
 .../pulsar/broker/service/BrokerServiceTest.java   |  10 +-
 .../service/BrokerServiceThrottlingTest.java   |   2 +-
 .../pulsar/broker/service/ServerCnxTest.java   |   2 +-
 .../buffer/TransactionLowWaterMarkTest.java|   4 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java |   2 +-
 .../org/apache/pulsar/client/api/PulsarClient.java |  23 +-
 .../client/impl/BinaryProtoLookupService.java  |  13 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java|   4 +-
 .../pulsar/client/impl/HttpLookupService.java  |   6 +-
 .../apache/pulsar/client/impl/LookupService.java   |  27 +-
 .../client/impl/MultiTopicsConsumerImpl.java   |   2 +-
 .../pulsar/client/impl/PulsarClientImpl.java   |  32 +-
 .../TransactionCoordinatorClientImpl.java  |   3 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  12 +-
 .../pulsar/client/impl/PulsarClientImplTest.java   |   3 +-
 .../apache/pulsar/common/protocol/Commands.java|   7 +-
 .../org/apache/pulsar/common/util/FutureUtil.java  |   4 +-
 pulsar-common/src/main/proto/PulsarApi.proto   |   2 +
 .../pulsar/proxy/server/LookupProxyHandler.java|   3 +-
 24 files changed, 689 insertions(+), 88 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ebb92679599..f9471b03c5f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -546,19 +546,29 @@ public class PersistentTopicsBase extends AdminResource {
   
boolean checkAllowAutoCreation) {
 return getPartitionedTopicMetadataAsync(topicName, authoritative, 
checkAllowAutoCreation)
 .thenCompose(metadata -> {
-CompletableFuture ret;
-if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+if (metadata.partitions > 1) {
+// Some clients does not support partitioned topic.
+return 
internalValidateClientVersionAsync().thenApply(__ -> metadata);
+} else if (metadata.partitions == 1) {
+return CompletableFuture.completedFuture(metadata);
+} else {
+// metadata.partitions == 0
 // The topic may be a non-partitioned topic, so check 
if it exists here.
 // However, when checkAllowAutoCreation is true, the 
client will create the topic if
 // it doesn't exist. In this case, `partitions == 0` 
means the automatically created topic
 // is a non-partitioned topic so we shouldn't check if 
the topic exists.
-ret = internalCheckTopicExists(topicName);
-} else if (metadata.partitions > 1) {
-ret = internalValidateClientVersionAsync();
-} else {
-ret = CompletableFuture.completedFuture(null);
+return 
pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+.thenCompose(brokerAllowAutoTopicCreation -> {
+if (checkAllowAutoCreation) {
+// Whether it exists or not, auto create a 
non-partitioned topic by client.
+return 
CompletableFuture.completedFuture(metadata);
+} else {
+// If it does not exist, response a Not Found 
error.
+// Otherwise, response a non-partitioned 
metadata.
+return 
internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+}
+});
 }
-return ret.thenApply(__ -> metadata);

(pulsar) 02/03: [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation (#22773)

2024-06-27 Thread yubiao
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b5fdeff5444b2f7ca2da0dea229a1d1583df9d9d
Author: fengyubiao 
AuthorDate: Thu Jun 6 16:09:38 2024 +0800

[improve] [client] PIP-344 support feature flag 
supportsGetPartitionedMetadataWithoutAutoCreation (#22773)

(cherry picked from commit 6236116754472c61b2166da6d4797fc63c83f364)
---
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 44 ++
 .../client/impl/BinaryProtoLookupService.java  |  6 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  5 +++
 .../apache/pulsar/common/protocol/Commands.java|  1 +
 4 files changed, 56 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index dfd52d494ae..df6b1b8a8f9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -20,13 +20,17 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.collect.Sets;
 import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -124,4 +128,44 @@ public class ClientCnxTest extends 
MockedPulsarServiceBaseTest {
 producer.close();
 consumer.close();
 }
+
+@Test
+public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws 
Exception {
+final String topic = BrokerTestUtil.newUniqueName( "persistent://" + 
NAMESPACE + "/tp");
+admin.topics().createNonPartitionedTopic(topic);
+PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) 
PulsarClient.builder()
+.maxNumberOfRejectedRequestPerConnection(1)
+.connectionMaxIdleSeconds(Integer.MAX_VALUE)
+.serviceUrl(pulsar.getBrokerServiceUrl())
+.build();
+ProducerImpl producer = (ProducerImpl) 
clientWitBinaryLookup.newProducer().topic(topic).create();
+
+// Verify: the variable 
"isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker 
is true.
+Awaitility.await().untilAsserted(() -> {
+ClientCnx clientCnx = producer.getClientCnx();
+Assert.assertNotNull(clientCnx);
+
Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation());
+});
+Assert.assertEquals(
+clientWitBinaryLookup.getPartitionsForTopic(topic, 
true).get().size(), 1);
+
+// Inject a "false" value for the variable 
"isSupportsGetPartitionedMetadataWithoutAutoCreation".
+// Verify: client will get a not support error.
+Field field = 
ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+field.setAccessible(true);
+for (CompletableFuture clientCnxFuture : 
clientWitBinaryLookup.getCnxPool().getConnections()) {
+field.set(clientCnxFuture.get(), false);
+}
+try {
+clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+Assert.fail("Expected an error that the broker version is too 
old.");
+} catch (Exception ex) {
+Assert.assertTrue(ex.getMessage().contains("without auto-creation 
is not supported from the broker"));
+}
+
+// cleanup.
+producer.close();
+clientWitBinaryLookup.close();
+admin.topics().delete(topic, false);
+}
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index b363d6e4366..bf015c564b9 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -254,6 +254,12 @@ public class BinaryProtoLookupService implements 
LookupService {
 CompletableFuture partitionFuture = new 
CompletableFuture<>();
 
 client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx 
-> {
+if 

(pulsar) branch master updated: [feat][broker] PIP-264: Add transaction metrics (#22970)

2024-06-27 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4e535cb3f4a [feat][broker] PIP-264: Add transaction metrics (#22970)
4e535cb3f4a is described below

commit 4e535cb3f4a3482b0d5dc5a3a0a63c87490704e3
Author: Dragos Misca 
AuthorDate: Thu Jun 27 02:54:43 2024 -0700

[feat][broker] PIP-264: Add transaction metrics (#22970)
---
 .../org/apache/pulsar/broker/PulsarService.java| 15 
 .../broker/service/PersistentTopicAttributes.java  | 30 
 .../service/persistent/PersistentSubscription.java |  7 +-
 .../service/persistent/PersistentTopicMetrics.java | 14 +++-
 .../broker/stats/OpenTelemetryTopicStats.java  | 27 ++-
 .../OpenTelemetryTransactionCoordinatorStats.java  | 87 ++
 ...enTelemetryTransactionPendingAckStoreStats.java | 72 ++
 .../buffer/TransactionBufferClientStats.java   |  7 +-
 .../buffer/impl/TransactionBufferClientImpl.java   |  9 ++-
 .../impl/TransactionBufferClientStatsImpl.java | 61 +--
 .../transaction/pendingack/PendingAckHandle.java   |  7 ++
 .../pendingack/PendingAckHandleAttributes.java | 63 
 .../pendingack/PendingAckHandleStats.java  |  7 ++
 .../pendingack/impl/PendingAckHandleDisabled.java  |  6 ++
 .../pendingack/impl/PendingAckHandleImpl.java  | 28 ---
 .../pendingack/impl/PendingAckHandleStatsImpl.java | 56 +-
 .../pulsar/broker/transaction/TransactionTest.java | 24 +-
 .../buffer/TopicTransactionBufferTest.java | 22 +-
 .../pendingack/PendingAckPersistentTest.java   | 40 ++
 .../opentelemetry/OpenTelemetryAttributes.java | 33 +++-
 pulsar-transaction/coordinator/pom.xml |  6 ++
 .../coordinator/TransactionMetadataStore.java  |  9 +++
 .../TransactionMetadataStoreAttributes.java| 56 ++
 .../impl/InMemTransactionMetadataStore.java| 16 
 .../impl/MLTransactionMetadataStore.java   | 16 
 25 files changed, 640 insertions(+), 78 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0d8bc571c57..848484fe376 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -116,6 +116,8 @@ import 
org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
+import 
org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats;
 import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -263,6 +265,8 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 private OpenTelemetryConsumerStats openTelemetryConsumerStats;
 private OpenTelemetryProducerStats openTelemetryProducerStats;
 private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
+private OpenTelemetryTransactionCoordinatorStats 
openTelemetryTransactionCoordinatorStats;
+private OpenTelemetryTransactionPendingAckStoreStats 
openTelemetryTransactionPendingAckStoreStats;
 
 private TransactionMetadataStoreService transactionMetadataStoreService;
 private TransactionBufferProvider transactionBufferProvider;
@@ -684,6 +688,14 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 brokerClientSharedTimer.stop();
 monotonicSnapshotClock.close();
 
+if (openTelemetryTransactionPendingAckStoreStats != null) {
+openTelemetryTransactionPendingAckStoreStats.close();
+openTelemetryTransactionPendingAckStoreStats = null;
+}
+if (openTelemetryTransactionCoordinatorStats != null) {
+openTelemetryTransactionCoordinatorStats.close();
+openTelemetryTransactionCoordinatorStats = null;
+}
 if (openTelemetryReplicatorStats != null) {
 openTelemetryReplicatorStats.close();
 openTelemetryReplicatorStats = null;
@@ -996,6 +1008,9 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
 
.newProvider(config.getTransactionBufferProviderClassName());
 transactionPendingAckStoreProvider = 
TransactionPendingAckStoreProvider
 

(pulsar) branch master updated: [improve] [pip] PIP-364: Introduce a new load balance algorithm AvgShedder (#22946)

2024-06-27 Thread kwang
This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 4ac9bc42f22 [improve] [pip] PIP-364: Introduce a new load balance 
algorithm AvgShedder (#22946)
4ac9bc42f22 is described below

commit 4ac9bc42f22f8163f59273a0b4ffc46cf3cffdea
Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Thu Jun 27 16:57:04 2024 +0800

[improve] [pip] PIP-364: Introduce a new load balance algorithm AvgShedder 
(#22946)
---
 pip/pip-364.md | 476 +
 1 file changed, 476 insertions(+)

diff --git a/pip/pip-364.md b/pip/pip-364.md
new file mode 100644
index 000..c589b3b47fc
--- /dev/null
+++ b/pip/pip-364.md
@@ -0,0 +1,476 @@
+
+# PIP-364: Introduce a new load balance algorithm AvgShedder
+
+# Background knowledge
+
+Pulsar has two load balance interfaces:
+- `LoadSheddingStrategy` is an unloading strategy that identifies high load 
brokers and unloads some of the bundles they carry to reduce the load.
+- `ModularLoadManagerStrategy` is a placement strategy responsible for 
assigning bundles to brokers.
+
+## LoadSheddingStrategy
+There are three available algorithms: `ThresholdShedder`, `OverloadShedder`, 
`UniformLoadShedder`.
+
+### ThresholdShedder
+`ThresholdShedder` uses the following method to calculate the maximum resource 
utilization rate for each broker,
+which includes CPU, direct memory, bandwidth in, and bandwidth out.
+```
+public double getMaxResourceUsageWithWeight(final double cpuWeight,
+final double 
directMemoryWeight, final double bandwidthInWeight,
+final double 
bandwidthOutWeight) {
+return max(cpu.percentUsage() * cpuWeight,
+directMemory.percentUsage() * directMemoryWeight, 
bandwidthIn.percentUsage() * bandwidthInWeight,
+bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+}
+```
+
+After calculating the maximum resource utilization rate for each broker, a 
historical weight algorithm will
+also be executed to obtain the final score.
+```
+historyUsage = historyUsage == null ? resourceUsage : historyUsage * 
historyPercentage + (1 - historyPercentage) * resourceUsage;
+```
+The historyPercentage is determined by configuring the 
`loadBalancerHistoryResourcePercentage`. 
+The default value is 0.9, which means that the last calculated score accounts 
for 90%, 
+while the current calculated score only accounts for 10%.
+
+The introduction of this historical weight algorithm is to avoid bundle 
switching caused by
+short-term abnormal load increase or decrease, but in fact, this algorithm 
will introduce some
+serious problems, which will be explained in detail later.
+
+Next, calculate the average score of all brokers in the entire cluster: 
`avgUsage=totalUsage/totalBrokers`. 
+When the score of any broker exceeds `avgUsage` by more than a certain threshold, 
the broker is considered overloaded.
+The threshold is determined by the configuration 
`loadBalancerBrokerThresholdShedderPercentage`, with a default value of 10.
+
+
+### OverloadShedder
+`OverloadShedder` uses the same method `getMaxResourceUsageWithWeight` to 
calculate the maximum resource utilization rate for each broker.
+The difference is that `OverloadShedder` does not apply the historical weight 
algorithm when calculating the final score; 
+the final score is simply the current maximum resource utilization rate of the broker.
+
+After obtaining the load score for each broker, compare it with 
`loadBalancerBrokerOverloadedThresholdPercentage`, which defaults to 85%. 
+If the score exceeds the threshold, the broker is considered overloaded.
+
+This algorithm is relatively simple, but there are many serious corner cases, 
so it is not recommended to use `OverloadShedder`.
+Here are two cases:
+- When the load on each broker in the cluster reaches the threshold, the 
bundle unload will continue to be executed,
+   but it will only switch from one overloaded broker to another, which is 
meaningless.
+- If no broker's load reaches the threshold, adding new brokers 
will not shift any traffic to the newly added brokers.
+These two problems are quite serious, so `OverloadShedder` is not discussed 
further below.
+
+
+### UniformLoadShedder
+`UniformLoadShedder` first calculates the maximum and minimum message 
rates, as well as the maximum and minimum 
+traffic throughput, and the corresponding brokers. It then calculates the 
difference between the maximum and minimum values, which is checked against two thresholds 
+corresponding to message rate and throughput size, respectively.
+
+- loadBalancerMsgRateDifferenceShedderThreshold
+
+The message rate percentage threshold between the highest and lowest loaded 
brokers, with a default value of 50,
+can trigger bundle unload