This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
     new 9994614  [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
9994614 is described below

commit 9994614173205abd075fcc670396cebd71227047
Author: Michael Marshall <michael.marsh...@datastax.com>
AuthorDate: Mon Nov 15 15:02:36 2021 -0600

    [Broker] Synchronize updates to the inactiveProducers map in MessageDeduplication (#12820)
---
 .../apache/pulsar/broker/service/persistent/MessageDeduplication.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
index b2c42b0..e2436bb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java
@@ -138,7 +138,7 @@ public class MessageDeduplication {
     private CompletableFuture<Void> recoverSequenceIdsMap() {
         // Load the sequence ids from the snapshot in the cursor properties
         managedCursor.getProperties().forEach((k, v) -> {
-            inactiveProducers.put(k, System.currentTimeMillis());
+            producerRemoved(k);
             highestSequencedPushed.put(k, v);
             highestSequencedPersisted.put(k, v);
         });
@@ -169,7 +169,7 @@ public class MessageDeduplication {
                     long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
                     highestSequencedPushed.put(producerName, sequenceId);
                     highestSequencedPersisted.put(producerName, sequenceId);
-                    inactiveProducers.put(producerName, System.currentTimeMillis());
+                    producerRemoved(producerName);
                     entry.release();
                 }