This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2bba620 Fix deduplication cursor does not delete after disabling message deduplication (#7656) 2bba620 is described below commit 2bba620fb47afa6a4dceed36e332059792266313 Author: lipenghui <peng...@apache.org> AuthorDate: Thu Jul 30 09:06:26 2020 +0800 Fix deduplication cursor does not delete after disabling message deduplication (#7656) ### Motivation Fix deduplication cursor does not delete after disabling message deduplication. The issue occurs when enabling the message deduplication at the broker.conf and then disable it and restart the broker. The dedup cursor will not be deleted. --- .../bookkeeper/mledger/ManagedLedgerException.java | 6 +++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +- .../service/persistent/MessageDeduplication.java | 44 +++++++++++++++++++--- .../pulsar/broker/service/ServerCnxTest.java | 3 -- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 6e6e6a2..0b2cd4e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -149,6 +149,12 @@ public class ManagedLedgerException extends Exception { } } + public static class CursorNotFoundException extends ManagedLedgerException { + public CursorNotFoundException(String msg) { + super(msg); + } + } + @Override public synchronized Throwable fillInStackTrace() { // Disable stack traces to be filled in diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 810b3f8..eaf06fc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -774,7 +774,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final Object ctx) { final ManagedCursorImpl cursor = (ManagedCursorImpl) cursors.get(consumerName); if (cursor == null) { - callback.deleteCursorFailed(new ManagedLedgerException("ManagedCursor not found: " + consumerName), ctx); + callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: " + + consumerName), ctx); return; } else if (!cursor.isDurable()) { cursors.removeCursor(consumerName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 5c91d52..2fb4944 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -60,6 +60,10 @@ public class MessageDeduplication { private ManagedCursor managedCursor; enum Status { + + // Deduplication is initialized + Initialized, + // Deduplication is disabled Disabled, @@ -122,7 +126,7 @@ public class MessageDeduplication { this.pulsar = pulsar; this.topic = topic; this.managedLedger = managedLedger; - this.status = Status.Disabled; + this.status = Status.Initialized; this.snapshotInterval = pulsar.getConfiguration().getBrokerDeduplicationEntriesInterval(); this.maxNumberOfProducers = pulsar.getConfiguration().getBrokerDeduplicationMaxNumberOfProducers(); this.snapshotCounter = 0; @@ -200,6 +204,25 @@ public class MessageDeduplication { pulsar.getExecutor().schedule(this::checkStatus, 1, TimeUnit.MINUTES); return CompletableFuture.completedFuture(null); } + if (status == Status.Initialized && !shouldBeEnabled) { + status = Status.Removing; + managedLedger.asyncDeleteCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new DeleteCursorCallback() { + @Override + public void deleteCursorComplete(Object ctx) { + status = Status.Disabled; + log.info("[{}] Deleted deduplication cursor", topic.getName()); + } + + @Override + public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { + if (exception instanceof ManagedLedgerException.CursorNotFoundException) { + status = Status.Disabled; + } else { + log.error("[{}] Deleted deduplication cursor error", topic.getName(), exception); + } + } + }, null); + } if (status == Status.Enabled && !shouldBeEnabled) { // Disabled deduping @@ -220,15 +243,24 @@ public class MessageDeduplication { @Override public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { - log.warn("[{}] Failed to disable deduplication: {}", topic.getName(), - exception.getMessage()); - status = Status.Failed; - future.completeExceptionally(exception); + // It's ok for disable message deduplication. + if (exception instanceof ManagedLedgerException.CursorNotFoundException) { + status = Status.Disabled; + managedCursor = null; + highestSequencedPushed.clear(); + highestSequencedPersisted.clear(); + future.complete(null); + } else { + log.warn("[{}] Failed to disable deduplication: {}", topic.getName(), + exception.getMessage()); + status = Status.Failed; + future.completeExceptionally(exception); + } } }, null); return future; - } else if (status == Status.Disabled && shouldBeEnabled) { + } else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) { // Enable deduping CompletableFuture<Void> future = new CompletableFuture<>(); managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index dd1176a..94a02f6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -22,7 +22,6 @@ import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMo import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.matches; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; @@ -78,7 +77,6 @@ import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.ServerCnx.State; -import org.apache.pulsar.broker.service.persistent.MessageDeduplication; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService; import org.apache.pulsar.broker.service.utils.ClientChannelHelper; @@ -143,7 +141,6 @@ public class ServerCnxTest { private ManagedLedger ledgerMock = mock(ManagedLedger.class); private ManagedCursor cursorMock = mock(ManagedCursor.class); - private OrderedExecutor executor; @BeforeMethod