lhotari commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1830693798
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoc
     }
     private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
+        // broker side epoch is smaller than consumer epoch, so don't need to handle this redeliver request
+        if (consumerEpoch < consumer.getConsumerEpoch()) {
+            log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since broker epoch [{}] is smaller than "
+                            + "consumer epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
+            return;
+        }
-        if (consumerEpoch > consumer.getConsumerEpoch()) {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]",
-                        name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
-            }
-            consumer.setConsumerEpoch(consumerEpoch);
+        if (log.isDebugEnabled()) {
+            log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+                    name, consumer, consumer.getConsumerEpoch(), consumerEpoch);
         }
+        consumer.setConsumerEpoch(consumerEpoch);
Review Comment:
I get it now. It seems that the code was just refactored in this PR and the
logic wasn't really changed in this method.
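
For anyone else reading the hunk, here is a rough stand-alone sketch of why the two shapes should leave the stored epoch in the same state. It is not Pulsar code: `EpochUpdateSketch`, `oldShape`, and `newShape` are hypothetical names, and the sketch only models the epoch value using the lines visible in this hunk. It says nothing about the redelivery work that follows in the real method, which the early return in the new version would skip.

```java
// Hypothetical stand-alone model of the epoch update only; not Pulsar code.
final class EpochUpdateSketch {

    // Shape of the removed lines: overwrite the stored epoch only when the
    // requested epoch is strictly larger.
    static long oldShape(long storedEpoch, long requestedEpoch) {
        if (requestedEpoch > storedEpoch) {
            storedEpoch = requestedEpoch;
        }
        return storedEpoch;
    }

    // Shape of the added lines: a guard clause rejects a smaller requested
    // epoch, then the assignment happens unconditionally.
    static long newShape(long storedEpoch, long requestedEpoch) {
        if (requestedEpoch < storedEpoch) {
            return storedEpoch; // request ignored, stored epoch untouched
        }
        return requestedEpoch;  // equal case rewrites the same value
    }

    public static void main(String[] args) {
        // Compare both shapes over a small grid of epoch pairs.
        for (long stored = 0; stored <= 3; stored++) {
            for (long requested = 0; requested <= 3; requested++) {
                if (oldShape(stored, requested) != newShape(stored, requested)) {
                    throw new AssertionError("mismatch for " + stored + ", " + requested);
                }
            }
        }
        System.out.println("stored epoch agrees for all tested pairs");
    }
}
```

The only case the two shapes treat differently is an equal epoch, where the new shape performs a redundant assignment of the same value.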