poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564796734


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java:
##########
@@ -96,83 +123,138 @@ public AbstractReplicator(String localCluster, Topic 
localTopic, String remoteCl
                 .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
                 .producerName(getProducerName());
-        STATE_UPDATER.set(this, State.Stopped);
+        STATE_UPDATER.set(this, State.Disconnected);
     }
 
     protected abstract String getProducerName();
 
-    protected abstract void 
readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);
+    protected abstract void 
setProducerAndTriggerReadEntries(org.apache.pulsar.client.api.Producer<byte[]> 
producer);
 
     protected abstract Position getReplicatorReadPosition();
 
-    protected abstract long getNumberOfEntriesInBacklog();
+    public abstract long getNumberOfEntriesInBacklog();
 
     protected abstract void disableReplicatorRead();
 
     public String getRemoteCluster() {
         return remoteCluster;
     }
 
-    // This method needs to be synchronized with disconnects else if there is 
a disconnect followed by startProducer
-    // the end result can be disconnect.
-    public synchronized void startProducer() {
-        if (STATE_UPDATER.get(this) == State.Stopping) {
-            long waitTimeMs = backOff.next();
-            if (log.isDebugEnabled()) {
-                log.debug(
-                        "[{}] waiting for producer to close before attempting 
to reconnect, retrying in {} s",
-                        replicatorId, waitTimeMs / 1000.0);
-            }
-            // BackOff before retrying
-            
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-                    TimeUnit.MILLISECONDS);
-            return;
-        }
-        State state = STATE_UPDATER.get(this);
-        if (!STATE_UPDATER.compareAndSet(this, State.Stopped, State.Starting)) 
{
-            if (state == State.Started) {
-                // Already running
+    public void startProducer() {
+        // Guarantee only one task call "producerBuilder.createAsync()".
+        Pair<Boolean, State> setStartingRes = 
compareSetAndGetState(State.Disconnected, State.Starting);
+        if (!setStartingRes.getLeft()) {
+            if (setStartingRes.getRight() == State.Starting) {
+                log.info("[{}] Skip the producer creation since other thread 
is doing starting, state : {}",
+                        replicatorId, state);
+            } else if (setStartingRes.getRight() == State.Started) {
+                // Since the method "startProducer" will be called even if it 
is started, only print debug-level log.
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Replicator was already running. state: 
{}", replicatorId, state);
+                }
+            } else if (setStartingRes.getRight() == State.Disconnecting) {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}] Replicator was already running", 
replicatorId);
+                    log.debug("[{}] Rep.producer is closing, delay to 
retry(wait the producer close success)."
+                            + " state: {}", replicatorId, state);
                 }
+                delayStartProducerAfterDisconnected();
             } else {
-                log.info("[{}] Replicator already being started. Replicator 
state: {}", replicatorId, state);
+                /** {@link State.Terminating}, {@link State.Terminated}. **/
+                log.info("[{}] Skip the producer creation since the replicator 
state is : {}", replicatorId, state);
             }
-
             return;
         }
 
         log.info("[{}] Starting replicator", replicatorId);
         producerBuilder.createAsync().thenAccept(producer -> {
-            readEntries(producer);
+            setProducerAndTriggerReadEntries(producer);
         }).exceptionally(ex -> {
-            if (STATE_UPDATER.compareAndSet(this, State.Starting, 
State.Stopped)) {
+            Pair<Boolean, State> setDisconnectedRes = 
compareSetAndGetState(State.Starting, State.Disconnected);
+            if (setDisconnectedRes.getLeft()) {
                 long waitTimeMs = backOff.next();
                 log.warn("[{}] Failed to create remote producer ({}), retrying 
in {} s",
                         replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
-
                 // BackOff before retrying
-                
brokerService.executor().schedule(this::checkTopicActiveAndRetryStartProducer, 
waitTimeMs,
-                        TimeUnit.MILLISECONDS);
+                scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
             } else {
-                log.warn("[{}] Failed to create remote producer. Replicator 
state: {}", replicatorId,
-                        STATE_UPDATER.get(this), ex);
+                if (setDisconnectedRes.getRight() == State.Terminating
+                        || setDisconnectedRes.getRight() == State.Terminated) {
+                    log.info("[{}] Skip to create producer, because it has 
been terminated, state is : {}",
+                            replicatorId, state);
+                } else {
+                    /** {@link  State.Disconnected}, {@link  State.Starting}, 
{@link  State.Started} **/
+                    // Since only one task can call 
"producerBuilder.createAsync()", this scenario is not expected.
+                    // So print a warn log.
+                    log.warn("[{}] Other thread will try to create the 
producer again. so skipped current one task."
+                                    + " State is : {}",
+                            replicatorId, state);
+                }
             }
             return null;
         });
+    }
 
+    /***
+     * The producer is disconnecting, delay to start the producer.
+     * If we start a producer immediately, we will get a conflict 
producer(same name producer) registered error.
+     */
+    protected void delayStartProducerAfterDisconnected() {
+        long waitTimeMs = backOff.next();
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "[{}] waiting for producer to close before attempting to 
reconnect, retrying in {} s",
+                    replicatorId, waitTimeMs / 1000.0);
+        }
+        scheduleCheckTopicActiveAndStartProducer(waitTimeMs);
     }
 
-    protected void checkTopicActiveAndRetryStartProducer() {
-        isLocalTopicActive().thenAccept(isTopicActive -> {
-            if (isTopicActive) {
-                startProducer();
+    protected void scheduleCheckTopicActiveAndStartProducer(final long 
waitTimeMs) {
+        brokerService.executor().schedule(() -> {
+            if (state == State.Terminating || state == State.Terminated) {
+                log.info("[{}] Skip scheduled to start the producer since the 
replicator state is : {}",
+                        replicatorId, state);
+                return;
             }
-        }).exceptionally(ex -> {
-            log.warn("[{}] Stop retry to create producer due to topic load 
fail. Replicator state: {}", replicatorId,
-                    STATE_UPDATER.get(this), ex);
-            return null;
-        });
+            CompletableFuture<Optional<Topic>> topicFuture = 
brokerService.getTopics().get(localTopicName);
+            if (topicFuture == null) {
+                // Topic closed.
+                log.info("[{}] Skip scheduled to start the producer since the 
topic was closed successfully."
+                        + " And trigger a terminate.", replicatorId);
+                terminate();
+                return;
+            }
+            topicFuture.thenAccept(optional -> {
+                if (optional.isEmpty()) {
+                    // Topic closed.
+                    log.info("[{}] Skip scheduled to start the producer since 
the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                if (optional.get() != localTopic) {
+                    // Topic closed and created a new one, current replicator 
is outdated.
+                    log.info("[{}] Skip scheduled to start the producer since 
the topic was closed. And trigger a"
+                            + " terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                Replicator replicator = 
localTopic.getReplicators().get(remoteCluster);
+                if (replicator != AbstractReplicator.this) {
+                    // Current replicator has been closed, and created a new 
one.
+                    log.info("[{}] Skip scheduled to start the producer since 
a new replicator has instead current"
+                            + " one. And trigger a terminate.", replicatorId);
+                    terminate();
+                    return;
+                }
+                startProducer();
+            }).exceptionally(ex -> {
+                log.warn("[{}] [{}] Stop retry to create producer due to 
unknown error(topic create failed), and"

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to