codelipenghui commented on a change in pull request #10478: URL: https://github.com/apache/pulsar/pull/10478#discussion_r787353171
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java ##########
@@ -643,6 +670,7 @@ String getHandlerName() {
     public void redeliverUnacknowledgedMessages() {
         lock.writeLock().lock();
         try {
+            CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
The `CONSUMER_EPOCH` of `MultiTopicsConsumerImpl` might follow a different sequence from that of `ConsumerImpl`, because the epoch also increases when `ConsumerImpl` reconnects to the topic. Here we assume that `MultiTopicsConsumerImpl` and `ConsumerImpl` hold the same value. Can we check the message epoch against the epoch of the internal consumer instead?

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ##########
@@ -1041,5 +1054,22 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }

+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {

Review comment:
Alternatively, we should return `false` if the epoch is invalid. I prefer to keep the name `isValidConsumerEpoch` and return `false` if the epoch is invalid.

########## File path: pulsar-common/src/main/proto/PulsarApi.proto ##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch

Review comment:
Isn't this for consumer reconnection rather than for redelivery?
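To make the epoch comparison raised in the comments above concrete, here is a minimal sketch in plain Java. It is not the Pulsar implementation: `ConsumerEpochSketch`, `EpochMessage`, and `InternalConsumer` are hypothetical stand-ins, and the rule that both a reconnect and a redelivery request bump the epoch is an assumption taken from the review discussion. It shows a check named `isValidConsumerEpoch` that returns `false` for a stale message, evaluated against the epoch of the internal consumer that received the message rather than against a separate counter on the multi-topic wrapper.

```java
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerEpochSketch {

    /** Hypothetical stand-in for a message tagged with the epoch it was dispatched under. */
    static final class EpochMessage {
        final String payload;
        final long consumerEpoch;

        EpochMessage(String payload, long consumerEpoch) {
            this.payload = payload;
            this.consumerEpoch = consumerEpoch;
        }
    }

    /** Hypothetical stand-in for one internal (per-topic) consumer. */
    static final class InternalConsumer {
        private final AtomicLong epoch = new AtomicLong(0);

        // Assumption from the review thread: a redelivery request bumps the epoch.
        long redeliverUnacknowledgedMessages() {
            return epoch.incrementAndGet();
        }

        // Assumption from the review thread: a reconnect bumps the epoch as well,
        // so this counter can drift away from any counter kept by a wrapper.
        long reconnect() {
            return epoch.incrementAndGet();
        }

        // Returns false when the message was dispatched under an older epoch,
        // matching the "valid means true" naming preferred in the review.
        boolean isValidConsumerEpoch(EpochMessage message) {
            return message.consumerEpoch >= epoch.get();
        }
    }

    public static void main(String[] args) {
        InternalConsumer consumer = new InternalConsumer();
        EpochMessage stale = new EpochMessage("m1", 0);

        consumer.reconnect();                       // epoch becomes 1
        consumer.redeliverUnacknowledgedMessages(); // epoch becomes 2

        EpochMessage fresh = new EpochMessage("m2", 2);

        System.out.println(consumer.isValidConsumerEpoch(stale)); // prints "false"
        System.out.println(consumer.isValidConsumerEpoch(fresh)); // prints "true"
    }
}
```

In this sketch a wrapper that only counted redelivery calls would be at epoch 1 while the internal consumer is at epoch 2, which is the mismatch the first comment describes; delegating the check to the internal consumer sidesteps it.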
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java ##########
@@ -328,13 +327,26 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
         }
     }

+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    private boolean checkTopicMessageConsumerEpochIsSmallerThanConsumer(Message<T> message) {

Review comment:
```suggestion
    private boolean isValidConsumerEpoch(Message<T> message) {
```

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ##########
@@ -1041,5 +1054,22 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
         return executor;
     }

+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean isValidConsumerEpoch(MessageImpl<T> message) {

Review comment:
If `true` means the message is invalid, we should name the method `isInvalidConsumerEpoch`.