(pulsar) branch branch-3.0 updated: [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ab8dba3370f [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)
ab8dba3370f is described below

commit ab8dba3370f0ce02bcef2bc6ae5295ffa874a7df
Author: fengyubiao
AuthorDate: Wed Jun 19 22:29:17 2024 +0800

    [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)

    (cherry picked from commit 5fc0eafab9ea2a4ece7b87218404489c270b64e6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
 .../broker/service/persistent/PersistentTopic.java | 9 +-
 .../broker/service/OneWayReplicatorTest.java | 103 -
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 52 +++
 4 files changed, 167 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index ff33d9103ab..0f33be168cf 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1329,6 +1329,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Max number of snapshot to be cached per subscription.")
     private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
 
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "The position that replication task start at, it can be set to earliest or latest (default).")
+    private String replicationStartAt = "latest";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index df16883b5c8..3eb7648614f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1905,7 +1905,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         final CompletableFuture future = new CompletableFuture<>();
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
-        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+        final InitialPosition initialPosition;
+        if (MessageId.earliest.toString()
+                .equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
+            initialPosition = InitialPosition.Earliest;
+        } else {
+            initialPosition = InitialPosition.Latest;
+        }
+        ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index 682ec55ae40..f0f699a7e16 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -34,6 +35,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -68,11 +70,12 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.naming.TopicName;
 import
(pulsar) branch branch-3.3 updated: [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4399b2743cd [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)
4399b2743cd is described below

commit 4399b2743cdc01070a871b32f7bc02ac736e9c80
Author: fengyubiao
AuthorDate: Wed Jun 19 22:29:17 2024 +0800

    [improve] [broker] PIP-356 Support Geo-Replication starts at earliest position (#22856)

    (cherry picked from commit 5fc0eafab9ea2a4ece7b87218404489c270b64e6)
---
 .../apache/pulsar/broker/ServiceConfiguration.java | 6 ++
 .../broker/service/persistent/PersistentTopic.java | 9 +-
 .../broker/service/OneWayReplicatorTest.java | 103 -
 .../service/OneWayReplicatorUsingGlobalZKTest.java | 52 +++
 4 files changed, 167 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efe1856509..11d1d663f42 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1345,6 +1345,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Max number of snapshot to be cached per subscription.")
     private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;
 
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "The position that replication task start at, it can be set to earliest or latest (default).")
+    private String replicationStartAt = "latest";
+
     @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 3331dcb53b3..dbbb0b07ce3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2062,7 +2062,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         final CompletableFuture future = new CompletableFuture<>();
         String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
-        ledger.asyncOpenCursor(name, new OpenCursorCallback() {
+        final InitialPosition initialPosition;
+        if (MessageId.earliest.toString()
+                .equalsIgnoreCase(getBrokerService().getPulsar().getConfiguration().getReplicationStartAt())) {
+            initialPosition = InitialPosition.Earliest;
+        } else {
+            initialPosition = InitialPosition.Latest;
+        }
+        ledger.asyncOpenCursor(name, initialPosition, new OpenCursorCallback() {
             @Override
             public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index b751d269d1f..5eb1385d85b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -36,6 +37,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.UUID;
@@ -71,11 +73,12 @@ import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.naming.TopicName;
 import
(pulsar) 03/03: [fix] [broker] response not-found error if topic does not exist when calling getPartitionedTopicMetadata (#22838)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 72705984a9cd5bad9ceb083258c37a7ec83c6987
Author: fengyubiao
AuthorDate: Mon Jun 17 23:39:08 2024 +0800

    [fix] [broker] response not-found error if topic does not exist when calling getPartitionedTopicMetadata (#22838)

    (cherry picked from commit 9aed73653e1f706e3517072cce4a352d0838f8d7)
---
 .../broker/admin/impl/PersistentTopicsBase.java | 21 +-
 .../broker/admin/v2/NonPersistentTopics.java | 16 +-
 .../pulsar/broker/lookup/TopicLookupBase.java | 22 +-
 .../pulsar/broker/namespace/NamespaceService.java | 101 +++-
 .../pulsar/broker/namespace/TopicExistsInfo.java | 82 +++
 .../pulsar/broker/service/BrokerService.java | 117 ++--
 .../apache/pulsar/broker/service/ServerCnx.java | 81 +--
 .../admin/GetPartitionMetadataMultiBrokerTest.java | 222
 .../broker/admin/GetPartitionMetadataTest.java | 608 +++--
 .../org/apache/pulsar/broker/admin/TopicsTest.java | 13 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java | 19 +-
 .../broker/namespace/NamespaceServiceTest.java | 7 +-
 .../apache/pulsar/broker/service/TopicGCTest.java | 2 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java | 37 +-
 14 files changed, 899 insertions(+), 449 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f9471b03c5f..887591c0feb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -559,13 +559,13 @@ public class PersistentTopicsBase extends AdminResource {
                     // is a non-partitioned topic so we shouldn't check if the topic exists.
                     return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
                             .thenCompose(brokerAllowAutoTopicCreation -> {
-                        if (checkAllowAutoCreation) {
+                        if (checkAllowAutoCreation && brokerAllowAutoTopicCreation) {
                             // Whether it exists or not, auto create a non-partitioned topic by client.
                             return CompletableFuture.completedFuture(metadata);
                         } else {
                             // If it does not exist, response a Not Found error.
                             // Otherwise, response a non-partitioned metadata.
-                            return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+                            return internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata);
                         }
                     });
                 }
@@ -713,6 +713,17 @@ public class PersistentTopicsBase extends AdminResource {
     protected CompletableFuture internalCheckTopicExists(TopicName topicName) {
         return pulsar().getNamespaceService().checkTopicExists(topicName)
+                .thenAccept(info -> {
+                    boolean exists = info.isExists();
+                    info.recycle();
+                    if (!exists) {
+                        throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
+                    }
+                });
+    }
+
+    protected CompletableFuture internalCheckNonPartitionedTopicExists(TopicName topicName) {
+        return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
                 .thenAccept(exist -> {
                     if (!exist) {
                         throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
@@ -5336,8 +5347,10 @@ public class PersistentTopicsBase extends AdminResource {
                         "Only persistent topic can be set as shadow topic"));
             }
             futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
-                    .thenAccept(isExists -> {
-                        if (!isExists) {
+                    .thenAccept(info -> {
+                        boolean exists = info.isExists();
+                        info.recycle();
+                        if (!exists) {
                             throw new RestException(Status.PRECONDITION_FAILED,
                                     "Shadow topic [" + shadowTopic + "] not exists.");
                         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
(pulsar) branch branch-3.3 updated (7d81ee9d7de -> 72705984a9c)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

    from 7d81ee9d7de [fix][broker] Ensure that PulsarService is ready for serving incoming requests (#22977)
     new a927ef43935 [fix] [client] PIP-344 Do not create partitioned metadata when calling pulsarClient.getPartitionsForTopic(topicName) (#22206)
     new b5fdeff5444 [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation (#22773)
     new 72705984a9c [fix] [broker] response not-found error if topic does not exist when calling getPartitionedTopicMetadata (#22838)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.

Summary of changes:
 .../broker/admin/impl/PersistentTopicsBase.java | 43 +-
 .../broker/admin/v2/NonPersistentTopics.java | 16 +-
 .../pulsar/broker/lookup/TopicLookupBase.java | 22 +-
 .../pulsar/broker/namespace/NamespaceService.java | 101 +++-
 .../pulsar/broker/namespace/TopicExistsInfo.java | 82
 .../pulsar/broker/service/BrokerService.java | 117 ++---
 .../apache/pulsar/broker/service/ServerCnx.java | 83 +++-
 .../admin/GetPartitionMetadataMultiBrokerTest.java | 222 +
 .../broker/admin/GetPartitionMetadataTest.java | 523 +
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 3 +-
 .../org/apache/pulsar/broker/admin/TopicsTest.java | 13 +-
 .../broker/lookup/http/HttpTopicLookupv2Test.java | 19 +-
 .../broker/namespace/NamespaceServiceTest.java | 7 +-
 .../BrokerServiceAutoTopicCreationTest.java | 4 +-
 .../pulsar/broker/service/BrokerServiceTest.java | 10 +-
 .../service/BrokerServiceThrottlingTest.java | 2 +-
 .../pulsar/broker/service/ServerCnxTest.java | 2 +-
 .../apache/pulsar/broker/service/TopicGCTest.java | 2 +
 .../buffer/TransactionLowWaterMarkTest.java | 4 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java | 2 +-
 .../apache/pulsar/client/impl/ClientCnxTest.java | 44 ++
 .../org/apache/pulsar/client/api/PulsarClient.java | 23 +-
 .../client/impl/BinaryProtoLookupService.java | 19 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java | 5 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java | 37 +-
 .../pulsar/client/impl/HttpLookupService.java | 6 +-
 .../apache/pulsar/client/impl/LookupService.java | 27 +-
 .../client/impl/MultiTopicsConsumerImpl.java | 2 +-
 .../pulsar/client/impl/PulsarClientImpl.java | 32 +-
 .../TransactionCoordinatorClientImpl.java | 3 +-
 .../client/impl/MultiTopicsConsumerImplTest.java | 12 +-
 .../pulsar/client/impl/PulsarClientImplTest.java | 3 +-
 .../apache/pulsar/common/protocol/Commands.java | 8 +-
 .../org/apache/pulsar/common/util/FutureUtil.java | 4 +-
 pulsar-common/src/main/proto/PulsarApi.proto | 2 +
 .../pulsar/proxy/server/LookupProxyHandler.java | 3 +-
 36 files changed, 1307 insertions(+), 200 deletions(-)
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/TopicExistsInfo.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataMultiBrokerTest.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/GetPartitionMetadataTest.java
(pulsar) 01/03: [fix] [client] PIP-344 Do not create partitioned metadata when calling pulsarClient.getPartitionsForTopic(topicName) (#22206)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a927ef43935798d573c23910af58f5f1dd1e916a
Author: fengyubiao
AuthorDate: Thu May 23 21:15:16 2024 +0800

    [fix] [client] PIP-344 Do not create partitioned metadata when calling pulsarClient.getPartitionsForTopic(topicName) (#22206)

    (cherry picked from commit 4e5c0bcc2b44c33a966287b86c2c235be249dc51)
---
 .../broker/admin/impl/PersistentTopicsBase.java | 26 +-
 .../apache/pulsar/broker/service/ServerCnx.java | 110 +++--
 .../broker/admin/GetPartitionMetadataTest.java | 473 +
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 3 +-
 .../BrokerServiceAutoTopicCreationTest.java | 4 +-
 .../pulsar/broker/service/BrokerServiceTest.java | 10 +-
 .../service/BrokerServiceThrottlingTest.java | 2 +-
 .../pulsar/broker/service/ServerCnxTest.java | 2 +-
 .../buffer/TransactionLowWaterMarkTest.java | 4 +-
 .../pulsar/client/api/BrokerServiceLookupTest.java | 2 +-
 .../org/apache/pulsar/client/api/PulsarClient.java | 23 +-
 .../client/impl/BinaryProtoLookupService.java | 13 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java | 4 +-
 .../pulsar/client/impl/HttpLookupService.java | 6 +-
 .../apache/pulsar/client/impl/LookupService.java | 27 +-
 .../client/impl/MultiTopicsConsumerImpl.java | 2 +-
 .../pulsar/client/impl/PulsarClientImpl.java | 32 +-
 .../TransactionCoordinatorClientImpl.java | 3 +-
 .../client/impl/MultiTopicsConsumerImplTest.java | 12 +-
 .../pulsar/client/impl/PulsarClientImplTest.java | 3 +-
 .../apache/pulsar/common/protocol/Commands.java | 7 +-
 .../org/apache/pulsar/common/util/FutureUtil.java | 4 +-
 pulsar-common/src/main/proto/PulsarApi.proto | 2 +
 .../pulsar/proxy/server/LookupProxyHandler.java | 3 +-
 24 files changed, 689 insertions(+), 88 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ebb92679599..f9471b03c5f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -546,19 +546,29 @@ public class PersistentTopicsBase extends AdminResource {
             boolean checkAllowAutoCreation) {
         return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
                 .thenCompose(metadata -> {
-                    CompletableFuture ret;
-                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                    if (metadata.partitions > 1) {
+                        // Some clients does not support partitioned topic.
+                        return internalValidateClientVersionAsync().thenApply(__ -> metadata);
+                    } else if (metadata.partitions == 1) {
+                        return CompletableFuture.completedFuture(metadata);
+                    } else {
+                        // metadata.partitions == 0
                         // The topic may be a non-partitioned topic, so check if it exists here.
                         // However, when checkAllowAutoCreation is true, the client will create the topic if
                         // it doesn't exist. In this case, `partitions == 0` means the automatically created topic
                         // is a non-partitioned topic so we shouldn't check if the topic exists.
-                        ret = internalCheckTopicExists(topicName);
-                    } else if (metadata.partitions > 1) {
-                        ret = internalValidateClientVersionAsync();
-                    } else {
-                        ret = CompletableFuture.completedFuture(null);
+                        return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
+                                .thenCompose(brokerAllowAutoTopicCreation -> {
+                            if (checkAllowAutoCreation) {
+                                // Whether it exists or not, auto create a non-partitioned topic by client.
+                                return CompletableFuture.completedFuture(metadata);
+                            } else {
+                                // If it does not exist, response a Not Found error.
+                                // Otherwise, response a non-partitioned metadata.
+                                return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
+                            }
+                        });
                     }
-                    return ret.thenApply(__ -> metadata);
(pulsar) 02/03: [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation (#22773)
This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b5fdeff5444b2f7ca2da0dea229a1d1583df9d9d
Author: fengyubiao
AuthorDate: Thu Jun 6 16:09:38 2024 +0800

    [improve] [client] PIP-344 support feature flag supportsGetPartitionedMetadataWithoutAutoCreation (#22773)

    (cherry picked from commit 6236116754472c61b2166da6d4797fc63c83f364)
---
 .../apache/pulsar/client/impl/ClientCnxTest.java | 44 ++
 .../client/impl/BinaryProtoLookupService.java | 6 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java | 5 +++
 .../apache/pulsar/common/protocol/Commands.java | 1 +
 4 files changed, 56 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index dfd52d494ae..df6b1b8a8f9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -20,13 +20,17 @@ package org.apache.pulsar.client.impl;
 import com.google.common.collect.Sets;
 import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -124,4 +128,44 @@ public class ClientCnxTest extends MockedPulsarServiceBaseTest {
         producer.close();
         consumer.close();
     }
+
+    @Test
+    public void testSupportsGetPartitionedMetadataWithoutAutoCreation() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName( "persistent://" + NAMESPACE + "/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+        PulsarClientImpl clientWitBinaryLookup = (PulsarClientImpl) PulsarClient.builder()
+                .maxNumberOfRejectedRequestPerConnection(1)
+                .connectionMaxIdleSeconds(Integer.MAX_VALUE)
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .build();
+        ProducerImpl producer = (ProducerImpl) clientWitBinaryLookup.newProducer().topic(topic).create();
+
+        // Verify: the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation" responded from the broker is true.
+        Awaitility.await().untilAsserted(() -> {
+            ClientCnx clientCnx = producer.getClientCnx();
+            Assert.assertNotNull(clientCnx);
+            Assert.assertTrue(clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation());
+        });
+        Assert.assertEquals(
+                clientWitBinaryLookup.getPartitionsForTopic(topic, true).get().size(), 1);
+
+        // Inject a "false" value for the variable "isSupportsGetPartitionedMetadataWithoutAutoCreation".
+        // Verify: client will get a not support error.
+        Field field = ClientCnx.class.getDeclaredField("supportsGetPartitionedMetadataWithoutAutoCreation");
+        field.setAccessible(true);
+        for (CompletableFuture clientCnxFuture : clientWitBinaryLookup.getCnxPool().getConnections()) {
+            field.set(clientCnxFuture.get(), false);
+        }
+        try {
+            clientWitBinaryLookup.getPartitionsForTopic(topic, false).join();
+            Assert.fail("Expected an error that the broker version is too old.");
+        } catch (Exception ex) {
+            Assert.assertTrue(ex.getMessage().contains("without auto-creation is not supported from the broker"));
+        }
+
+        // cleanup.
+        producer.close();
+        clientWitBinaryLookup.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index b363d6e4366..bf015c564b9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -254,6 +254,12 @@ public class BinaryProtoLookupService implements LookupService {
         CompletableFuture partitionFuture = new CompletableFuture<>();
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
+            if
(pulsar) branch master updated: [feat][broker] PIP-264: Add transaction metrics (#22970)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 4e535cb3f4a [feat][broker] PIP-264: Add transaction metrics (#22970)
4e535cb3f4a is described below

commit 4e535cb3f4a3482b0d5dc5a3a0a63c87490704e3
Author: Dragos Misca
AuthorDate: Thu Jun 27 02:54:43 2024 -0700

    [feat][broker] PIP-264: Add transaction metrics (#22970)
---
 .../org/apache/pulsar/broker/PulsarService.java | 15
 .../broker/service/PersistentTopicAttributes.java | 30
 .../service/persistent/PersistentSubscription.java | 7 +-
 .../service/persistent/PersistentTopicMetrics.java | 14 +++-
 .../broker/stats/OpenTelemetryTopicStats.java | 27 ++-
 .../OpenTelemetryTransactionCoordinatorStats.java | 87 ++
 ...enTelemetryTransactionPendingAckStoreStats.java | 72 ++
 .../buffer/TransactionBufferClientStats.java | 7 +-
 .../buffer/impl/TransactionBufferClientImpl.java | 9 ++-
 .../impl/TransactionBufferClientStatsImpl.java | 61 +--
 .../transaction/pendingack/PendingAckHandle.java | 7 ++
 .../pendingack/PendingAckHandleAttributes.java | 63
 .../pendingack/PendingAckHandleStats.java | 7 ++
 .../pendingack/impl/PendingAckHandleDisabled.java | 6 ++
 .../pendingack/impl/PendingAckHandleImpl.java | 28 ---
 .../pendingack/impl/PendingAckHandleStatsImpl.java | 56 +-
 .../pulsar/broker/transaction/TransactionTest.java | 24 +-
 .../buffer/TopicTransactionBufferTest.java | 22 +-
 .../pendingack/PendingAckPersistentTest.java | 40 ++
 .../opentelemetry/OpenTelemetryAttributes.java | 33 +++-
 pulsar-transaction/coordinator/pom.xml | 6 ++
 .../coordinator/TransactionMetadataStore.java | 9 +++
 .../TransactionMetadataStoreAttributes.java | 56 ++
 .../impl/InMemTransactionMetadataStore.java | 16
 .../impl/MLTransactionMetadataStore.java | 16
 25 files changed, 640 insertions(+), 78 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 0d8bc571c57..848484fe376 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -116,6 +116,8 @@ import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryProducerStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryReplicatorStats;
 import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryTransactionCoordinatorStats;
+import org.apache.pulsar.broker.stats.OpenTelemetryTransactionPendingAckStoreStats;
 import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
@@ -263,6 +265,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private OpenTelemetryConsumerStats openTelemetryConsumerStats;
     private OpenTelemetryProducerStats openTelemetryProducerStats;
     private OpenTelemetryReplicatorStats openTelemetryReplicatorStats;
+    private OpenTelemetryTransactionCoordinatorStats openTelemetryTransactionCoordinatorStats;
+    private OpenTelemetryTransactionPendingAckStoreStats openTelemetryTransactionPendingAckStoreStats;
     private TransactionMetadataStoreService transactionMetadataStoreService;
     private TransactionBufferProvider transactionBufferProvider;
@@ -684,6 +688,14 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             brokerClientSharedTimer.stop();
             monotonicSnapshotClock.close();
+            if (openTelemetryTransactionPendingAckStoreStats != null) {
+                openTelemetryTransactionPendingAckStoreStats.close();
+                openTelemetryTransactionPendingAckStoreStats = null;
+            }
+            if (openTelemetryTransactionCoordinatorStats != null) {
+                openTelemetryTransactionCoordinatorStats.close();
+                openTelemetryTransactionCoordinatorStats = null;
+            }
             if (openTelemetryReplicatorStats != null) {
                 openTelemetryReplicatorStats.close();
                 openTelemetryReplicatorStats = null;
@@ -996,6 +1008,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                     .newProvider(config.getTransactionBufferProviderClassName());
             transactionPendingAckStoreProvider = TransactionPendingAckStoreProvider
(pulsar) branch master updated: [improve] [pip] PIP-364: Introduce a new load balance algorithm AvgShedder (#22946)
This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 4ac9bc42f22 [improve] [pip] PIP-364: Introduce a new load balance algorithm AvgShedder (#22946)
4ac9bc42f22 is described below

commit 4ac9bc42f22f8163f59273a0b4ffc46cf3cffdea
Author: Wenzhi Feng <52550727+thetumb...@users.noreply.github.com>
AuthorDate: Thu Jun 27 16:57:04 2024 +0800

    [improve] [pip] PIP-364: Introduce a new load balance algorithm AvgShedder (#22946)
---
 pip/pip-364.md | 476 +
 1 file changed, 476 insertions(+)

diff --git a/pip/pip-364.md b/pip/pip-364.md
new file mode 100644
index 000..c589b3b47fc
--- /dev/null
+++ b/pip/pip-364.md
@@ -0,0 +1,476 @@
+
+# PIP-364: Introduce a new load balance algorithm AvgShedder
+
+# Background knowledge
+
+Pulsar has two load balance interfaces:
+- `LoadSheddingStrategy` is an unloading strategy that identifies high load brokers and unloads some of the bundles they carry to reduce the load.
+- `ModularLoadManagerStrategy` is a placement strategy responsible for assigning bundles to brokers.
+
+## LoadSheddingStrategy
+There are three available algorithms: `ThresholdShedder`, `OverloadShedder`, `UniformLoadShedder`.
+
+### ThresholdShedder
+`ThresholdShedder` uses the following method to calculate the maximum resource utilization rate for each broker,
+which includes CPU, direct memory, bandwidth in, and bandwidth out.
+```
+public double getMaxResourceUsageWithWeight(final double cpuWeight,
+                                            final double directMemoryWeight, final double bandwidthInWeight,
+                                            final double bandwidthOutWeight) {
+    return max(cpu.percentUsage() * cpuWeight,
+            directMemory.percentUsage() * directMemoryWeight, bandwidthIn.percentUsage() * bandwidthInWeight,
+            bandwidthOut.percentUsage() * bandwidthOutWeight) / 100;
+}
+```
+
+After calculating the maximum resource utilization rate for each broker, a historical weight algorithm will
+also be executed to obtain the final score.
+```
+historyUsage = historyUsage == null ? resourceUsage : historyUsage * historyPercentage + (1 - historyPercentage) * resourceUsage;
+```
+The historyPercentage is determined by configuring the `loadBalancerHistoryResourcePercentage`.
+The default value is 0.9, which means that the last calculated score accounts for 90%,
+while the current calculated score only accounts for 10%.
+
+The introduction of this historical weight algorithm is to avoid bundle switching caused by
+short-term abnormal load increase or decrease, but in fact, this algorithm will introduce some
+serious problems, which will be explained in detail later.
+
+Next, calculate the average score of all brokers in the entire cluster: `avgUsage=totalUsage/totalBrokers`.
+When the score of any broker exceeds a certain threshold of avgUsage, it is determined that the broker is overloaded.
+The threshold is determined by the configuration `loadBalancerBrokerThresholdShedderPercentage`, with a default value of 10.
+
+
+### OverloadShedder
+`OverloadShedder` use the same method `getMaxResourceUsageWithWeight` to calculate the maximum resource utilization rate for each broker.
+The difference is that `OverloadShedder` will not use the historical weight algorithm to calculate the final score,
+the final score is the current maximum resource utilization rate of the broker.
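
The `ThresholdShedder` scoring quoted in the diff above can be sketched in a few lines of Java. This is a minimal illustration rather than Pulsar's implementation: the class name `ThresholdScoreSketch`, its method names, and the two constants are hypothetical stand-ins for `loadBalancerHistoryResourcePercentage` (0.9) and `loadBalancerBrokerThresholdShedderPercentage` (10), and reading "exceeds a certain threshold of avgUsage" as `score > avgUsage + threshold / 100` is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ThresholdScoreSketch {
    // Hypothetical constants mirroring the defaults quoted in the PIP text.
    static final double HISTORY_PERCENTAGE = 0.9;    // weight given to the previous score
    static final double THRESHOLD_PERCENTAGE = 10.0; // allowed margin above the cluster average

    // Blend the previous score with the current usage; the first sample is taken as-is.
    static double updateScore(Double historyUsage, double resourceUsage) {
        return historyUsage == null
                ? resourceUsage
                : historyUsage * HISTORY_PERCENTAGE + (1 - HISTORY_PERCENTAGE) * resourceUsage;
    }

    // Assumption: a broker counts as overloaded when its score is more than
    // THRESHOLD_PERCENTAGE / 100 above the average score of all brokers.
    static boolean isOverloaded(double score, double avgUsage) {
        return score > avgUsage + THRESHOLD_PERCENTAGE / 100.0;
    }

    public static void main(String[] args) {
        Map<String, Double> scores = new LinkedHashMap<>();
        // broker-a had a score of 0.20 and now reports 0.80 usage.
        scores.put("broker-a", updateScore(0.20, 0.80));
        // broker-b has no history yet, so its score is the current usage.
        scores.put("broker-b", updateScore(null, 0.30));

        double avgUsage = scores.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);

        scores.forEach((broker, score) -> System.out.printf(
                "%s score=%.2f overloaded=%b%n", broker, score, isOverloaded(score, avgUsage)));
    }
}
```

With a history weight of 0.9, a broker whose usage jumps from 20% to 80% moves its score only to about 26% on the first update, which shows how slowly the weighted score follows a real change in load.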
+
+After obtaining the load score for each broker, compare it with the `loadBalancerBrokerOverloadedThresholdPercentage`.
+If the threshold is exceeded, it is considered overloaded, with a default value of 85%.
+
+This algorithm is relatively simple, but there are many serious corner cases, so it is not recommended to use `OverloadShedder`.
+Here are two cases:
+- When the load on each broker in the cluster reaches the threshold, the bundle unload will continue to be executed,
+  but it will only switch from one overloaded broker to another, which is meaningless.
+- If there are no broker whose load reaches the threshold, adding new brokers will not balance the traffic to the new added brokers.
+The impact of these two points is quite serious, so we won't talk about it next.
+
+
+### UniformLoadShedder
+`UniformLoadShedder` will first calculate the maximum and minimum message rates, as well as the maximum and minimum
+traffic throughput and corresponding broker. Then calculate the maximum and minimum difference, with two thresholds
+corresponding to message rate and throughput size, respectively.
+
+- loadBalancerMsgRateDifferenceShedderThreshold
+
+The message rate percentage threshold between the highest and lowest loaded brokers, with a default value of 50,
+can trigger bundle unload