This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6372b9c8f54 [fix] [broker] fix deadlock when disable topic level
Geo-Replication (#22738)
6372b9c8f54 is described below
commit 6372b9c8f5448757cdb56dc70aaf0ebb52a1063e
Author: fengyubiao <[email protected]>
AuthorDate: Sun May 19 11:28:21 2024 +0800
[fix] [broker] fix deadlock when disable topic level Geo-Replication
(#22738)
---
.../broker/admin/impl/PersistentTopicsBase.java | 6 +-
.../broker/service/OneWayReplicatorTest.java | 59 +++++++
.../broker/service/OneWayReplicatorTestBase.java | 172 +++++++++++++++++----
.../service/OneWayReplicatorUsingGlobalZKTest.java | 97 ++++++++++++
4 files changed, 306 insertions(+), 28 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 682f41dcdb6..924b7be0855 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3262,12 +3262,14 @@ public class PersistentTopicsBase extends AdminResource
{
}
Set<String> replicationClusters = Sets.newHashSet(clusterIds);
return validatePoliciesReadOnlyAccessAsync()
- .thenCompose(__ -> {
+ .thenAccept(__ -> {
if (replicationClusters.contains("global")) {
throw new RestException(Status.PRECONDITION_FAILED,
"Cannot specify global in the list of
replication clusters");
}
- Set<String> clusters = clusters();
+ })
+ .thenCompose(__ -> clustersAsync())
+ .thenCompose(clusters -> {
List<CompletableFuture<Void>> futures = new
ArrayList<>(replicationClusters.size());
for (String clusterId : replicationClusters) {
if (!clusters.contains(clusterId)) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
index fae72e8eac2..a5f1339e95f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java
@@ -689,4 +689,63 @@ public class OneWayReplicatorTest extends
OneWayReplicatorTestBase {
admin2.topics().delete(topicName);
});
}
+
+    @Test
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final TopicName tp = TopicName.get(topicName);
+        admin1.topics().createNonPartitionedTopic(topicName);
+
+        // Sanity check: a message produced on cluster1 must reach cluster2 before replication is switched off.
+        verifyReplicationWorks(topicName);
+
+        // Switch replication off by limiting each side's topic-level clusters to its own local cluster.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // The topic is deleted separately through each cluster's admin client.
+        admin1.topics().delete(topicName);
+        admin2.topics().delete(topicName);
+
+        // Neither cluster may still report the persistent topic.
+        assertFalse(pulsar1.getPulsarResources().getTopicResources().persistentTopicExists(tp).join());
+        assertFalse(pulsar2.getPulsarResources().getTopicResources().persistentTopicExists(tp).join());
+    }
+
+    @Test
+    public void testDeletePartitionedTopic() throws Exception {
+        final int numPartitions = 2;
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        final TopicName tp = TopicName.get(topicName);
+        admin1.topics().createPartitionedTopic(topicName, numPartitions);
+
+        // Sanity check: a message produced on cluster1 must reach cluster2 before replication is switched off.
+        verifyReplicationWorks(topicName);
+
+        // Switch replication off by limiting each side's topic-level clusters to its own local cluster.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // With a global ZK both clusters read the same partitioned-topic metadata, so one deletion removes it;
+        // with separate ZKs each cluster's metadata has to be deleted on its own.
+        admin1.topics().deletePartitionedTopic(topicName);
+        if (!usingGlobalZK) {
+            admin2.topics().deletePartitionedTopic(topicName);
+        }
+
+        // The partitioned-topic metadata must be gone as seen from both clusters.
+        assertFalse(pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .partitionedTopicExists(tp));
+        assertFalse(pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .partitionedTopicExists(tp));
+
+        // When using a global ZK, the partitions on the remote cluster currently still have to be deleted manually,
+        // so the per-partition checks only apply to the separate-ZK setup.
+        if (!usingGlobalZK) {
+            for (int i = 0; i < numPartitions; i++) {
+                TopicName partition = tp.getPartition(i);
+                assertFalse(pulsar1.getPulsarResources().getTopicResources()
+                        .persistentTopicExists(partition).join());
+                assertFalse(pulsar2.getPulsarResources().getTopicResources()
+                        .persistentTopicExists(partition).join());
+            }
+        }
+    }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
index 317e43306e3..6a84432890c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java
@@ -21,21 +21,33 @@ package org.apache.pulsar.broker.service;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.tests.TestRetrySupport;
@@ -52,6 +64,9 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
protected final String nonReplicatedNamespace = defaultTenant + "/ns1";
protected final String cluster1 = "r1";
+
+ protected boolean usingGlobalZK = false;
+
protected URL url1;
protected URL urlTls1;
protected ServiceConfiguration config1 = new ServiceConfiguration();
@@ -77,8 +92,12 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
// Start ZK.
brokerConfigZk1 = new ZookeeperServerTest(0);
brokerConfigZk1.start();
- brokerConfigZk2 = new ZookeeperServerTest(0);
- brokerConfigZk2.start();
+ if (usingGlobalZK) {
+ brokerConfigZk2 = brokerConfigZk1;
+ } else {
+ brokerConfigZk2 = new ZookeeperServerTest(0);
+ brokerConfigZk2.start();
+ }
// Start BK.
bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
@@ -132,30 +151,32 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
- admin2.clusters().createCluster(cluster1, ClusterData.builder()
- .serviceUrl(url1.toString())
- .serviceUrlTls(urlTls1.toString())
- .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
- .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
- .brokerClientTlsEnabled(false)
- .build());
- admin2.clusters().createCluster(cluster2, ClusterData.builder()
- .serviceUrl(url2.toString())
- .serviceUrlTls(urlTls2.toString())
- .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
- .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
- .brokerClientTlsEnabled(false)
- .build());
-
admin1.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));
- admin2.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
- Sets.newHashSet(cluster1, cluster2)));
-
admin1.namespaces().createNamespace(replicatedNamespace,
Sets.newHashSet(cluster1, cluster2));
- admin2.namespaces().createNamespace(replicatedNamespace);
admin1.namespaces().createNamespace(nonReplicatedNamespace);
- admin2.namespaces().createNamespace(nonReplicatedNamespace);
+
+ if (!usingGlobalZK) {
+ admin2.clusters().createCluster(cluster1, ClusterData.builder()
+ .serviceUrl(url1.toString())
+ .serviceUrlTls(urlTls1.toString())
+ .brokerServiceUrl(pulsar1.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin2.clusters().createCluster(cluster2, ClusterData.builder()
+ .serviceUrl(url2.toString())
+ .serviceUrlTls(urlTls2.toString())
+ .brokerServiceUrl(pulsar2.getBrokerServiceUrl())
+ .brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
+ .brokerClientTlsEnabled(false)
+ .build());
+ admin2.tenants().createTenant(defaultTenant, new
TenantInfoImpl(Collections.emptySet(),
+ Sets.newHashSet(cluster1, cluster2)));
+ admin2.namespaces().createNamespace(replicatedNamespace);
+ admin2.namespaces().createNamespace(nonReplicatedNamespace);
+ }
+
}
protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws
Exception {
@@ -163,6 +184,9 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
}
protected void cleanupTopics(String namespace, CleanupTopicAction
cleanupTopicAction) throws Exception {
+ if (usingGlobalZK) {
+ throw new IllegalArgumentException("The method cleanupTopics does
not support for global ZK");
+ }
waitChangeEventsInit(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace,
Collections.singleton(cluster1));
admin1.namespaces().unload(namespace);
@@ -242,11 +266,15 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
// delete namespaces.
waitChangeEventsInit(replicatedNamespace);
admin1.namespaces().setNamespaceReplicationClusters(replicatedNamespace,
Sets.newHashSet(cluster1));
+ if (!usingGlobalZK) {
+
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace,
Sets.newHashSet(cluster2));
+ }
admin1.namespaces().deleteNamespace(replicatedNamespace);
-
admin2.namespaces().setNamespaceReplicationClusters(replicatedNamespace,
Sets.newHashSet(cluster2));
- admin2.namespaces().deleteNamespace(replicatedNamespace);
admin1.namespaces().deleteNamespace(nonReplicatedNamespace);
- admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
+ if (!usingGlobalZK) {
+ admin2.namespaces().deleteNamespace(replicatedNamespace);
+ admin2.namespaces().deleteNamespace(nonReplicatedNamespace);
+ }
// shutdown.
markCurrentSetupNumberCleaned();
@@ -291,7 +319,7 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
brokerConfigZk1.stop();
brokerConfigZk1 = null;
}
- if (brokerConfigZk2 != null) {
+ if (!usingGlobalZK && brokerConfigZk2 != null) {
brokerConfigZk2.stop();
brokerConfigZk2 = null;
}
@@ -313,4 +341,96 @@ public abstract class OneWayReplicatorTestBase extends
TestRetrySupport {
protected PulsarClient initClient(ClientBuilder clientBuilder) throws
Exception {
return clientBuilder.build();
}
+
+    /**
+     * Verifies that a message produced to {@code topic} on cluster1 is replicated to cluster2 and can be consumed
+     * there. The producer and consumer are closed even when the verification fails, so a failed check does not
+     * leave clients attached to the topic.
+     *
+     * @param topic the replicated topic to verify
+     */
+    protected void verifyReplicationWorks(String topic) throws Exception {
+        final String subscription = "__subscribe_1";
+        final String msgValue = "__msg1";
+        try (Producer<String> producer1 = client1.newProducer(Schema.STRING).topic(topic).create();
+             Consumer<String> consumer2 = client2.newConsumer(Schema.STRING).topic(topic).isAckReceiptEnabled(true)
+                     .subscriptionName(subscription).subscribe()) {
+            producer1.newMessage().value(msgValue).send();
+            pulsar1.getBrokerService().checkReplicationPolicies();
+            // receive(timeout) yields null when nothing arrives in time; map that to a null value so the assertion
+            // reports the mismatch instead of the test failing with a NullPointerException.
+            String received = Optional.ofNullable(consumer2.receive(10, TimeUnit.SECONDS))
+                    .map(msg -> msg.getValue())
+                    .orElse(null);
+            assertEquals(received, msgValue);
+            consumer2.unsubscribe();
+        }
+    }
+
+    /**
+     * Sets the topic-level replication clusters of {@code topic} through {@code admin} and waits until
+     * {@code pulsar} reports exactly those clusters. The check covers both its topic-policies service and each
+     * loaded topic (every partition, for a partitioned topic).
+     *
+     * @param topic    the topic (or one of its partitions); checks use the partitioned-topic name
+     * @param clusters the replication clusters to apply
+     * @param admin    admin client used to update the policy (callers pair it with the same cluster as pulsar)
+     * @param pulsar   broker on which the applied policy is observed
+     */
+    protected void setTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
+                                         PulsarService pulsar) throws Exception {
+        Set<String> expected = new HashSet<>(clusters);
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        int partitions = ensurePartitionsAreSame(topic);
+        admin.topics().setReplicationClusters(topic, clusters);
+        Awaitility.await().untilAsserted(() -> {
+            TopicPolicies policies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
+            // The new policy may not be visible yet. Report that as an AssertionError, which untilAsserted retries,
+            // rather than letting a NullPointerException abort the wait.
+            assertTrue(policies != null && policies.getReplicationClusters() != null,
+                    "Topic-level replication clusters of " + topicName + " are not visible yet");
+            assertEquals(new HashSet<>(policies.getReplicationClusters()), expected);
+            if (partitions == 0) {
+                checkNonPartitionedTopicLevelClusters(topicName.toString(), clusters, admin,
+                        pulsar.getBrokerService());
+            } else {
+                for (int i = 0; i < partitions; i++) {
+                    checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), clusters, admin,
+                            pulsar.getBrokerService());
+                }
+            }
+        });
+    }
+
+    /**
+     * Asserts that {@code topic}, a non-partitioned topic or a single partition, has exactly {@code clusters} as
+     * its topic-level replication clusters on {@code broker}. Returns without checking when
+     * {@code broker.getTopic(topic, false)} yields no topic.
+     *
+     * @param topic    the non-partitioned topic or partition name
+     * @param clusters the expected replication clusters
+     * @param admin    not used by this check
+     * @param broker   the broker whose loaded topic is inspected
+     */
+    protected void checkNonPartitionedTopicLevelClusters(String topic, List<String> clusters, PulsarAdmin admin,
+                                                         BrokerService broker) throws Exception {
+        CompletableFuture<Optional<Topic>> future = broker.getTopic(topic, false);
+        if (future == null) {
+            return;
+        }
+        Optional<Topic> optional = future.join();
+        if (optional == null || !optional.isPresent()) {
+            return;
+        }
+        PersistentTopic persistentTopic = (PersistentTopic) optional.get();
+        Optional<TopicPolicies> topicPolicies = persistentTopic.getTopicPolicies();
+        // Fail with an AssertionError, which Awaitility callers retry, instead of a NoSuchElementException while
+        // the topic has not picked up its policies yet.
+        assertTrue(topicPolicies.isPresent(), "Topic policies of " + topic + " are not loaded yet");
+        Set<String> expected = new HashSet<>(clusters);
+        Set<String> act = new HashSet<>(topicPolicies.get().getReplicationClusters());
+        assertEquals(act, expected);
+    }
+
+    /**
+     * Checks that {@code topic} has the same partitioning on both clusters and returns its partition count.
+     *
+     * @param topic the topic (or one of its partitions); the check uses the partitioned-topic name
+     * @return the number of partitions, or {@code 0} when the topic is partitioned on neither cluster
+     * @throws IllegalArgumentException if the clusters disagree on whether the topic is partitioned or on its
+     *                                  partition count, or if the partitioned metadata is missing when read
+     */
+    protected int ensurePartitionsAreSame(String topic) throws Exception {
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        boolean isPartitionedTopic1 = pulsar1.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources().partitionedTopicExists(topicName);
+        boolean isPartitionedTopic2 = pulsar2.getPulsarResources().getNamespaceResources()
+                .getPartitionedTopicResources().partitionedTopicExists(topicName);
+        if (isPartitionedTopic1 != isPartitionedTopic2) {
+            throw new IllegalArgumentException(String.format("Topic %s is not partitioned the same way on both"
+                            + " clusters. isPartitionedTopic1: %s, isPartitionedTopic2: %s",
+                    topicName, isPartitionedTopic1, isPartitionedTopic2));
+        }
+        if (!isPartitionedTopic1) {
+            return 0;
+        }
+        int partitions1 = pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(topicName).join()
+                .map(metadata -> metadata.partitions)
+                .orElseThrow(() -> new IllegalArgumentException(String.format(
+                        "Partitioned metadata of topic %s is missing on cluster %s", topicName, cluster1)));
+        int partitions2 = pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+                .getPartitionedTopicMetadataAsync(topicName).join()
+                .map(metadata -> metadata.partitions)
+                .orElseThrow(() -> new IllegalArgumentException(String.format(
+                        "Partitioned metadata of topic %s is missing on cluster %s", topicName, cluster2)));
+        if (partitions1 != partitions2) {
+            throw new IllegalArgumentException(String.format("Topic %s has different partition counts on the two"
+                            + " clusters. partitions1: %s, partitions2: %s",
+                    topicName, partitions1, partitions2));
+        }
+        return partitions1;
+    }
+
+    /**
+     * Disables topic-level geo-replication of {@code topic} and then deletes it on both clusters. Replication is
+     * disabled by setting each cluster's topic-level clusters to its own local cluster.
+     *
+     * @param topic the topic (or one of its partitions) to delete
+     */
+    protected void deleteTopicAfterDisableTopicLevelReplication(String topic) throws Exception {
+        // Each side keeps only its local cluster, which stops replication in both directions. Cluster2 must be
+        // limited to cluster2 (not cluster1), matching how the tests disable replication.
+        setTopicLevelClusters(topic, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topic, Arrays.asList(cluster2), admin2, pulsar2);
+
+        int partitions = ensurePartitionsAreSame(topic);
+
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+        if (partitions != 0) {
+            admin1.topics().deletePartitionedTopic(topicName.toString());
+            // With a global ZK the partitioned-topic metadata is shared, so it is already gone after the first
+            // deletion and a second delete would fail.
+            if (!usingGlobalZK) {
+                admin2.topics().deletePartitionedTopic(topicName.toString());
+            }
+        } else {
+            admin1.topics().delete(topicName.toString());
+            admin2.topics().delete(topicName.toString());
+        }
+    }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
new file mode 100644
index 00000000000..d827235bc32
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the {@link OneWayReplicatorTest} scenarios with both clusters sharing one ZooKeeper ("global ZK").
+ * Inherited tests that are not run in this setup are overridden and disabled.
+ */
+@Slf4j
+@Test(groups = "broker")
+public class OneWayReplicatorUsingGlobalZKTest extends OneWayReplicatorTest {
+
+    /**
+     * Switches the base class to global-ZK mode before starting the clusters.
+     */
+    @Override
+    @BeforeClass(alwaysRun = true, timeOut = 300000)
+    public void setup() throws Exception {
+        super.usingGlobalZK = true;
+        super.setup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true, timeOut = 300000)
+    public void cleanup() throws Exception {
+        super.cleanup();
+    }
+
+    // The inherited tests below are disabled for the global-ZK setup. Each override delegates to its own parent
+    // test, so re-enabling one runs the intended scenario.
+
+    @Override
+    @Test(enabled = false)
+    public void testReplicatorProducerStatInTopic() throws Exception {
+        super.testReplicatorProducerStatInTopic();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testCreateRemoteConsumerFirst() throws Exception {
+        super.testCreateRemoteConsumerFirst();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
+        super.testTopicCloseWhenInternalProducerCloseErrorOnce();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
+        super.testConcurrencyOfUnloadBundleAndRecreateProducer();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplication() throws Exception {
+        super.testPartitionedTopicLevelReplication();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
+        super.testPartitionedTopicLevelReplicationRemoteTopicExist();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
+        super.testPartitionedTopicLevelReplicationRemoteConflictTopicExist();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
+        super.testConcurrencyOfUnloadBundleAndRecreateProducer2();
+    }
+
+    @Override
+    @Test(enabled = false)
+    public void testUnFenceTopicToReuse() throws Exception {
+        super.testUnFenceTopicToReuse();
+    }
+
+    // Topic deletion after disabling topic-level replication is exercised under global ZK.
+
+    @Override
+    @Test
+    public void testDeleteNonPartitionedTopic() throws Exception {
+        super.testDeleteNonPartitionedTopic();
+    }
+
+    @Override
+    @Test
+    public void testDeletePartitionedTopic() throws Exception {
+        super.testDeletePartitionedTopic();
+    }
+}