congbobo184 commented on code in PR #20032:
URL: https://github.com/apache/pulsar/pull/20032#discussion_r1830668569
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java:
##########
@@ -279,15 +279,21 @@ public void redeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoc
}
private synchronized void internalRedeliverUnacknowledgedMessages(Consumer
consumer, long consumerEpoch) {
+ // broker side epoch is smaller than consumer epoch, so don't need to
handle this redeliver request
+ if (consumerEpoch < consumer.getConsumerEpoch()) {
+ log.warn("[{}-{}] Ignoring redeliverUnacknowledgedMessages since
broker epoch [{}] is smaller than "
+ + "consumer epoch [{}]",
+ name, consumer, consumer.getConsumerEpoch(),
consumerEpoch);
+ return;
+ }
- if (consumerEpoch > consumer.getConsumerEpoch()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch
[{}]",
- name, consumer, consumer.getConsumerEpoch(),
consumerEpoch);
- }
- consumer.setConsumerEpoch(consumerEpoch);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}-{}] Update epoch, old epoch [{}] new epoch [{}]",
+ name, consumer, consumer.getConsumerEpoch(),
consumerEpoch);
}
+ consumer.setConsumerEpoch(consumerEpoch);
Review Comment:
this means that client consumer invoke redeliverUnacknowledgedMessages,
client consumer hope that broker can next send messages with consumerEpoch.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -2103,8 +2103,10 @@ public void redeliverUnacknowledgedMessages() {
incomingQueueLock.unlock();
}
- // is channel is connected, we should send redeliver command to
broker
- if (cnx != null && isConnected(cnx)) {
+ // If a subscription command has been sent to the broker, it is
necessary to allow the redelivery
+ // request to be sent to the broker without checking the
connection state, as failing to do so would
+ // result in the client consumer epoch being bigger than the
broker consumer epoch.
Review Comment:
- client consumer send reconnect command to broker epoch = 1
- client consumer invoke redeliverUnacknowledgedMessages epoch = 2
- if check connect, client consumer will not send
redeliverUnacknowledgedMessages command with epoch = 2 to broker
- client consumer epoch = 2 will bigger than broker consumer epoch = 2,
client will filter all messages with epoch = 1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]