lhotari commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1798874588
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoc
}
private synchronized void internalRedeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
+ // broker side epoch is smaller than consumer epoch, so don't need to
handle this redeliver request
+ if (consumerEpoch < consumer.getConsumerEpoch()) {
+ log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since
broker epoch [{}] is smaller than "
+ + "consumer epoch [{}]",
+ name, consumer, consumer.getConsumerEpoch(),
consumerEpoch);
+ return;
+ }
- if (consumerEpoch > consumer.getConsumerEpoch()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch
[{}]",
- name, consumer, consumer.getConsumerEpoch(),
consumerEpoch);
- }
- consumer.setConsumerEpoch(consumerEpoch);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+ name, consumer, consumer.getConsumerEpoch(),
consumerEpoch);
}
+ consumer.setConsumerEpoch(consumerEpoch);
Review Comment:
This doesn't make sense to me. Why is the broker side epoch updated here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]