congbobo184 commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1830668569


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoc
     }
 
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer 
consumer, long consumerEpoch) {
+        // broker side epoch is smaller than consumer epoch, so don't need to 
handle this redeliver request
+        if (consumerEpoch < consumer.getConsumerEpoch()) {
+            log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since 
broker epoch [{}] is smaller than "
+                            + "consumer epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), 
consumerEpoch);
+            return;
+        }
 
-        if (consumerEpoch > consumer.getConsumerEpoch()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch 
[{}]",
-                        name, consumer, consumer.getConsumerEpoch(), 
consumerEpoch);
-            }
-            consumer.setConsumerEpoch(consumerEpoch);
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), 
consumerEpoch);
         }
 
+        consumer.setConsumerEpoch(consumerEpoch);

Review Comment:
   this means that client consumer invoke redeliverUnacknowledgedMessages, 
client consumer hope that broker can next send messages with consumerEpoch.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2103,8 +2103,10 @@ public void redeliverUnacknowledgedMessages() {
                 incomingQueueLock.unlock();
             }
 
-            // is channel is connected, we should send redeliver command to 
broker
-            if (cnx != null && isConnected(cnx)) {
+            // If a subscription command has been sent to the broker, it is 
necessary to allow the redelivery
+            // request to be sent to the broker without checking the 
connection state, as failing to do so would
+            // result in the client consumer epoch being bigger than the 
broker consumer epoch.

Review Comment:
   - client consumer send reconnect command to broker  epoch = 1
   - client consumer invoke redeliverUnacknowledgedMessages epoch = 2
   - if check connect, client consumer will not send 
redeliverUnacknowledgedMessages command with epoch = 2 to broker
   - client consumer epoch = 2 will bigger than broker consumer epoch = 2, 
client will filter all messages with epoch = 1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to