This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bba620  Fix deduplication cursor does not delete after disabling 
message deduplication (#7656)
2bba620 is described below

commit 2bba620fb47afa6a4dceed36e332059792266313
Author: lipenghui <peng...@apache.org>
AuthorDate: Thu Jul 30 09:06:26 2020 +0800

    Fix deduplication cursor does not delete after disabling message 
deduplication (#7656)
    
    ### Motivation
    
    Fix the deduplication cursor not being deleted after message 
deduplication is disabled. The issue occurs when message deduplication is 
enabled in broker.conf, then disabled, and the broker is restarted: the dedup 
cursor is never deleted.
---
 .../bookkeeper/mledger/ManagedLedgerException.java |  6 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  3 +-
 .../service/persistent/MessageDeduplication.java   | 44 +++++++++++++++++++---
 .../pulsar/broker/service/ServerCnxTest.java       |  3 --
 4 files changed, 46 insertions(+), 10 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 6e6e6a2..0b2cd4e 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -149,6 +149,12 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class CursorNotFoundException extends ManagedLedgerException 
{
+        public CursorNotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 810b3f8..eaf06fc 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -774,7 +774,8 @@ public class ManagedLedgerImpl implements ManagedLedger, 
CreateCallback {
             final Object ctx) {
         final ManagedCursorImpl cursor = (ManagedCursorImpl) 
cursors.get(consumerName);
         if (cursor == null) {
-            callback.deleteCursorFailed(new 
ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx);
+            callback.deleteCursorFailed(new 
ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
+                    + consumerName), ctx);
             return;
         } else if (!cursor.isDurable()) {
             cursors.removeCursor(consumerName);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index 5c91d52..2fb4944 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -60,6 +60,10 @@ public class MessageDeduplication {
     private ManagedCursor managedCursor;
 
     enum Status {
+
+        // Deduplication is initialized
+        Initialized,
+
         // Deduplication is disabled
         Disabled,
 
@@ -122,7 +126,7 @@ public class MessageDeduplication {
         this.pulsar = pulsar;
         this.topic = topic;
         this.managedLedger = managedLedger;
-        this.status = Status.Disabled;
+        this.status = Status.Initialized;
         this.snapshotInterval = 
pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval();
         this.maxNumberOfProducers = 
pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers();
         this.snapshotCounter = 0;
@@ -200,6 +204,25 @@ public class MessageDeduplication {
                     pulsar.getExecutor().schedule(this::checkStatus, 1, 
TimeUnit.MINUTES);
                     return CompletableFuture.completedFuture(null);
                 }
+                if (status == Status.Initialized && !shouldBeEnabled) {
+                    status = Status.Removing;
+                    
managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new 
DeleteCursorCallback() {
+                        @Override
+                        public void deleteCursorComplete(Object ctx) {
+                            status = Status.Disabled;
+                            log.info("[{}] Deleted deduplication cursor", 
topic.getName());
+                        }
+
+                        @Override
+                        public void deleteCursorFailed(ManagedLedgerException 
exception, Object ctx) {
+                            if (exception instanceof 
ManagedLedgerException.CursorNotFoundException) {
+                                status = Status.Disabled;
+                            } else {
+                                log.error("[{}] Deleted deduplication cursor 
error", topic.getName(), exception);
+                            }
+                        }
+                    }, null);
+                }
 
                 if (status == Status.Enabled && !shouldBeEnabled) {
                     // Disabled deduping
@@ -220,15 +243,24 @@ public class MessageDeduplication {
 
                                 @Override
                                 public void 
deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
-                                    log.warn("[{}] Failed to disable 
deduplication: {}", topic.getName(),
-                                            exception.getMessage());
-                                    status = Status.Failed;
-                                    future.completeExceptionally(exception);
+                                    // It's ok for disable message 
deduplication.
+                                    if (exception instanceof 
ManagedLedgerException.CursorNotFoundException) {
+                                        status = Status.Disabled;
+                                        managedCursor = null;
+                                        highestSequencedPushed.clear();
+                                        highestSequencedPersisted.clear();
+                                        future.complete(null);
+                                    } else {
+                                        log.warn("[{}] Failed to disable 
deduplication: {}", topic.getName(),
+                                                exception.getMessage());
+                                        status = Status.Failed;
+                                        
future.completeExceptionally(exception);
+                                    }
                                 }
                             }, null);
 
                     return future;
-                } else if (status == Status.Disabled && shouldBeEnabled) {
+                } else if ((status == Status.Disabled || status == 
Status.Initialized) && shouldBeEnabled) {
                     // Enable deduping
                     CompletableFuture<Void> future = new CompletableFuture<>();
                     
managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new 
OpenCursorCallback() {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index dd1176a..94a02f6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -22,7 +22,6 @@ import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo
 import static 
org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.matches;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -78,7 +77,6 @@ import 
org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.ServerCnx.State;
-import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
@@ -143,7 +141,6 @@ public class ServerCnxTest {
 
     private ManagedLedger ledgerMock = mock(ManagedLedger.class);
     private ManagedCursor cursorMock = mock(ManagedCursor.class);
-
     private OrderedExecutor executor;
 
     @BeforeMethod

Reply via email to