This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9f8b4a6d158 [fix][broker] Fix ack hole in cursor for geo-replication (#20931)
9f8b4a6d158 is described below
commit 9f8b4a6d15899c7255c525299e1d6011a09b940e
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Oct 10 19:03:56 2024 +0900
[fix][broker] Fix ack hole in cursor for geo-replication (#20931)
Co-authored-by: Masahiro Sakamoto <[email protected]>
---
.../service/persistent/PersistentReplicator.java | 25 +++++++++++++++++++---
1 file changed, 22 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index b3d7546beed..bcb1f759540 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -116,6 +116,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
protected final ReplicatorStatsImpl stats = new ReplicatorStatsImpl();
protected volatile boolean fetchSchemaInProgress = false;
+ private volatile boolean waitForCursorRewinding = false;
public PersistentReplicator(String localCluster, PersistentTopic
localTopic, ManagedCursor cursor,
String remoteCluster, String remoteTopic,
@@ -143,9 +144,15 @@ public abstract class PersistentReplicator extends AbstractReplicator
@Override
protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer)
{
- // Rewind the cursor to be sure to read again all non-acked messages
sent while restarting.
- cursor.rewind();
- cursor.cancelPendingReadRequest();
+ waitForCursorRewinding = true;
+
+ // Repeat until there are no read operations in progress
+ if (STATE_UPDATER.get(this) == State.Starting &&
HAVE_PENDING_READ_UPDATER.get(this) == TRUE
+ && !cursor.cancelPendingReadRequest()) {
+ brokerService.getPulsar().getExecutor()
+ .schedule(() ->
setProducerAndTriggerReadEntries(producer), 10, TimeUnit.MILLISECONDS);
+ return;
+ }
/**
* 1. Try change state to {@link Started}.
@@ -158,6 +165,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
if (!(producer instanceof ProducerImpl)) {
log.error("[{}] The partitions count between two clusters is
not the same, the replicator can not be"
+ " created successfully: {}", replicatorId, state);
+ waitForCursorRewinding = false;
doCloseProducerAsync(producer, () -> {});
throw new ClassCastException(producer.getClass().getName() + "
can not be cast to ProducerImpl");
}
@@ -168,6 +176,11 @@ public abstract class PersistentReplicator extends AbstractReplicator
backOff.reset();
// activate cursor: so, entries can be cached.
this.cursor.setActive();
+
+ // Rewind the cursor to be sure to read again all non-acked
messages sent while restarting
+ cursor.rewind();
+ waitForCursorRewinding = false;
+
// read entries
readMoreEntries();
} else {
@@ -183,6 +196,7 @@ public abstract class PersistentReplicator extends AbstractReplicator
log.error("[{}] Replicator state is not expected, so close the
producer. Replicator state: {}",
replicatorId, changeStateRes.getRight());
}
+ waitForCursorRewinding = false;
// Close the producer if change the state fail.
doCloseProducerAsync(producer, () -> {});
}
@@ -296,6 +310,11 @@ public abstract class PersistentReplicator extends AbstractReplicator
// Schedule read
if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) {
+ if (waitForCursorRewinding) {
+ log.info("[{}] Skip the reading because repl producer is
starting", replicatorId);
+ HAVE_PENDING_READ_UPDATER.set(this, FALSE);
+ return;
+ }
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages or {} bytes",
replicatorId, messagesToRead,
bytesToRead);