This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 0be2fb9b9cc [fix][broker] Release orphan replicator after topic closed
(#20582)
0be2fb9b9cc is described below
commit 0be2fb9b9cc7d37a8687296db407d3c63cc750f3
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Jun 16 10:39:28 2023 +0800
[fix][broker] Release orphan replicator after topic closed (#20582)
---
.../pulsar/broker/service/AbstractReplicator.java | 39 +++++-
.../nonpersistent/NonPersistentReplicator.java | 2 +-
.../service/persistent/PersistentReplicator.java | 2 +-
.../broker/service/AbstractReplicatorTest.java | 146 +++++++++++++++++++++
4 files changed, 183 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index 18e2e2d16c3..a142c804001 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -44,6 +45,7 @@ public abstract class AbstractReplicator {
protected final String remoteCluster;
protected final PulsarClientImpl replicationClient;
protected final PulsarClientImpl client;
+ protected final Topic localTopic;
protected volatile ProducerImpl producer;
public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
@@ -64,11 +66,12 @@ public abstract class AbstractReplicator {
Stopped, Starting, Started, Stopping
}
- public AbstractReplicator(String topicName, String replicatorPrefix,
String localCluster, String remoteCluster,
+ public AbstractReplicator(Topic localTopic, String replicatorPrefix,
String localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl
replicationClient)
throws PulsarServerException {
this.brokerService = brokerService;
- this.topicName = topicName;
+ this.localTopic = localTopic;
+ this.topicName = localTopic.getName();
this.replicatorPrefix = replicatorPrefix;
this.localCluster = localCluster.intern();
this.remoteCluster = remoteCluster.intern();
@@ -111,7 +114,8 @@ public abstract class AbstractReplicator {
topicName, localCluster, remoteCluster, waitTimeMs /
1000.0);
}
// BackOff before retrying
- brokerService.executor().schedule(this::startProducer, waitTimeMs,
TimeUnit.MILLISECONDS);
+
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer,
+ waitTimeMs, TimeUnit.MILLISECONDS);
return;
}
State state = STATE_UPDATER.get(this);
@@ -139,7 +143,8 @@ public abstract class AbstractReplicator {
localCluster, remoteCluster, ex.getMessage(),
waitTimeMs / 1000.0);
// BackOff before retrying
- brokerService.executor().schedule(this::startProducer,
waitTimeMs, TimeUnit.MILLISECONDS);
+
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer,
+ waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer.
Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this),
ex);
@@ -149,6 +154,32 @@ public abstract class AbstractReplicator {
}
+ protected void checkTopicActiveAndRetryStartProducer() {
+ isLocalTopicActive().thenAccept(isTopicActive -> {
+ if (isTopicActive) {
+ startProducer();
+ }
+ }).exceptionally(ex -> {
+ log.warn("[{}] Stop retry to create producer due to topic load
fail. Replicator state: {}",
+ String.format("%s%s%s",
getReplicatorName(replicatorPrefix, localCluster),
+ REPL_PRODUCER_NAME_DELIMITER, remoteCluster),
STATE_UPDATER.get(this), ex);
+ return null;
+ });
+ }
+
+ protected CompletableFuture<Boolean> isLocalTopicActive() {
+ CompletableFuture<Optional<Topic>> topicFuture =
brokerService.getTopics().get(topicName);
+ if (topicFuture == null){
+ return CompletableFuture.completedFuture(false);
+ }
+ return topicFuture.thenApplyAsync(optional -> {
+ if (optional.isEmpty()) {
+ return false;
+ }
+ return optional.get() == localTopic;
+ }, brokerService.executor());
+ }
+
protected synchronized CompletableFuture<Void> closeProducerAsync() {
if (producer == null) {
STATE_UPDATER.set(this, State.Stopped);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index b863e9eb3c2..40d19e8176d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -50,7 +50,7 @@ public class NonPersistentReplicator extends
AbstractReplicator implements Repli
public NonPersistentReplicator(NonPersistentTopic topic, String
localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl replicationClient)
throws PulsarServerException {
- super(topic.getName(), topic.getReplicatorPrefix(), localCluster,
remoteCluster, brokerService,
+ super(topic, topic.getReplicatorPrefix(), localCluster, remoteCluster,
brokerService,
replicationClient);
producerBuilder.blockIfQueueFull(false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 0471f12f3c9..75ea294329e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -110,7 +110,7 @@ public class PersistentReplicator extends AbstractReplicator
public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor,
String localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl
replicationClient)
throws PulsarServerException {
- super(topic.getName(), topic.getReplicatorPrefix(), localCluster,
remoteCluster, brokerService,
+ super(topic, topic.getReplicatorPrefix(), localCluster, remoteCluster,
brokerService,
replicationClient);
this.topic = topic;
this.cursor = cursor;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
new file mode 100644
index 00000000000..30be3c55c59
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.util.internal.DefaultPriorityQueue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "broker")
+public class AbstractReplicatorTest {
+
+ @Test
+ public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
+ final String localCluster = "localCluster";
+ final String remoteCluster = "remoteCluster";
+ final String topicName = "remoteTopicName";
+ final String replicatorPrefix = "pulsar.repl";
+ final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
+ // Mock services.
+ final ServiceConfiguration pulsarConfig =
mock(ServiceConfiguration.class);
+ final PulsarService pulsar = mock(PulsarService.class);
+ final BrokerService broker = mock(BrokerService.class);
+ final Topic localTopic = mock(Topic.class);
+ final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+ final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+ final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+ final ConcurrentOpenHashMap<String,
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
+ when(broker.executor()).thenReturn(eventLoopGroup);
+ when(broker.getTopics()).thenReturn(topics);
+
when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
+ when(broker.pulsar()).thenReturn(pulsar);
+ when(pulsar.getClient()).thenReturn(localClient);
+ when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
+ when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
+ when(localTopic.getName()).thenReturn(topicName);
+ when(producerBuilder.topic(any())).thenReturn(producerBuilder);
+
when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
+
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+ when(producerBuilder.sendTimeout(anyInt(),
any())).thenReturn(producerBuilder);
+
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+ // Mock create producer fail.
+ when(producerBuilder.create()).thenThrow(new RuntimeException("mocked
ex"));
+ when(producerBuilder.createAsync())
+ .thenReturn(CompletableFuture.failedFuture(new
RuntimeException("mocked ex")));
+ // Make race condition: "retry start producer" and "close replicator".
+ final ReplicatorInTest replicator = new ReplicatorInTest(localTopic,
remoteCluster, topicName,
+ replicatorPrefix, broker, remoteClient);
+ replicator.startProducer();
+ replicator.disconnect();
+
+ // Verify task will done.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ AtomicInteger taskCounter = new AtomicInteger();
+ CountDownLatch checkTaskFinished = new CountDownLatch(1);
+ eventLoopGroup.execute(() -> {
+ synchronized (replicator) {
+ LinkedBlockingQueue taskQueue =
WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue");
+ DefaultPriorityQueue scheduledTaskQueue =
+ WhiteboxImpl.getInternalState(eventLoopGroup,
"scheduledTaskQueue");
+ taskCounter.set(taskQueue.size() +
scheduledTaskQueue.size());
+ checkTaskFinished.countDown();
+ }
+ });
+ checkTaskFinished.await();
+ Assert.assertEquals(taskCounter.get(), 0);
+ });
+ }
+
+ private static class ReplicatorInTest extends AbstractReplicator {
+
+ public ReplicatorInTest(Topic localTopic, String remoteCluster, String
remoteTopicName,
+ String replicatorPrefix, BrokerService
brokerService,
+ PulsarClientImpl replicationClient) throws
PulsarServerException {
+ super(localTopic, remoteCluster, remoteTopicName,
replicatorPrefix, brokerService,
+ replicationClient);
+ }
+
+ protected String getProducerName() {
+ return "pulsar.repl.producer";
+ }
+
+ @Override
+ protected void readEntries(Producer<byte[]> producer) {
+
+ }
+
+ @Override
+ protected Position getReplicatorReadPosition() {
+ return PositionImpl.EARLIEST;
+ }
+
+ @Override
+ protected long getNumberOfEntriesInBacklog() {
+ return 0;
+ }
+
+ @Override
+ protected void disableReplicatorRead() {
+
+ }
+ }
+}