shibd commented on code in PR #15206:
URL: https://github.com/apache/pulsar/pull/15206#discussion_r853255991


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java:
##########
@@ -456,19 +457,23 @@ public synchronized void purgeInactiveProducers() {
                 .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
 
         Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
+        boolean hasInactive = false;
         while (mapIterator.hasNext()) {
             java.util.Map.Entry<String, Long> entry = mapIterator.next();
             String producerName = entry.getKey();
             long lastActiveTimestamp = entry.getValue();
 
-            mapIterator.remove();
-
             if (lastActiveTimestamp < minimumActiveTimestamp) {
                 log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
+                mapIterator.remove();
                 highestSequencedPushed.remove(producerName);
                 highestSequencedPersisted.remove(producerName);
+                hasInactive = true;
             }
         }
+        if (hasInactive) {
+            takeSnapshot(getManagedCursor().getMarkDeletedPosition());

Review Comment:
   Thanks for your reply. The main reason I trigger a snapshot in the `purgeInactiveProducers` method is that, if we rely on the `deduplication-snapshot-monitor` timer to take the snapshot, it may never be executed, because of the optimization in the `takeSnapshot()` method:
   ```java
           PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
           if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
               return;
           }
   ```
   
   When `dup.cursor.markDeletedPosition == ledger.lastConfirmedEntry` and all producers have stopped, the timer never takes a snapshot, so we cannot clean up these producers until new data is written and a snapshot is taken. This can be confusing for users.
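
To make the stalled case concrete, here is a minimal sketch. It is not the Pulsar implementation: positions are modelled as plain `long` values, the class and method names are hypothetical, and it assumes the guard quoted above sits only on the timer-driven path while an explicit `takeSnapshot(position)` call bypasses it.

```java
public class SnapshotGuardSketch {
    // Positions modelled as longs (hypothetical simplification of PositionImpl).
    long lastConfirmedEntry;
    long markDeletedPosition;
    int snapshotsTaken = 0;

    SnapshotGuardSketch(long lastConfirmedEntry, long markDeletedPosition) {
        this.lastConfirmedEntry = lastConfirmedEntry;
        this.markDeletedPosition = markDeletedPosition;
    }

    // Timer path: applies the same comparison as the guard quoted above.
    // Returns true only if a snapshot was actually taken.
    boolean timerTakeSnapshot() {
        long position = lastConfirmedEntry;
        if (position <= markDeletedPosition) {
            return false; // nothing new was written, so the snapshot is skipped
        }
        takeSnapshot(position);
        return true;
    }

    // Explicit path (assumed to be unguarded in this sketch).
    void takeSnapshot(long position) {
        markDeletedPosition = position;
        snapshotsTaken++;
    }

    public static void main(String[] args) {
        // All producers have stopped: the cursor has caught up with the ledger.
        SnapshotGuardSketch topic = new SnapshotGuardSketch(100L, 100L);
        System.out.println("timer took snapshot: " + topic.timerTakeSnapshot());

        // An explicit call at the current mark-deleted position still runs.
        topic.takeSnapshot(topic.markDeletedPosition);
        System.out.println("snapshots taken: " + topic.snapshotsTaken);
    }
}
```

Under these assumptions the timer path keeps returning early while the topic is idle, which is why the purge path takes the snapshot itself.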
   
   > but it traverses all topics each time when it is executed. If many topics 
have Producers that need to be purged, a large number of snapshots will be 
generated.
   
   In effect, the automatic `takeSnapshot` also traverses all topics.
   
   > we do not need to generate a snapshot after each topic is purged. Or, wait 
until all topics are cleaned up and create a snapshot.
   
   This PR only takes a snapshot when a topic actually has inactive producers to purge.
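
The purge loop in the diff can be sketched in isolation as follows. This is a simplified illustration, not the broker code: the class name and the `purge` helper are hypothetical, only one of the two sequence maps is modelled, and the caller is assumed to take the snapshot when the method returns `true`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class PurgeSketch {
    // Removes producers whose last-active timestamp is older than the cutoff.
    // Returns true if at least one producer was purged, i.e. a snapshot is needed.
    static boolean purge(Map<String, Long> inactiveProducers,
                         Map<String, Long> highestSequencedPushed,
                         long minimumActiveTimestamp) {
        boolean hasInactive = false;
        Iterator<Map.Entry<String, Long>> it = inactiveProducers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            String producerName = entry.getKey();
            long lastActiveTimestamp = entry.getValue();
            if (lastActiveTimestamp < minimumActiveTimestamp) {
                // Remove only expired entries; recent ones stay tracked.
                it.remove();
                highestSequencedPushed.remove(producerName);
                hasInactive = true;
            }
        }
        return hasInactive;
    }

    public static void main(String[] args) {
        Map<String, Long> inactive = new HashMap<>();
        inactive.put("old-producer", 10L);
        inactive.put("recent-producer", 1000L);
        Map<String, Long> pushed = new HashMap<>();
        pushed.put("old-producer", 7L);
        pushed.put("recent-producer", 42L);

        boolean needSnapshot = purge(inactive, pushed, 500L);
        System.out.println("snapshot needed: " + needSnapshot);
        System.out.println("still tracked: " + inactive.keySet());
    }
}
```

A second call with the same cutoff returns `false`, so a topic with nothing left to purge triggers no extra snapshot.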
   
   In conclusion, the snapshot here solves the problem described above, and `checkMessageDeduplicationInfo` is not a frequent operation.
   
   Thanks again. If you have any suggestions, please continue the discussion.
   
   


