poorbarcode commented on code in PR #21946:
URL: https://github.com/apache/pulsar/pull/21946#discussion_r1564801185


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -134,30 +137,44 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
     }
 
     @Override
-    protected void readEntries(Producer<byte[]> producer) {
-        // Rewind the cursor to be sure to read again all non-acked messages sent while restarting
+    protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
+        // Rewind the cursor to be sure to read again all non-acked messages sent while restarting.
         cursor.rewind();
-
         cursor.cancelPendingReadRequest();
-        HAVE_PENDING_READ_UPDATER.set(this, FALSE);
-        this.producer = (ProducerImpl) producer;
 
-        if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
-            log.info("[{}] Created replicator producer", replicatorId);
+        /**
+         * 1. Try change state to {@link Started}.
+         * 2. Atoms modify multiple properties if change state success, to avoid another thread get a null value
+         *    producer when the state is {@link Started}.
+         */
+        Pair<Boolean, State> changeStateRes;
+        changeStateRes = compareSetAndGetState(Starting, Started);
+        if (changeStateRes.getLeft()) {
+            this.producer = (ProducerImpl) producer;
+            HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+            // Trigger a new read.
+            log.info("[{}] Created replicator producer, Replicator state: {}", replicatorId, state);
             backOff.reset();
-            // activate cursor: so, entries can be cached
+            // activate cursor: so, entries can be cached.
             this.cursor.setActive();
             // read entries
             readMoreEntries();
         } else {
-            log.info(
-                    "[{}] Replicator was stopped while creating the producer."
-                            + " Closing it. Replicator state: {}",
-                    replicatorId, STATE_UPDATER.get(this));
-            STATE_UPDATER.set(this, State.Stopping);
-            closeProducerAsync();
+            if (changeStateRes.getRight() == Started) {
+                // Since only one task can call "producerBuilder.createAsync()", this scenario is not expected.
+                // So print a warn log.
+                log.warn("[{}] Replicator was already started by another thread while creating the producer."
+                        + " Closing the producer newly created. Replicator state: {}", replicatorId, state);
+            } else if (changeStateRes.getRight() == Terminating || changeStateRes.getRight() == Terminated) {
+                log.info("[{}] Replicator was terminated, so close the producer. Replicator state: {}",

Review Comment:
   > This is a little confusing, if the replicator was terminating or terminated, do we need to close the producer again?
   
   Yes, it is needed. The `producer` parameter here is not the field `this.producer`; it is a newly created object that was never assigned to the field, so nothing else will close it. We need to close it here.
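
To illustrate the point, here is a minimal, self-contained sketch. It is not the Pulsar implementation: `ReplicatorSketch`, `FakeProducer`, `terminate()`, and the accessor methods are hypothetical stand-ins, and the state machine is reduced to a single `AtomicReference`. It only shows why a producer that arrives after termination has to be closed explicitly, assuming termination closes whatever `this.producer` holds at that moment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-ins for illustration; not the real Pulsar classes.
class ReplicatorSketch {
    enum State { Starting, Started, Terminating, Terminated }

    // Stand-in for a producer; only tracks whether close() was called.
    static class FakeProducer {
        final String name;
        boolean closed = false;

        FakeProducer(String name) {
            this.name = name;
        }

        void close() {
            closed = true;
        }
    }

    private final AtomicReference<State> state = new AtomicReference<>(State.Starting);
    private FakeProducer producer; // plays the role of this.producer

    // Assumed behavior: termination can only close the producer stored in the field.
    void terminate() {
        state.set(State.Terminated);
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    // Called when the asynchronous producer creation completes.
    void setProducerAndTriggerReadEntries(FakeProducer newProducer) {
        if (state.compareAndSet(State.Starting, State.Started)) {
            // Normal path: the field now owns the new producer.
            this.producer = newProducer;
        } else {
            // The replicator left Starting while the producer was being created.
            // newProducer was never stored in the field, so terminate() could not
            // have closed it; closing it here is what prevents a leak.
            newProducer.close();
        }
    }

    FakeProducer currentProducer() {
        return producer;
    }

    State currentState() {
        return state.get();
    }
}
```

In this sketch, calling `terminate()` before `setProducerAndTriggerReadEntries(p)` leaves `p` closed and the field still `null`, which mirrors the Terminating/Terminated branch discussed above.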



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

