This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f8b4a6d158 [fix][broker] Fix ack hole in cursor for geo-replication (#20931)
9f8b4a6d158 is described below

commit 9f8b4a6d15899c7255c525299e1d6011a09b940e
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Oct 10 19:03:56 2024 +0900

    [fix][broker] Fix ack hole in cursor for geo-replication (#20931)
    
    Co-authored-by: Masahiro Sakamoto <[email protected]>
---
 .../service/persistent/PersistentReplicator.java   | 25 +++++++++++++++++++---
 1 file changed, 22 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index b3d7546beed..bcb1f759540 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -116,6 +116,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
 
     protected volatile boolean fetchSchemaInProgress = false;
+    private volatile boolean waitForCursorRewinding = false;
 
     public PersistentReplicator(String localCluster, PersistentTopic 
localTopic, ManagedCursor cursor,
                                    String remoteCluster, String remoteTopic,
@@ -143,9 +144,15 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     @Override
     protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) 
{
-        // Rewind the cursor to be sure to read again all non-acked messages 
sent while restarting.
-        cursor.rewind();
-        cursor.cancelPendingReadRequest();
+        waitForCursorRewinding = true;
+
+        // Repeat until there are no read operations in progress
+        if (STATE_UPDATER.get(this) == State.Starting && 
HAVE_PENDING_READ_UPDATER.get(this) == TRUE
+                && !cursor.cancelPendingReadRequest()) {
+            brokerService.getPulsar().getExecutor()
+                    .schedule(() -> 
setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS);
+            return;
+        }
 
         /**
          * 1. Try change state to {@link Started}.
@@ -158,6 +165,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             if (!(producer instanceof ProducerImpl)) {
                 log.error("[{}] The partitions count between two clusters is 
not the same, the replicator can not be"
                         + " created successfully: {}", replicatorId, state);
+                waitForCursorRewinding = false;
                 doCloseProducerAsync(producer, () -> {});
                 throw new ClassCastException(producer.getClass().getName() + " 
can not be cast to ProducerImpl");
             }
@@ -168,6 +176,11 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
             backOff.reset();
             // activate cursor: so, entries can be cached.
             this.cursor.setActive();
+
+            // Rewind the cursor to be sure to read again all non-acked 
messages sent while restarting
+            cursor.rewind();
+            waitForCursorRewinding = false;
+
             // read entries
             readMoreEntries();
         } else {
@@ -183,6 +196,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
                 log.error("[{}] Replicator state is not expected, so close the 
producer. Replicator state: {}",
                         replicatorId, changeStateRes.getRight());
             }
+            waitForCursorRewinding = false;
             // Close the producer if change the state fail.
             doCloseProducerAsync(producer, () -> {});
         }
@@ -296,6 +310,11 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
             // Schedule read
             if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
+                if (waitForCursorRewinding) {
+                    log.info("[{}] Skip the reading because repl producer is 
starting", replicatorId);
+                    HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+                    return;
+                }
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Schedule read of {} messages or {} bytes", 
replicatorId, messagesToRead,
                             bytesToRead);

Reply via email to