gaoran10 commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1562306732


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -188,58 +270,135 @@ protected CompletableFuture<Boolean> 
isLocalTopicActive() {
         }, brokerService.executor());
     }
 
-    protected synchronized CompletableFuture<Void> closeProducerAsync() {
-        if (producer == null) {
-            STATE_UPDATER.set(this, State.Stopped);
+    /**
+     * This method only be used by {@link PersistentTopic#checkGC} now.
+     */
+    public CompletableFuture<Void> disconnect(boolean failIfHasBacklog, 
boolean closeTheStartingProducer) {
+        long backlog = getNumberOfEntriesInBacklog();
+        if (failIfHasBacklog && backlog > 0) {
+            CompletableFuture<Void> disconnectFuture = new 
CompletableFuture<>();
+            disconnectFuture.completeExceptionally(new 
TopicBusyException("Cannot close a replicator with backlog"));
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Replicator disconnect failed since topic has 
backlog", replicatorId);
+            }
+            return disconnectFuture;
+        }
+        log.info("[{}] Disconnect replicator at position {} with backlog {}", 
replicatorId,
+                getReplicatorReadPosition(), backlog);
+        return closeProducerAsync(closeTheStartingProducer);
+    }
+
+    /**
+     * This method only be used by {@link PersistentTopic#checkGC} now.
+     */
+    protected CompletableFuture<Void> closeProducerAsync(boolean 
closeTheStartingProducer) {
+        Pair<Boolean, State> setDisconnectingRes = 
compareSetAndGetState(State.Started, State.Disconnecting);
+        if (!setDisconnectingRes.getLeft()) {
+            if (setDisconnectingRes.getRight() == State.Starting) {
+                if (closeTheStartingProducer) {
+                    /**
+                     * Delay retry(wait for the start producer task is finish).
+                     * Note: If the producer always start fail, the start 
producer task will always retry until the
+                     *   state changed to {@link State.Terminated}.
+                     *   Nit: The better solution is creating a {@link 
CompletableFuture} to trace the in-progress
+                     *     creation and call 
"inProgressCreationFuture.thenApply(closeProducer())".
+                     */
+                    long waitTimeMs = backOff.next();
+                    brokerService.executor().schedule(() -> 
closeProducerAsync(true),
+                            waitTimeMs, TimeUnit.MILLISECONDS);
+                } else {
+                    log.info("[{}] Skip current producer closing since the 
previous producer has been closed,"
+                                    + " and trying start a new one, state : 
{}",
+                            replicatorId, setDisconnectingRes.getRight());
+                }
+            } else if (setDisconnectingRes.getRight() == State.Disconnected
+                    || setDisconnectingRes.getRight() == State.Disconnecting) {
+                log.info("[{}] Skip current producer closing since other 
thread did closing, state : {}",
+                        replicatorId, setDisconnectingRes.getRight());
+            } else if (setDisconnectingRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip current producer closing since other 
thread is doing termination, state : {}",
+                        replicatorId, state);
+            }
+            log.info("[{}] Skip current termination since other thread is 
doing close producer or termination,"
+                            + " state : {}", replicatorId, state);
             return CompletableFuture.completedFuture(null);
         }
-        CompletableFuture<Void> future = producer.closeAsync();
+
+        // Close producer and update state.
+        return doCloseProducerAsync(producer, () -> {
+            Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Disconnecting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
+                this.producer = null;
+                // deactivate further read
+                disableReplicatorRead();
+                return;
+            }
+            if (setDisconnectedRes.getRight() == State.Terminating
+                    || setDisconnectingRes.getRight() == State.Terminated) {
+                log.info("[{}] Skip setting state to terminated because it was 
terminated, state : {}",
+                        replicatorId, state);
+            } else {
+                // Since only one task can call 
"doCloseProducerAsync(producer, action)", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Other task has change the state to terminated. 
so skipped current one task."
+                                + " State is : {}",
+                        replicatorId, state);
+            }
+        });
+    }
+
+    protected CompletableFuture<Void> doCloseProducerAsync(Producer<byte[]> 
producer, Runnable actionAfterClosed) {
+        CompletableFuture<Void> future =
+                producer == null ? CompletableFuture.completedFuture(null) : 
producer.closeAsync();
         return future.thenRun(() -> {
-            STATE_UPDATER.set(this, State.Stopped);
-            this.producer = null;
-            // deactivate further read
-            disableReplicatorRead();
+            actionAfterClosed.run();
         }).exceptionally(ex -> {
             long waitTimeMs = backOff.next();
             log.warn(
-                    "[{}] Exception: '{}' occurred while trying to close the 
producer."
-                            + " retrying again in {} s",
-                    replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
+                    "[{}] Exception: '{}' occurred while trying to close the 
producer. Replicator state: {}."
+                            + " Retrying again in {} s.",
+                    replicatorId, ex.getMessage(), state, waitTimeMs / 1000.0);
             // BackOff before retrying
-            brokerService.executor().schedule(this::closeProducerAsync, 
waitTimeMs, TimeUnit.MILLISECONDS);
+            brokerService.executor().schedule(() -> 
doCloseProducerAsync(producer, actionAfterClosed),
+                    waitTimeMs, TimeUnit.MILLISECONDS);
             return null;
         });
     }
 
+    public CompletableFuture<Void> terminate() {
+        if (!tryChangeStatusToTerminating()) {

Review Comment:
   The method `tryChangeStatusToTerminating` checks four states (`Starting`, 
`Started`, `Disconnecting`, `Disconnected`); maybe we only need to check the 
`Terminating` and `Terminated` states here.
   
   Another potential problem: if the state is already `Terminating`, the method 
`terminate` returns an already-completed future, even though the ongoing 
termination may not have finished yet.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic 
localTopic, String remoteCl
                 .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getProducerName());
-        STATE_UPDATER.set(this, State.Stopped);
+        STATE_UPDATER.set(this, State.Disconnected);
     }
 
     protected abstract String getProducerName();
 
-    protected abstract void 
readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
+    protected abstract void 
setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> 
producer);
 
     protected abstract Position getReplicatorReadPosition();
 
-    protected abstract long getNumberOfEntriesInBacklog();
+    public abstract long getNumberOfEntriesInBacklog();
 
     protected abstract void disableReplicatorRead();
 
     public String getRemoteCluster() {
         return remoteCluster;
     }
 
-    // This method needs to be synchronized with disconnects else if there is 
a disconnect followed by startProducer
-    // the end result can be disconnect.
-    public synchronized void startProducer() {
-        if (STATE_UPDATER.get(this) == State.Stopping) {
-            long waitTimeMs = backOff.next();
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "[{}] waiting for producer to close before attempting 
to reconnect, retrying in {} s",
-                        replicatorId, waitTimeMs / 1000.0);
-            }
-            // BackOff before retrying
-            
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-                    TimeUnit.MILLISECONDS);
-            return;
-        }
-        State state = STATE_UPDATER.get(this);
-        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) 
{
-            if (state == State.Started) {
-                // Already running
+    public void startProducer() {
+        // Guarantee only one task call "producerBuilder.createAsync()".
+        Pair<Boolean, State> setStartingRes = 
compareSetAndGetState(State.Disconnected, State.Starting);
+        if (!setStartingRes.getLeft()) {
+            if (setStartingRes.getRight() == State.Starting) {
+                log.info("[{}] Skip the producer creation since other thread 
is doing starting, state : {}",
+                        replicatorId, state);
+            } else if (setStartingRes.getRight() == State.Started) {
+                // Since the method "startProducer" will be called even if it 
is started, only print debug-level log.
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Replicator was already running. state: 
{}", replicatorId, state);
+                }
+            } else if (setStartingRes.getRight() == State.Disconnecting) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Replicator was already running", 
replicatorId);
+                    log.debug("[{}] Rep.producer is closing, delay to 
retry(wait the producer close success)."
+                            + " state: {}", replicatorId, state);
                 }
+                delayStartProducerAfterDisconnected();
             } else {
-                log.info("[{}] Replicator already being started. Replicator 
state: {}", replicatorId, state);
+                /** {@link State.Terminating}, {@link State.Terminated}. **/
+                log.info("[{}] Skip the producer creation since the replicator 
state is : {}", replicatorId, state);
             }
-
             return;
         }
 
         log.info("[{}] Starting replicator", replicatorId);
         producerBuilder.createAsync().thenAccept(producer -> {
-            readEntries(producer);
+            setProducerAndTriggerReadEntries(producer);
         }).exceptionally(ex -> {
-            if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
+            Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
                 long waitTimeMs = backOff.next();
                 log.warn("[{}] Failed to create remote producer ({}), retrying 
in {} s",
                         replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
-
                 // BackOff before retrying
-                
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-                        TimeUnit.MILLISECONDS);
+                scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
             } else {
-                log.warn("[{}] Failed to create remote producer. Replicator 
state: {}", replicatorId,
-                        STATE_UPDATER.get(this), ex);
+                if (setDisconnectedRes.getRight() == State.Terminating
+                        || setDisconnectedRes.getRight() == State.Terminated) {
+                    log.info("[{}] Skip to create producer, because it has 
been terminated, state is : {}",
+                            replicatorId, state);
+                } else {
+                    /** {@link  State.Disconnected}, {@link  State.Starting}, 
{@link  State.Started} **/
+                    // Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+                    // So print a warn log.
+                    log.warn("[{}] Other thread will try to create the 
producer again. so skipped current one task."
+                                    + " State is : {}",
+                            replicatorId, state);
+                }
             }
             return null;
         });
+    }
 
+    /***
+     * The producer is disconnecting, delay to start the producer.
+     * If we start a producer immediately, we will get a conflict 
producer(same name producer) registered error.
+     */
+    protected void delayStartProducerAfterDisconnected() {
+        long waitTimeMs = backOff.next();
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "[{}] waiting for producer to close before attempting to 
reconnect, retrying in {} s",
+                    replicatorId, waitTimeMs / 1000.0);
+        }
+        scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
     }
 
-    protected void checkTopicActiveAndRetryStartProducer() {
-        isLocalTopicActive().thenAccept(isTopicActive -> {
-            if (isTopicActive) {
-                startProducer();
+    protected void scheduleCheckTopicActiveAndStartProducer(final long 
waitTimeMs) {
+        brokerService.executor().schedule(() -> {
+            if (state == State.Terminating || state == State.Terminated) {
+                log.info("[{}] Skip scheduled to start the producer since the 
replicator state is : {}",
+                        replicatorId, state);
+                return;
             }
-        }).exceptionally(ex -> {
-            log.warn("[{}] Stop retry to create producer due to topic load 
fail. Replicator state: {}", replicatorId,
-                    STATE_UPDATER.get(this), ex);
-            return null;
-        });
+            CompletableFuture<Optional<Topic>> topicFuture = 
brokerService.getTopics().get(localTopicName);
+            if (topicFuture == null) {
+                // Topic closed.
+                log.info("[{}] Skip scheduled to start the producer since the 
topic was closed successfully."
+                        + " And trigger a terminate.", replicatorId);
+                terminate();
+                return;
+            }
+            topicFuture.thenAccept(optional -> {
+                if (optional.isEmpty()) {
+                    // Topic closed.
+                    log.info("[{}] Skip scheduled to start the producer since 
the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                if (optional.get() != localTopic) {
+                    // Topic closed and created a new one, current replicator 
is outdated.
+                    log.info("[{}] Skip scheduled to start the producer since 
the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                Replicator replicator = 
localTopic.getReplicators().get(remoteCluster);
+                if (replicator != AbstractReplicator.this) {
+                    // Current replicator has been closed, and created a new 
one.
+                    log.info("[{}] Skip scheduled to start the producer since 
a new replicator has instead current"
+                            + " one. And trigger a terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                startProducer();
+            }).exceptionally(ex -> {
+                log.warn("[{}] [{}] Stop retry to create producer due to 
unknown error(topic create failed), and"

Review Comment:
   Maybe this log should be at error level.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, 
PersistentTopic localTopic, Man
     }
 
     @Override
-    protected void readEntries(Producer<byte[]> producer) {
-        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+    protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) 
{
+        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
         cursor.rewind();
-
         cursor.cancelPendingReadRequest();
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        this.producer = (ProducerImpl) producer;
 
-        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
-            log.info("[{}] Created replicator producer", replicatorId);
+        /**
+         * 1. Try change state to {@link Started}.
+         * 2. Atoms modify multiple properties if change state success, to 
avoid another thread get a null value
+         *    producer when the state is {@link Started}.
+         */
+        Pair<Boolean, State> changeStateRes;
+        changeStateRes = compareSetAndGetState(Starting, Started);
+        if (changeStateRes.getLeft()) {
+            this.producer = (ProducerImpl) producer;
+            HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+            // Trigger a new read.
+            log.info("[{}] Created replicator producer, Replicator state: {}", 
replicatorId, state);
             backOff.reset();
-            // activate cursor: so, entries can be cached
+            // activate cursor: so, entries can be cached.
             this.cursor.setActive();
             // read entries
             readMoreEntries();
         } else {
-            log.info(
-                    "[{}] Replicator was stopped while creating the producer."
-                            + " Closing it. Replicator state: {}",
-                    replicatorId, STATE_UPDATER.get(this));
-            STATE_UPDATER.set(this, State.Stopping);
-            closeProducerAsync();
+            if (changeStateRes.getRight() == Started) {
+                // Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Replicator was already started by another 
thread while creating the producer."
+                        + " Closing the producer newly created. Replicator 
state: {}", replicatorId, state);
+            } else if (changeStateRes.getRight() == Terminating || 
changeStateRes.getRight() == Terminated) {
+                log.info("[{}] Replicator was terminated, so close the 
producer. Replicator state: {}",

Review Comment:
   This is a little confusing: if the replicator is already terminating or 
terminated, do we need to close the producer again?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic 
localTopic, String remoteCl
                 .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getProducerName());
-        STATE_UPDATER.set(this, State.Stopped);
+        STATE_UPDATER.set(this, State.Disconnected);
     }
 
     protected abstract String getProducerName();
 
-    protected abstract void 
readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
+    protected abstract void 
setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> 
producer);
 
     protected abstract Position getReplicatorReadPosition();
 
-    protected abstract long getNumberOfEntriesInBacklog();
+    public abstract long getNumberOfEntriesInBacklog();
 
     protected abstract void disableReplicatorRead();
 
     public String getRemoteCluster() {
         return remoteCluster;
     }
 
-    // This method needs to be synchronized with disconnects else if there is 
a disconnect followed by startProducer
-    // the end result can be disconnect.
-    public synchronized void startProducer() {
-        if (STATE_UPDATER.get(this) == State.Stopping) {
-            long waitTimeMs = backOff.next();
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "[{}] waiting for producer to close before attempting 
to reconnect, retrying in {} s",
-                        replicatorId, waitTimeMs / 1000.0);
-            }
-            // BackOff before retrying
-            
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-                    TimeUnit.MILLISECONDS);
-            return;
-        }
-        State state = STATE_UPDATER.get(this);
-        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) 
{
-            if (state == State.Started) {
-                // Already running
+    public void startProducer() {
+        // Guarantee only one task call "producerBuilder.createAsync()".
+        Pair<Boolean, State> setStartingRes = 
compareSetAndGetState(State.Disconnected, State.Starting);
+        if (!setStartingRes.getLeft()) {
+            if (setStartingRes.getRight() == State.Starting) {
+                log.info("[{}] Skip the producer creation since other thread 
is doing starting, state : {}",
+                        replicatorId, state);
+            } else if (setStartingRes.getRight() == State.Started) {
+                // Since the method "startProducer" will be called even if it 
is started, only print debug-level log.
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Replicator was already running. state: 
{}", replicatorId, state);
+                }
+            } else if (setStartingRes.getRight() == State.Disconnecting) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Replicator was already running", 
replicatorId);
+                    log.debug("[{}] Rep.producer is closing, delay to 
retry(wait the producer close success)."
+                            + " state: {}", replicatorId, state);
                 }
+                delayStartProducerAfterDisconnected();
             } else {
-                log.info("[{}] Replicator already being started. Replicator 
state: {}", replicatorId, state);
+                /** {@link State.Terminating}, {@link State.Terminated}. **/
+                log.info("[{}] Skip the producer creation since the replicator 
state is : {}", replicatorId, state);
             }
-
             return;
         }
 
         log.info("[{}] Starting replicator", replicatorId);
         producerBuilder.createAsync().thenAccept(producer -> {
-            readEntries(producer);
+            setProducerAndTriggerReadEntries(producer);
         }).exceptionally(ex -> {
-            if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
+            Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
                 long waitTimeMs = backOff.next();
                 log.warn("[{}] Failed to create remote producer ({}), retrying 
in {} s",
                         replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
-
                 // BackOff before retrying
-                
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-                        TimeUnit.MILLISECONDS);
+                scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
             } else {
-                log.warn("[{}] Failed to create remote producer. Replicator 
state: {}", replicatorId,
-                        STATE_UPDATER.get(this), ex);
+                if (setDisconnectedRes.getRight() == State.Terminating
+                        || setDisconnectedRes.getRight() == State.Terminated) {
+                    log.info("[{}] Skip to create producer, because it has 
been terminated, state is : {}",
+                            replicatorId, state);
+                } else {
+                    /** {@link  State.Disconnected}, {@link  State.Starting}, 
{@link  State.Started} **/
+                    // Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+                    // So print a warn log.
+                    log.warn("[{}] Other thread will try to create the 
producer again. so skipped current one task."
+                                    + " State is : {}",
+                            replicatorId, state);
+                }
             }
             return null;
         });
+    }
 
+    /***
+     * The producer is disconnecting, delay to start the producer.
+     * If we start a producer immediately, we will get a conflict 
producer(same name producer) registered error.
+     */
+    protected void delayStartProducerAfterDisconnected() {
+        long waitTimeMs = backOff.next();
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "[{}] waiting for producer to close before attempting to 
reconnect, retrying in {} s",
+                    replicatorId, waitTimeMs / 1000.0);
+        }
+        scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
     }
 
-    protected void checkTopicActiveAndRetryStartProducer() {
-        isLocalTopicActive().thenAccept(isTopicActive -> {
-            if (isTopicActive) {
-                startProducer();
+    protected void scheduleCheckTopicActiveAndStartProducer(final long 
waitTimeMs) {
+        brokerService.executor().schedule(() -> {
+            if (state == State.Terminating || state == State.Terminated) {
+                log.info("[{}] Skip scheduled to start the producer since the 
replicator state is : {}",
+                        replicatorId, state);
+                return;
             }
-        }).exceptionally(ex -> {
-            log.warn("[{}] Stop retry to create producer due to topic load 
fail. Replicator state: {}", replicatorId,
-                    STATE_UPDATER.get(this), ex);
-            return null;
-        });
+            CompletableFuture<Optional<Topic>> topicFuture = 
brokerService.getTopics().get(localTopicName);
+            if (topicFuture == null) {
+                // Topic closed.
+                log.info("[{}] Skip scheduled to start the producer since the 
topic was closed successfully."
+                        + " And trigger a terminate.", replicatorId);
+                terminate();
+                return;
+            }
+            topicFuture.thenAccept(optional -> {
+                if (optional.isEmpty()) {
+                    // Topic closed.
+                    log.info("[{}] Skip scheduled to start the producer since 
the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                if (optional.get() != localTopic) {
+                    // Topic closed and created a new one, current replicator 
is outdated.
+                    log.info("[{}] Skip scheduled to start the producer since 
the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }

Review Comment:
   Could we merge these two cases?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, 
PersistentTopic localTopic, Man
     }
 
     @Override
-    protected void readEntries(Producer<byte[]> producer) {
-        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting
+    protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) 
{
+        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
         cursor.rewind();
-
         cursor.cancelPendingReadRequest();
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        this.producer = (ProducerImpl) producer;

Review Comment:
   Why do we need to move these two lines?
   It seems that if we don't set `this.producer` here, the operation 
`doCloseProducerAsync(producer, () -> {});` can't close the newly created producer.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -675,30 +692,6 @@ protected void checkReplicatedSubscriptionMarker(Position 
position, MessageImpl<
         }
     }
 
-    @Override
-    public CompletableFuture<Void> disconnect() {
-        return disconnect(false);
-    }
-
-    @Override
-    public synchronized CompletableFuture<Void> disconnect(boolean 
failIfHasBacklog) {
-        final CompletableFuture<Void> future = new CompletableFuture<>();
-
-        super.disconnect(failIfHasBacklog).thenRun(() -> {
-            dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

Review Comment:
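   Don't forget to release the rate limiter resource (the removed `disconnect` override called `dispatchRateLimiter.ifPresent(DispatchRateLimiter::close)`).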



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3870,6 +3879,27 @@ private void unfenceTopicToResume() {
         subscriptions.values().forEach(sub -> sub.resumeAfterFence());
         isFenced = false;
         isClosingOrDeleting = false;
+        replicatorsResume();
+    }
+
+    private void replicatorsResume() {
+        removeTerminatingReplicators(replicators);
+        removeTerminatingReplicators(shadowReplicators);
+        checkReplication();
+        checkShadowReplication();
+    }
+
+    private static void 
removeTerminatingReplicators(ConcurrentOpenHashMap<String, Replicator> 
replicators) {

Review Comment:
   Maybe this method doesn't need the `static` qualifier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to