codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780077192



##########
File path: 
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -546,15 +547,15 @@ public static ByteBuf newSubscribe(String topic, String 
subscription, long consu
         return newSubscribe(topic, subscription, consumerId, requestId, 
subType, priorityLevel, consumerName,
                 isDurable, startMessageId, metadata, readCompacted, 
isReplicated, subscriptionInitialPosition,
                 startMessageRollbackDurationInSec, schemaInfo, 
createTopicIfDoesNotExist, null,
-                Collections.emptyMap());
+                Collections.emptyMap(), -1L);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
                SubType subType, int priorityLevel, String consumerName, 
boolean isDurable, MessageIdData startMessageId,
                Map<String, String> metadata, boolean readCompacted, boolean 
isReplicated,
                InitialPosition subscriptionInitialPosition, long 
startMessageRollbackDurationInSec,
                SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, 
KeySharedPolicy keySharedPolicy,
-               Map<String, String> subscriptionProperties) {
+               Map<String, String> subscriptionProperties, long epoch) {

Review comment:
       ```suggestion
                  Map<String, String> subscriptionProperties, long 
consumerEpoch) {
   ```

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -526,6 +529,7 @@ message CommandMessage {
     required MessageIdData message_id = 2;
     optional uint32 redelivery_count  = 3 [default = 0];
     repeated int64 ack_set = 4;
+    optional uint64 consumer_epoch = 5 [default = 0];

Review comment:
       I think we don't need the default value?
   
   If the `consumer_epoch` does not present, it means the broker does not 
assign a `consumer_epoch` for the message or the message from the old version 
broker without the consumer epoch feature. 

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
##########
@@ -94,9 +96,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
             } else {
                 cursorPosition = (PositionImpl) cursor.getReadPosition();
             }
+
+            // TODO: redeliver epoch

Review comment:
       Could you please create an issue for tracking the task?

##########
File path: 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
##########
@@ -937,10 +937,13 @@ public void testPooledMessageWithAckTimeout(boolean 
isBatchingEnabled) throws Ex
         retryStrategically((test) -> consumer.incomingMessages.peek() != null, 
5, 500);
         MessageImpl<ByteBuffer> msg = (MessageImpl) 
consumer.incomingMessages.peek();
         assertNotNull(msg);
-        ByteBuf payload = ((MessageImpl) msg).getPayload();
+        ByteBuf payload = msg.getPayload();
         assertNotEquals(payload.refCnt(), 0);
         consumer.redeliverUnacknowledgedMessages();
-        assertEquals(payload.refCnt(), 0);
+        consumer.clearIncomingMessagesAndGetMessageNumber();
+        if (payload.refCnt() != 0) {

Review comment:
       We have released the payload in 
`clearIncomingMessagesAndGetMessageNumber`, why need this check here? Or any 
cases the payload ref count is not 0 after released the message?

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -616,6 +620,7 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
     required uint64 consumer_id = 1;
     repeated MessageIdData message_ids = 2;
+    optional uint64 consumer_epoch = 3 [default = 0];

Review comment:
       We don't need the default value?

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver 
unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       ```suggestion
       optional uint64 consumer_epoch = 19 [default = 0];
   ```

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1492,6 +1499,7 @@ protected void 
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
             if (redeliver.getMessageIdsCount() > 0 && 
Subscription.isIndividualAckMode(consumer.subType())) {
                 
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
+                consumer.setConsumerEpoch(redeliver.getConsumerEpoch());

Review comment:
       We should avoid situations where the epoch can be reduced

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                 subscriptionName,
                 TopicOperation.CONSUME
         );
+
+        // move this because we should make the sub in this channel use one 
consumer future and do such as redeliver op

Review comment:
       I think the comment here is not only for this change, we will see the 
comment in the codebase in the future. I think the correct description is
   
   ```
   Make sure the consumer future is put into the consumers map first to avoid 
the same consumer ID using different consumer futures, and only remove the 
consumer future from the map if subscribe failed .
   ```
   ```

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -412,13 +411,20 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         try {
             message = incomingMessages.take();
             messageProcessed(message);
+            if (checkMessageImplConsumerEpochIsSmallerThanConsumer(message)) {

Review comment:
       ```suggestion
               if (!isValidConsumerEpoch(message)) {
   ```
   
   More straightforward and understandable

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
     private final String clientAddress; // IP address only, no port number 
included
     private final MessageId startMessageId;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     int maxUnackedMessages, TransportCnx cnx, String appId,
                     Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition,
-                    KeySharedMeta keySharedMeta, MessageId startMessageId) {
+                    KeySharedMeta keySharedMeta, MessageId startMessageId, 
long consumerEpoch) {

Review comment:
       I noticed there are lots of changes from the tests are related to the 
newly added param, it's better to keep the old constructor to avoid the many 
changes for this PR.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
##########
@@ -127,11 +130,15 @@
     private final String clientAddress; // IP address only, no port number 
included
     private final MessageId startMessageId;
 
+    @Getter
+    @Setter
+    private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;

Review comment:
       The constructor already init the `consumerEpoch`.

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                 subscriptionName,
                 TopicOperation.CONSUME
         );
+
+        // move this because we should make the sub in this channel use one 
consumer future and do such as redeliver op
+        CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+        CompletableFuture<Consumer> existingConsumerFuture = 
consumers.putIfAbsent(consumerId,
+                consumerFuture);

Review comment:
       How about:
   
   ```java
   CompletableFuture<Consumer> existingConsumerFuture = 
consumers.putIfAbsent(consumerId,
                   new CompletableFuture<>());
   ```
   
   And, `computeIfAbsent` is more elegant here.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> 
msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean 
checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+        if ((getSubType() == CommandSubscribe.SubType.Failover

Review comment:
       We should print a warn log here, which will help with troubleshooting.

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> 
msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean 
checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+        if ((getSubType() == CommandSubscribe.SubType.Failover
+                || getSubType() == CommandSubscribe.SubType.Exclusive)
+                && (message).getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH

Review comment:
       why need `(message)` here? any difference with 
`message.getConsumerEpoch`?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> 
msg) {
         return executor;
     }
 
+    // If message consumer epoch is smaller than consumer epoch present that
+    // it has been sent to the client before the user calls 
redeliverUnacknowledgedMessages, this message is invalid.
+    // so we should release this message and receive again
+    protected boolean 
checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {

Review comment:
       For more safety, we should not filter out any message for a broker with 
an old version protocol version.

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver 
unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       we don't need the default value?

##########
File path: 
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
##########
@@ -1492,6 +1499,7 @@ protected void 
handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
             if (redeliver.getMessageIdsCount() > 0 && 
Subscription.isIndividualAckMode(consumer.subType())) {
                 
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
+                consumer.setConsumerEpoch(redeliver.getConsumerEpoch());

Review comment:
       For the non-persistent topic and shared subscription, we have disabled 
the consumer epoch, this also means we should prevent the modification for the 
consumer epoch through the message redelivery.

##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
     optional KeySharedMeta keySharedMeta = 17;
 
     repeated KeyValue subscription_properties = 18;
+
+    // The consumer epoch, when exclusive and failover consumer redeliver 
unack message will increase the epoch
+    optional uint64 epoch = 19 [default = 0];

Review comment:
       I see the DEFAULT_CONSUMER_EPOCH is -1, but here is 0. I think to make 
them consistent or don't assign the default value in the protocol is simpler 
for us?

##########
File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -86,6 +90,13 @@
     protected final Lock reentrantLock = new ReentrantLock();
     private volatile boolean isListenerHandlingMessage = false;
 
+    @Getter
+    protected final AtomicLong consumerEpoch = new AtomicLong(0);

Review comment:
       AtomicUpdater is more efficient.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to