lhotari commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1830693798


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoc
     }
 
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
+        // broker side epoch is smaller than consumer epoch, so don't need to 
handle this redeliver request
+        if (consumerEpoch < consumer.getConsumerEpoch()) {
+            log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since 
broker epoch [{}] is smaller than "
+                            + "consumer epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), 
consumerEpoch);
+            return;
+        }
 
-        if (consumerEpoch > consumer.getConsumerEpoch()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch 
[{}]",
-                        name, consumer, consumer.getConsumerEpoch(), 
consumerEpoch);
-            }
-            consumer.setConsumerEpoch(consumerEpoch);
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), 
consumerEpoch);
         }
 
+        consumer.setConsumerEpoch(consumerEpoch);

Review Comment:
   I get it now. It seems that the code was just refactored in this PR and the 
logic wasn't really changed in this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to