hangc0276 commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r795007153



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -526,6 +529,7 @@ message CommandMessage {
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
     repeated int64 ack_set = 4;
+    optional uint64 consumer_epoch = 5;

Review comment:
       The default epoch is -1L, but `consumer_epoch` is declared as uint64 
in the proto. Will the default be sent as the max value of uint64?
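
To illustrate the concern, here is a minimal sketch. It relies only on the fact that the Java protobuf bindings carry a uint64 field in a signed `long`; the class and method names (`UnsignedEpochDemo`, `asUnsigned`, `isNewerUnsigned`) are hypothetical and not part of the PR.

```java
final class UnsignedEpochDemo {
    // Text form of a Java long when its bits are read as an unsigned 64-bit value.
    static String asUnsigned(long epoch) {
        return Long.toUnsignedString(epoch);
    }

    // True if `a` is greater than `b` when both are compared as unsigned values.
    static boolean isNewerUnsigned(long a, long b) {
        return Long.compareUnsigned(a, b) > 0;
    }

    public static void main(String[] args) {
        long defaultEpoch = -1L; // default epoch mentioned in the review comment

        // The same 64 bits, read as uint64, are the maximum unsigned value.
        System.out.println(asUnsigned(defaultEpoch)); // 18446744073709551615

        // A signed comparison treats the default as older than epoch 0 ...
        System.out.println(defaultEpoch > 0L); // false

        // ... while an unsigned comparison treats it as newer than every other epoch.
        System.out.println(isNewerUnsigned(defaultEpoch, 0L)); // true
    }
}
```

A Java peer that reads the field back into a `long` would still see -1, so the mismatch would mainly matter for peers that decode the field as a true unsigned integer.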

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -442,12 +454,21 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
     @Override
     protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
         Message<T> message;
+        long callTime = System.currentTimeMillis();

Review comment:
       We should use nanoseconds here instead of milliseconds, e.g. 
`System.nanoTime()` rather than `System.currentTimeMillis()`.
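
A minimal sketch of the suggestion, assuming `callTime` is used to work out how much of the receive timeout is left. `System.nanoTime()` is monotonic, so the elapsed time cannot go backwards when the wall clock is adjusted. The class and method names (`RemainingTimeoutDemo`, `remainingNanos`) are hypothetical and not part of the PR.

```java
import java.util.concurrent.TimeUnit;

final class RemainingTimeoutDemo {
    // Remaining wait in nanoseconds, never negative.
    // startNanos and nowNanos are expected to come from System.nanoTime().
    static long remainingNanos(long startNanos, long nowNanos, long timeout, TimeUnit unit) {
        long elapsed = nowNanos - startNanos; // only differences of nanoTime values are meaningful
        return Math.max(0L, unit.toNanos(timeout) - elapsed);
    }

    public static void main(String[] args) {
        long callTime = System.nanoTime();
        long left = remainingNanos(callTime, System.nanoTime(), 100, TimeUnit.MILLISECONDS);
        // The remaining time can never exceed the original timeout.
        System.out.println(left <= TimeUnit.MILLISECONDS.toNanos(100));
    }
}
```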

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1734,29 +1786,54 @@ public int numMessagesInQueue() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        ClientCnx cnx = cnx();
-        if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
-            int currentSize = 0;
-            synchronized (this) {
-                currentSize = incomingMessages.size();
-                clearIncomingMessages();
-                unAckedMessageTracker.clear();
-            }
-            cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise());
-            if (currentSize > 0) {
-                increaseAvailablePermits(cnx, currentSize);
+        // First : synchronized in order to handle consumer reconnect produce race condition, when broker receive
+        // redeliverUnacknowledgedMessages and consumer have not be created and
+        // then receive reconnect epoch change the broker is smaller than the client epoch, this will cause client epoch
+        // smaller than broker epoch forever. client will not receive message anymore.
+        // Second : we should synchronized `ClientCnx cnx = cnx()` to
+        // prevent use old cnx to send redeliverUnacknowledgedMessages to a old broker
+        synchronized (ConsumerImpl.this) {
+            ClientCnx cnx = cnx();
+            // V1 don't support redeliverUnacknowledgedMessages
+            if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
+                if ((getState() == State.Connecting)) {
+                    log.warn("[{}] Client Connection needs to be established "
+                            + "for redelivery of unacknowledged messages", this);
+                } else {
+                    log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
+                    cnx.ctx().close();
+                }
+
+                return;
             }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
-                        consumerName, currentSize);
+
+            // clear local message
+            int currentSize = 0;
+            currentSize = incomingMessages.size();
+            clearIncomingMessages();
+            unAckedMessageTracker.clear();
+
+            // we should increase epoch every time, because MultiTopicsConsumerImpl also increase it,
+            // we need to keep both epochs the same
+            if (conf.getSubscriptionType() == SubscriptionType.Failover
+                    || conf.getSubscriptionType() == SubscriptionType.Exclusive) {
+                CONSUMER_EPOCH.incrementAndGet(this);

Review comment:
       MultiTopicsConsumerImpl increments CONSUMER_EPOCH and then calls 
`redeliverUnacknowledgedMessages` on each child consumer. Could this leave 
the epochs inconsistent between consumers?
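
One way the counters could drift apart is sketched below. This is a simplified model, not the PR's code: `Parent` and `Child` are hypothetical stand-ins for MultiTopicsConsumerImpl and its per-topic consumers, each keeps its own epoch counter starting at 0 (an assumption), and the only operation modelled is the increment on redelivery.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class EpochDriftDemo {
    // Hypothetical stand-in for a per-topic consumer.
    static final class Child {
        final AtomicLong epoch = new AtomicLong(0);

        void redeliver() {
            epoch.incrementAndGet();
        }
    }

    // Hypothetical stand-in for the multi-topics consumer.
    static final class Parent {
        final AtomicLong epoch = new AtomicLong(0);
        final List<Child> children;

        Parent(List<Child> children) {
            this.children = children;
        }

        void redeliver() {
            epoch.incrementAndGet();  // parent bumps its own epoch first
            for (Child child : children) {
                child.redeliver();    // then every child bumps its epoch
            }
        }
    }

    // Returns {parent epoch, first child epoch, second child epoch}.
    static long[] simulate() {
        Child a = new Child();
        Child b = new Child();
        Parent parent = new Parent(Arrays.asList(a, b));

        parent.redeliver(); // all three counters move from 0 to 1
        a.redeliver();      // a redelivery triggered on one child only
        return new long[] {parent.epoch.get(), a.epoch.get(), b.epoch.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate())); // [1, 2, 1]
    }
}
```

In this model the counters agree only while every increment goes through the parent; any redelivery that reaches a single child directly leaves that child ahead of the others.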



