codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r787353171



##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -643,6 +670,7 @@ String getHandlerName() {
     public void redeliverUnacknowledgedMessages() {
         lock.writeLock().lock();
         try {
+            CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
       The `CONSUMER_EPOCH` of `MultiTopicsConsumerImpl` might have a different 
sequence with `ConsumerImpl` due to the epoch will increase if `ConsumerImpl` 
reconnects to the topic, here we assume the `MultiTopicsConsumerImpl` and 
`ConsumerImpl` with same the value.
   
   We can check the message epoch with the epoch of internal consumer?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1054,22 @@ private ExecutorService getInternalExecutor(Message<T> 
msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {

Review comment:
       Or we should return false if the epoch is invalid. I prefer to use 
`isValidConsumerEpoch` and return false if the epoch is invalid.

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver 
unack message will increase the epoch

Review comment:
       It not for the redelivery, it's for the consumer reconnection?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -328,13 +327,26 @@ private void resumeReceivingFromPausedConsumersIfNeeded() 
{
         }
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    private boolean 
checkTopicMessageConsumerEpochIsSmallerThanConsumer(Message<T> message) {

Review comment:
       ```suggestion
       private boolean isValidConsumerEpoch(Message<T> message) {
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1054,22 @@ private ExecutorService getInternalExecutor(Message<T> 
msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {

Review comment:
       If true means the message is invalid, we should use 
`isInvalidConsumerEpoch`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to