shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r853255991
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
.toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
Iterator<java.util.Map.Entry<String, Long>> mapIterator =
inactiveProducers.entrySet().iterator();
+ boolean hasInactive = false;
while (mapIterator.hasNext()) {
java.util.Map.Entry<String, Long> entry = mapIterator.next();
String producerName = entry.getKey();
long lastActiveTimestamp = entry.getValue();
- mapIterator.remove();
-
if (lastActiveTimestamp < minimumActiveTimestamp) {
log.info("[{}] Purging dedup information for producer {}",
topic.getName(), producerName);
+ mapIterator.remove();
highestSequencedPushed.remove(producerName);
highestSequencedPersisted.remove(producerName);
+ hasInactive = true;
}
}
+ if (hasInactive) {
+ takeSnapshot(getManagedCursor().getMarkDeletedPosition());
Review Comment:
Thanks for your reply. The main reason I trigger a snapshot in the
`purgeInactiveProducers` method is this: if we rely on the
`deduplication-snapshot-monitor` timer to take the snapshot, it may never be
taken, because of the optimization we added to the `takeSnapshot()` method:
```
PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
    return;
}
```
When `dup.cursor.markDeletedPosition == ledger.lastConfirmedEntry` and all
producers have stopped, this check returns early every time, so we cannot clean
up these producers until new data is written and a snapshot is taken. This can
be confusing for users.
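
To make the stalled case concrete, here is a minimal sketch of the guard in isolation. It is not the broker code: positions are simplified to `long` values, and `SnapshotGuardSketch` and `wouldTakeSnapshot` are hypothetical names used only for illustration.

```java
/**
 * Illustrative model of the early-return guard quoted above.
 * Assumption: positions are plain long values here, whereas the broker
 * compares PositionImpl objects.
 */
final class SnapshotGuardSketch {

    private SnapshotGuardSketch() {
    }

    /**
     * Mirrors the quoted condition: the snapshot is skipped when the requested
     * position is not ahead of the cursor's mark-delete position.
     *
     * @param markDeletedPosition current mark-delete position, or null if none
     * @param position            position the caller wants to snapshot at
     * @return true if the guard would let the snapshot proceed
     */
    static boolean wouldTakeSnapshot(Long markDeletedPosition, long position) {
        if (markDeletedPosition != null && Long.compare(position, markDeletedPosition) <= 0) {
            return false; // guard hit: caller returns without snapshotting
        }
        return true;
    }
}
```

With the mark-delete position already at the last confirmed entry (for example both equal to 10 in this simplified model), `wouldTakeSnapshot(10L, 10L)` is false on every timer tick, and it only becomes true once a new entry moves the requested position past 10.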

> but it traverses all topics each time when it is executed. If many topics
> have Producers that need to be purged, a large number of snapshots will be
> generated.

In effect, the automatic `takeSnapshot` also traverses all topics.

> we do not need to generate a snapshot after each topic is purged. Or, wait
> until all topics are cleaned up and create a snapshot.

This PR only takes a snapshot for a topic when at least one inactive producer was actually purged from it.
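
The purge loop from the diff can be sketched on its own to show when the snapshot would be requested. This is a simplified illustration, not the broker code: `PurgeSketch` and `purgeInactive` are hypothetical names, and the returned flag stands in for the `hasInactive` variable that gates the `takeSnapshot` call.

```java
import java.util.Iterator;
import java.util.Map;

/**
 * Illustrative version of the purge loop in the diff: only entries older than
 * the cutoff are removed, and the caller learns whether anything was purged.
 */
final class PurgeSketch {

    private PurgeSketch() {
    }

    /**
     * @param inactiveProducers      producer name mapped to last-active timestamp (ms)
     * @param minimumActiveTimestamp cutoff; entries strictly older are purged
     * @return true if at least one producer was purged, which is the only case
     *         in which the caller would go on to request a snapshot
     */
    static boolean purgeInactive(Map<String, Long> inactiveProducers, long minimumActiveTimestamp) {
        boolean hasInactive = false;
        Iterator<Map.Entry<String, Long>> it = inactiveProducers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < minimumActiveTimestamp) {
                // Remove inside the check, so producers that have not yet
                // timed out stay tracked for the next run.
                it.remove();
                hasInactive = true;
            }
        }
        return hasInactive;
    }
}
```

A run that purges nothing returns false, so no snapshot is requested for that topic.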
In conclusion, the snapshot here solves the problem described above, and
`checkMessageDeduplicationInfo` is not a frequent operation.
Thanks again. If you have any suggestions, please continue the discussion.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.