This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9994614  [Broker] Synchronize updates to the inactiveProducers map in 
MessageDeduplication (#12820)
9994614 is described below

commit 9994614173205abd075fcc670396cebd71227047
Author: Michael Marshall <michael.marsh...@datastax.com>
AuthorDate: Mon Nov 15 15:02:36 2021 -0600

    [Broker] Synchronize updates to the inactiveProducers map in 
MessageDeduplication (#12820)
---
 .../apache/pulsar/broker/service/persistent/MessageDeduplication.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index b2c42b0..e2436bb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -138,7 +138,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
-            inactiveProducers.put(k, System.currentTimeMillis());
+            producerRemoved(k);
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -169,7 +169,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), 
md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
-                    inactiveProducers.put(producerName, 
System.currentTimeMillis());
+                    producerRemoved(producerName);
 
                     entry.release();
                 }

Reply via email to