315157973 commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r852965699
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
Iterator<java.util.Map.Entry<String, Long>> mapIterator =
inactiveProducers.entrySet().iterator();
+ boolean hasInactive = false;
while (mapIterator.hasNext()) {
java.util.Map.Entry<String, Long> entry = mapIterator.next();
String producerName = entry.getKey();
long lastActiveTimestamp = entry.getValue();
- mapIterator.remove();
-
if (lastActiveTimestamp < minimumActiveTimestamp) {
log.info("[{}] Purging dedup information for producer {}",
topic.getName(), producerName);
+ mapIterator.remove();
highestSequencedPushed.remove(producerName);
highestSequencedPersisted.remove(producerName);
+ hasInactive = true;
}
}
+ if (hasInactive) {
+ takeSnapshot(getManagedCursor().getMarkDeletedPosition());
Review Comment:
`checkMessageDeduplicationInfo` does not run often, but it
traverses all topics every time it runs. If many topics have
producers that need to be purged, a large number of snapshots will be generated at once.
`takeSnapshot` is already executed automatically every 120 seconds,
so we do not need to take a snapshot after each topic is purged.
Alternatively, wait until all topics have been cleaned up and then take a snapshot.
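
A rough sketch of the second option, in case it helps the discussion. It is not Pulsar code: `TopicDedupSketch`, `DeferredSnapshotSketch`, and `purgeAllThenSnapshot` are hypothetical names, and the snapshot is reduced to a counter. It assumes the purge can report whether it removed anything, so the caller can take snapshots in a separate pass after every topic has been cleaned.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the per-topic dedup state; not the Pulsar class.
class TopicDedupSketch {
    final String topicName;
    final Map<String, Long> inactiveProducers = new HashMap<>();
    int snapshotsTaken = 0;

    TopicDedupSketch(String topicName) {
        this.topicName = topicName;
    }

    // Removes only producers last active before the cutoff.
    // Returns true if at least one producer was purged.
    boolean purgeInactiveProducers(long minimumActiveTimestamp) {
        boolean purged = false;
        Iterator<Map.Entry<String, Long>> it = inactiveProducers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < minimumActiveTimestamp) {
                it.remove();
                purged = true;
            }
        }
        return purged;
    }

    // Placeholder for the real snapshot; here it only counts invocations.
    void takeSnapshot() {
        snapshotsTaken++;
    }
}

class DeferredSnapshotSketch {
    // First pass purges every topic; second pass snapshots only the topics that changed.
    // Returns the number of snapshots taken.
    static int purgeAllThenSnapshot(List<TopicDedupSketch> topics, long minimumActiveTimestamp) {
        List<TopicDedupSketch> changed = new ArrayList<>();
        for (TopicDedupSketch topic : topics) {
            if (topic.purgeInactiveProducers(minimumActiveTimestamp)) {
                changed.add(topic);
            }
        }
        for (TopicDedupSketch topic : changed) {
            topic.takeSnapshot();
        }
        return changed.size();
    }
}
```

With this shape, topics that had nothing to purge are never snapshotted by the purge path, and the snapshot work happens only once the traversal has finished. The first option (dropping the call and relying on the periodic snapshot) would simply omit the second loop.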