This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 083b158e906 [fix][broker] release orphan replicator after topic closed (#20567)
083b158e906 is described below

commit 083b158e90643a98d08352103bb1e568aabd5ead
Author: fengyubiao <[email protected]>
AuthorDate: Thu Jun 15 12:48:40 2023 +0800

    [fix][broker] release orphan replicator after topic closed (#20567)
    
    Motivation: When the `replicator.producer` is retrying to start<sup>[1]</sup>
    while the topic close<sup>[2]</sup> executes concurrently, an orphan
    replicator is left behind after the topic has been closed.
    Modifications: If the topic has already been closed, stop retrying.
---
 .../pulsar/broker/service/AbstractReplicator.java  |  38 +++++-
 .../nonpersistent/NonPersistentReplicator.java     |   2 +-
 .../service/persistent/PersistentReplicator.java   |   2 +-
 .../broker/service/AbstractReplicatorTest.java     | 145 +++++++++++++++++++++
 4 files changed, 181 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
index deab89cda72..539f178d665 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -47,6 +48,7 @@ public abstract class AbstractReplicator {
     protected final PulsarClientImpl replicationClient;
     protected final PulsarClientImpl client;
     protected String replicatorId;
+    protected final Topic localTopic;
 
     protected volatile ProducerImpl producer;
     public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
@@ -67,11 +69,12 @@ public abstract class AbstractReplicator {
         Stopped, Starting, Started, Stopping
     }
 
-    public AbstractReplicator(String localCluster, String localTopicName, 
String remoteCluster, String remoteTopicName,
+    public AbstractReplicator(String localCluster, Topic localTopic, String 
remoteCluster, String remoteTopicName,
                               String replicatorPrefix, BrokerService 
brokerService, PulsarClientImpl replicationClient)
             throws PulsarServerException {
         this.brokerService = brokerService;
-        this.localTopicName = localTopicName;
+        this.localTopic = localTopic;
+        this.localTopicName = localTopic.getName();
         this.replicatorPrefix = replicatorPrefix;
         this.localCluster = localCluster.intern();
         this.remoteTopicName = remoteTopicName;
@@ -120,7 +123,8 @@ public abstract class AbstractReplicator {
                         replicatorId, waitTimeMs / 1000.0);
             }
             // BackOff before retrying
-            brokerService.executor().schedule(this::startProducer, waitTimeMs, 
TimeUnit.MILLISECONDS);
+            
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
+                    TimeUnit.MILLISECONDS);
             return;
         }
         State state = STATE_UPDATER.get(this);
@@ -147,7 +151,8 @@ public abstract class AbstractReplicator {
                         replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
 
                 // BackOff before retrying
-                brokerService.executor().schedule(this::startProducer, 
waitTimeMs, TimeUnit.MILLISECONDS);
+                
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
+                        TimeUnit.MILLISECONDS);
             } else {
                 log.warn("[{}] Failed to create remote producer. Replicator 
state: {}", replicatorId,
                         STATE_UPDATER.get(this), ex);
@@ -157,6 +162,31 @@ public abstract class AbstractReplicator {
 
     }
 
+    protected void checkTopicActiveAndRetryStartProducer() {
+        isLocalTopicActive().thenAccept(isTopicActive -> {
+            if (isTopicActive) {
+                startProducer();
+            }
+        }).exceptionally(ex -> {
+            log.warn("[{}] Stop retry to create producer due to topic load 
fail. Replicator state: {}", replicatorId,
+                    STATE_UPDATER.get(this), ex);
+            return null;
+        });
+    }
+
+    protected CompletableFuture<Boolean> isLocalTopicActive() {
+        CompletableFuture<Optional<Topic>> topicFuture = 
brokerService.getTopics().get(localTopicName);
+        if (topicFuture == null){
+            return CompletableFuture.completedFuture(false);
+        }
+        return topicFuture.thenApplyAsync(optional -> {
+            if (optional.isEmpty()) {
+                return false;
+            }
+            return optional.get() == localTopic;
+        }, brokerService.executor());
+    }
+
     protected synchronized CompletableFuture<Void> closeProducerAsync() {
         if (producer == null) {
             STATE_UPDATER.set(this, State.Stopped);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
index 514db4219db..087c5f93200 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java
@@ -50,7 +50,7 @@ public class NonPersistentReplicator extends 
AbstractReplicator implements Repli
 
     public NonPersistentReplicator(NonPersistentTopic topic, String 
localCluster, String remoteCluster,
             BrokerService brokerService, PulsarClientImpl replicationClient) 
throws PulsarServerException {
-        super(localCluster, topic.getName(), remoteCluster, topic.getName(), 
topic.getReplicatorPrefix(), brokerService,
+        super(localCluster, topic, remoteCluster, topic.getName(), 
topic.getReplicatorPrefix(), brokerService,
                 replicationClient);
 
         producerBuilder.blockIfQueueFull(false);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index d882cbf56b2..ccf70eecec3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -112,7 +112,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
                                    String remoteCluster, String remoteTopic,
                                    BrokerService brokerService, 
PulsarClientImpl replicationClient)
             throws PulsarServerException {
-        super(localCluster, localTopic.getName(), remoteCluster, remoteTopic, 
localTopic.getReplicatorPrefix(),
+        super(localCluster, localTopic, remoteCluster, remoteTopic, 
localTopic.getReplicatorPrefix(),
                 brokerService, replicationClient);
         this.topic = localTopic;
         this.cursor = cursor;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
new file mode 100644
index 00000000000..294a9b341ec
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractReplicatorTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import io.netty.channel.DefaultEventLoop;
+import io.netty.util.internal.DefaultPriorityQueue;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AbstractReplicatorTest {
+
+    @Test
+    public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
+        final String localCluster = "localCluster";
+        final String remoteCluster = "remoteCluster";
+        final String topicName = "remoteTopicName";
+        final String replicatorPrefix = "pulsar.repl";
+        final DefaultEventLoop eventLoopGroup = new DefaultEventLoop();
+        // Mock services.
+        final ServiceConfiguration pulsarConfig = 
mock(ServiceConfiguration.class);
+        final PulsarService pulsar = mock(PulsarService.class);
+        final BrokerService broker = mock(BrokerService.class);
+        final Topic localTopic = mock(Topic.class);
+        final PulsarClientImpl localClient = mock(PulsarClientImpl.class);
+        final PulsarClientImpl remoteClient = mock(PulsarClientImpl.class);
+        final ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+        final ConcurrentOpenHashMap<String, 
CompletableFuture<Optional<Topic>>> topics = new ConcurrentOpenHashMap<>();
+        when(broker.executor()).thenReturn(eventLoopGroup);
+        when(broker.getTopics()).thenReturn(topics);
+        
when(remoteClient.newProducer(any(Schema.class))).thenReturn(producerBuilder);
+        when(broker.pulsar()).thenReturn(pulsar);
+        when(pulsar.getClient()).thenReturn(localClient);
+        when(pulsar.getConfiguration()).thenReturn(pulsarConfig);
+        when(pulsarConfig.getReplicationProducerQueueSize()).thenReturn(100);
+        when(localTopic.getName()).thenReturn(topicName);
+        when(producerBuilder.topic(any())).thenReturn(producerBuilder);
+        
when(producerBuilder.messageRoutingMode(any())).thenReturn(producerBuilder);
+        
when(producerBuilder.enableBatching(anyBoolean())).thenReturn(producerBuilder);
+        when(producerBuilder.sendTimeout(anyInt(), 
any())).thenReturn(producerBuilder);
+        
when(producerBuilder.maxPendingMessages(anyInt())).thenReturn(producerBuilder);
+        
when(producerBuilder.producerName(anyString())).thenReturn(producerBuilder);
+        // Mock create producer fail.
+        when(producerBuilder.create()).thenThrow(new RuntimeException("mocked 
ex"));
+        when(producerBuilder.createAsync())
+                .thenReturn(CompletableFuture.failedFuture(new 
RuntimeException("mocked ex")));
+        // Make race condition: "retry start producer" and "close replicator".
+        final ReplicatorInTest replicator = new ReplicatorInTest(localCluster, 
localTopic, remoteCluster, topicName,
+                replicatorPrefix, broker, remoteClient);
+        replicator.startProducer();
+        replicator.disconnect();
+
+        // Verify task will done.
+        Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            AtomicInteger taskCounter = new AtomicInteger();
+            CountDownLatch checkTaskFinished = new CountDownLatch(1);
+            eventLoopGroup.execute(() -> {
+                synchronized (replicator) {
+                    LinkedBlockingQueue taskQueue = 
WhiteboxImpl.getInternalState(eventLoopGroup, "taskQueue");
+                    DefaultPriorityQueue scheduledTaskQueue =
+                            WhiteboxImpl.getInternalState(eventLoopGroup, 
"scheduledTaskQueue");
+                    taskCounter.set(taskQueue.size() + 
scheduledTaskQueue.size());
+                    checkTaskFinished.countDown();
+                }
+            });
+            checkTaskFinished.await();
+            Assert.assertEquals(taskCounter.get(), 0);
+        });
+    }
+
+    private static class ReplicatorInTest extends AbstractReplicator {
+
+        public ReplicatorInTest(String localCluster, Topic localTopic, String 
remoteCluster, String remoteTopicName,
+                                String replicatorPrefix, BrokerService 
brokerService,
+                                PulsarClientImpl replicationClient) throws 
PulsarServerException {
+            super(localCluster, localTopic, remoteCluster, remoteTopicName, 
replicatorPrefix, brokerService,
+                    replicationClient);
+        }
+
+        @Override
+        protected String getProducerName() {
+            return "pulsar.repl.producer";
+        }
+
+        @Override
+        protected void readEntries(Producer<byte[]> producer) {
+
+        }
+
+        @Override
+        protected Position getReplicatorReadPosition() {
+            return PositionImpl.EARLIEST;
+        }
+
+        @Override
+        protected long getNumberOfEntriesInBacklog() {
+            return 0;
+        }
+
+        @Override
+        protected void disableReplicatorRead() {
+
+        }
+    }
+}

Reply via email to