This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 206ea82  Fix NPEs in PersistentReplicator (#9763)
206ea82 is described below

commit 206ea82e012af34c5bb7f824826205019f3dd4ff
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Mon Mar 1 22:00:38 2021 +0200

    Fix NPEs in PersistentReplicator (#9763)
---
 .../service/persistent/PersistentReplicator.java   | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2d1a60f..e1f6949 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -71,7 +71,7 @@ public class PersistentReplicator extends AbstractReplicator
     private final PersistentTopic topic;
     private final String replicatorName;
     private final ManagedLedger ledger;
-    protected ManagedCursor cursor;
+    protected volatile ManagedCursor cursor;
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
 
@@ -190,12 +190,14 @@ public class PersistentReplicator extends AbstractReplicator
 
     @Override
     protected void disableReplicatorRead() {
-        // deactivate cursor after successfully close the producer
-        this.cursor.setInactive();
+        if (this.cursor != null) {
+            // deactivate cursor after successfully close the producer
+            this.cursor.setInactive();
+        }
     }
 
     @Override
-    protected synchronized CompletableFuture<Void> openCursorAsync() {
+    protected CompletableFuture<Void> openCursorAsync() {
         log.info("[{}][{} -> {}] Starting open cursor for replicator", topicName, localCluster, remoteCluster);
         if (cursor != null) {
             log.info("[{}][{} -> {}] Using the exists cursor for replicator", topicName, localCluster, remoteCluster);
@@ -441,10 +443,12 @@ public class PersistentReplicator extends AbstractReplicator
     }
 
     public void updateCursorState() {
-        if (producer != null && producer.isConnected()) {
-            this.cursor.setActive();
-        } else {
-            this.cursor.setInactive();
+        if (this.cursor != null) {
+            if (producer != null && producer.isConnected()) {
+                this.cursor.setActive();
+            } else {
+                this.cursor.setInactive();
+            }
         }
     }
 
@@ -688,7 +692,7 @@ public class PersistentReplicator extends AbstractReplicator
     }
 
     public ReplicatorStats getStats() {
-        stats.replicationBacklog = cursor.getNumberOfEntriesInBacklog(false);
+        stats.replicationBacklog = cursor != null ? cursor.getNumberOfEntriesInBacklog(false) : 0;
         stats.connected = producer != null && producer.isConnected();
         stats.replicationDelayInSeconds = getReplicationDelayInSeconds();
 

Reply via email to