codelipenghui commented on a change in pull request #10478:
URL: https://github.com/apache/pulsar/pull/10478#discussion_r780077192
########## File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java ##########
@@ -546,15 +547,15 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
 return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null,
- Collections.emptyMap());
+ Collections.emptyMap(), -1L);
 }
 public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, Map<String, String> metadata, boolean readCompacted, boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy,
- Map<String, String> subscriptionProperties) {
+ Map<String, String> subscriptionProperties, long epoch) {

Review comment:
```suggestion
Map<String, String> subscriptionProperties, long consumerEpoch) {
```

########## File path: pulsar-common/src/main/proto/PulsarApi.proto ##########
@@ -526,6 +529,7 @@ message CommandMessage {
 required MessageIdData message_id = 2;
 optional uint32 redelivery_count = 3 [default = 0];
 repeated int64 ack_set = 4;
+ optional uint64 consumer_epoch = 5 [default = 0];

Review comment:
I don't think we need the default value. If `consumer_epoch` is not present, it means either the broker did not assign a `consumer_epoch` to the message, or the message came from an older broker that does not support the consumer epoch feature.
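To illustrate the point about `consumer_epoch` not needing a default: the absence of a proto2 `optional` field is itself meaningful (no epoch assigned, or an older broker), so the reader should branch on presence rather than on the value. This is a minimal sketch under stated assumptions: `CommandMessageView` is a hypothetical stand-in for the generated protocol class, and the real generated accessors may differ.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for a generated proto2 message class; the real
// Pulsar-generated accessors may differ.
interface CommandMessageView {
    boolean hasConsumerEpoch();

    long getConsumerEpoch();
}

final class ConsumerEpochDecoding {
    private ConsumerEpochDecoding() {
    }

    // An absent field decodes to empty: either the broker assigned no epoch,
    // or the message came from a broker without the consumer epoch feature.
    static OptionalLong consumerEpochOf(CommandMessageView cmd) {
        return cmd.hasConsumerEpoch()
                ? OptionalLong.of(cmd.getConsumerEpoch())
                : OptionalLong.empty();
    }

    // In-memory view for illustration; null means "field not set on the wire".
    static CommandMessageView of(Long epochOrNull) {
        return new CommandMessageView() {
            @Override
            public boolean hasConsumerEpoch() {
                return epochOrNull != null;
            }

            // Like proto2, an unset field with no explicit default reads as 0.
            @Override
            public long getConsumerEpoch() {
                return epochOrNull == null ? 0L : epochOrNull;
            }
        };
    }
}
```

Reading only `getConsumerEpoch()` would return 0 for a message from an old broker, and 0 is also a legitimate client epoch (the client starts from `new AtomicLong(0)`), so only the presence check distinguishes the two cases.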
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java ##########
@@ -94,9 +96,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
 } else {
 cursorPosition = (PositionImpl) cursor.getReadPosition();
 }
+
+ // TODO: redeliver epoch

Review comment:
Could you please create an issue to track this task?

########## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java ##########
@@ -937,10 +937,13 @@ public void testPooledMessageWithAckTimeout(boolean isBatchingEnabled) throws Ex
 retryStrategically((test) -> consumer.incomingMessages.peek() != null, 5, 500);
 MessageImpl<ByteBuffer> msg = (MessageImpl) consumer.incomingMessages.peek();
 assertNotNull(msg);
- ByteBuf payload = ((MessageImpl) msg).getPayload();
+ ByteBuf payload = msg.getPayload();
 assertNotEquals(payload.refCnt(), 0);
 consumer.redeliverUnacknowledgedMessages();
- assertEquals(payload.refCnt(), 0);
+ consumer.clearIncomingMessagesAndGetMessageNumber();
+ if (payload.refCnt() != 0) {

Review comment:
We have already released the payload in `clearIncomingMessagesAndGetMessageNumber`, so why do we need this check here? Are there any cases where the payload ref count is not 0 after the message has been released?

########## File path: pulsar-common/src/main/proto/PulsarApi.proto ##########
@@ -616,6 +620,7 @@ message CommandCloseConsumer {
 message CommandRedeliverUnacknowledgedMessages {
 required uint64 consumer_id = 1;
 repeated MessageIdData message_ids = 2;
+ optional uint64 consumer_epoch = 3 [default = 0];

Review comment:
We don't need the default value here, right?
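On the `testPooledMessageWithAckTimeout` question: if clearing the incoming queue releases each payload exactly once, the reference count is deterministically 0 afterwards and the test can assert it unconditionally instead of branching. This is a minimal sketch with hypothetical `RefCountedPayload` and `IncomingQueue` classes standing in for Netty's `ByteBuf` and the consumer's queue; it models the invariant the review relies on, not the real implementations.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a pooled, reference-counted buffer such as Netty's ByteBuf.
final class RefCountedPayload {
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    // Returns true when this call dropped the count to zero.
    boolean release() {
        int remaining = refCnt.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released more times than retained");
        }
        return remaining == 0;
    }
}

// Hypothetical consumer queue: clearing it releases every queued payload once.
final class IncomingQueue {
    private final Queue<RefCountedPayload> incoming = new ArrayDeque<>();

    void add(RefCountedPayload payload) {
        incoming.add(payload);
    }

    int clearAndGetMessageNumber() {
        int n = incoming.size();
        RefCountedPayload p;
        while ((p = incoming.poll()) != null) {
            p.release();
        }
        return n;
    }
}
```

With this invariant, the test body after clearing reduces to a plain `assertEquals(payload.refCnt(), 0)`.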
########## File path: pulsar-common/src/main/proto/PulsarApi.proto ##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
 optional KeySharedMeta keySharedMeta = 17;
 repeated KeyValue subscription_properties = 18;
+
+ // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+ optional uint64 epoch = 19 [default = 0];

Review comment:
```suggestion
optional uint64 consumer_epoch = 19 [default = 0];
```

########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ##########
@@ -1492,6 +1499,7 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
 if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
 } else {
+ consumer.setConsumerEpoch(redeliver.getConsumerEpoch());

Review comment:
We should avoid any situation in which the epoch can be reduced.

########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
 subscriptionName,
 TopicOperation.CONSUME
 );
+
+ // move this because we should make the sub in this channel use one consumer future and do such as redeliver op

Review comment:
I think this comment should not describe only this change; people will keep reading it in the codebase in the future. I think the correct description is:
```
Make sure the consumer future is put into the consumers map first to avoid the same consumer ID using different consumer futures, and only remove the consumer future from the map if subscribe failed.
```

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ##########
@@ -412,13 +411,20 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
 try {
 message = incomingMessages.take();
 messageProcessed(message);
+ if (checkMessageImplConsumerEpochIsSmallerThanConsumer(message)) {

Review comment:
```suggestion
if (!isValidConsumerEpoch(message)) {
```
This is more straightforward and easier to understand.

########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ##########
@@ -127,11 +130,15 @@ private final String clientAddress; // IP address only, no port number included
 private final MessageId startMessageId;
+ @Getter
+ @Setter
+ private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+
 public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, int maxUnackedMessages, TransportCnx cnx, String appId, Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
- KeySharedMeta keySharedMeta, MessageId startMessageId) {
+ KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) {

Review comment:
I noticed that many of the test changes are related to the newly added parameter. It's better to keep the old constructor to avoid so many changes in this PR.

########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java ##########
@@ -127,11 +130,15 @@ private final String clientAddress; // IP address only, no port number included
 private final MessageId startMessageId;
+ @Getter
+ @Setter
+ private volatile long consumerEpoch = DEFAULT_CONSUMER_EPOCH;

Review comment:
The constructor already initializes `consumerEpoch`.
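The two `Consumer` constructor comments can be addressed together with telescoping constructors: the old signature delegates to the new one with `DEFAULT_CONSUMER_EPOCH`, so existing tests compile unchanged, and the field is assigned only in the constructor rather than also in its declaration. This is a minimal sketch with a hypothetical, heavily trimmed `SketchConsumer` (the real `Consumer` takes many more parameters); the value -1 for `DEFAULT_CONSUMER_EPOCH` follows the review thread.

```java
// Hypothetical, trimmed-down stand-in for the broker's Consumer class.
final class SketchConsumer {
    // Value taken from the review discussion (DEFAULT_CONSUMER_EPOCH is -1).
    static final long DEFAULT_CONSUMER_EPOCH = -1L;

    private final String consumerName;
    // No initializer here: every constructor path assigns it exactly once.
    // volatile mirrors the original field, which redelivery handling updates later.
    private volatile long consumerEpoch;

    // Old signature kept so existing callers and tests need no changes.
    SketchConsumer(String consumerName) {
        this(consumerName, DEFAULT_CONSUMER_EPOCH);
    }

    // New signature for code paths that know the epoch.
    SketchConsumer(String consumerName, long consumerEpoch) {
        this.consumerName = consumerName;
        this.consumerEpoch = consumerEpoch;
    }

    String getConsumerName() {
        return consumerName;
    }

    long getConsumerEpoch() {
        return consumerEpoch;
    }
}
```

Keeping the delegation in one direction (old to new) means there is a single place that assigns fields, which is what makes the declaration-site initializer redundant.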
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ##########
@@ -949,6 +949,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
 subscriptionName,
 TopicOperation.CONSUME
 );
+
+ // move this because we should make the sub in this channel use one consumer future and do such as redeliver op
+ CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
+ CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
+ consumerFuture);

Review comment:
How about:
```java
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId, new CompletableFuture<>());
```
Also, `computeIfAbsent` would be more elegant here.

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
 return executor;
 }
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+ if ((getSubType() == CommandSubscribe.SubType.Failover

Review comment:
We should log a warning here; it will help with troubleshooting.

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
 return executor;
 }
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {
+ if ((getSubType() == CommandSubscribe.SubType.Failover
+ || getSubType() == CommandSubscribe.SubType.Exclusive)
+ && (message).getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH

Review comment:
Why do we need the parentheses around `message` here? Is there any difference from `message.getConsumerEpoch()`?

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ##########
@@ -1041,5 +1052,20 @@ private ExecutorService getInternalExecutor(Message<T> msg) {
 return executor;
 }
+ // If message consumer epoch is smaller than consumer epoch present that
+ // it has been sent to the client before the user calls redeliverUnacknowledgedMessages, this message is invalid.
+ // so we should release this message and receive again
+ protected boolean checkMessageConsumerEpochIsSmallerThanConsumer(MessageImpl<T> message) {

Review comment:
To be safe, we should not filter out any messages from a broker with an older protocol version.

########## File path: pulsar-common/src/main/proto/PulsarApi.proto ##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
 optional KeySharedMeta keySharedMeta = 17;
 repeated KeyValue subscription_properties = 18;
+
+ // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+ optional uint64 epoch = 19 [default = 0];

Review comment:
We don't need the default value here, right?
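The client-side comments on `ConsumerBase` (rename to `isValidConsumerEpoch`, drop the redundant parentheses, log a warning, and never filter messages from an older broker) can be combined into one check. This is a minimal sketch under stated assumptions: `SubType` is a local enum rather than the generated Pulsar one, `brokerSupportsConsumerEpoch` is a hypothetical flag standing in for a protocol-version check, and logging goes through `java.util.logging` instead of Pulsar's logger.

```java
import java.util.logging.Logger;

// Hypothetical client-side validity check; only DEFAULT_CONSUMER_EPOCH comes from the review.
final class ConsumerEpochFilter {
    // Local copy of the subscription types; not the generated Pulsar enum.
    enum SubType { Exclusive, Shared, Failover, Key_Shared }

    static final long DEFAULT_CONSUMER_EPOCH = -1L;
    private static final Logger LOG = Logger.getLogger(ConsumerEpochFilter.class.getName());

    private final SubType subType;
    // Hypothetical: true only if the broker's protocol version supports consumer epochs.
    private final boolean brokerSupportsConsumerEpoch;
    // Snapshot of the consumer's current epoch.
    private final long consumerEpoch;

    ConsumerEpochFilter(SubType subType, boolean brokerSupportsConsumerEpoch, long consumerEpoch) {
        this.subType = subType;
        this.brokerSupportsConsumerEpoch = brokerSupportsConsumerEpoch;
        this.consumerEpoch = consumerEpoch;
    }

    // Returns false only for a message dispatched before the latest redelivery request.
    boolean isValidConsumerEpoch(long messageEpoch, String messageId) {
        if (subType != SubType.Exclusive && subType != SubType.Failover) {
            return true; // epochs apply only to exclusive and failover subscriptions
        }
        if (!brokerSupportsConsumerEpoch || messageEpoch == DEFAULT_CONSUMER_EPOCH) {
            return true; // never filter old-broker messages or messages without an epoch
        }
        if (messageEpoch < consumerEpoch) {
            LOG.warning("Ignoring message " + messageId + ": message epoch " + messageEpoch
                    + " is smaller than consumer epoch " + consumerEpoch);
            return false;
        }
        return true;
    }
}
```

Putting the positive form in the name keeps the call site readable (`if (!isValidConsumerEpoch(message))`), and the old-broker guard comes before the comparison so no message is ever dropped on a connection that cannot carry epochs.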
########## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ##########
@@ -1492,6 +1499,7 @@ protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessa
 if (redeliver.getMessageIdsCount() > 0 && Subscription.isIndividualAckMode(consumer.subType())) {
 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
 } else {
+ consumer.setConsumerEpoch(redeliver.getConsumerEpoch());

Review comment:
We have disabled the consumer epoch for non-persistent topics and shared subscriptions, which also means we should prevent the consumer epoch from being modified through message redelivery.

########## File path: pulsar-common/src/main/proto/PulsarApi.proto ##########
@@ -387,6 +387,9 @@ message CommandSubscribe {
 optional KeySharedMeta keySharedMeta = 17;
 repeated KeyValue subscription_properties = 18;
+
+ // The consumer epoch, when exclusive and failover consumer redeliver unack message will increase the epoch
+ optional uint64 epoch = 19 [default = 0];

Review comment:
I see that DEFAULT_CONSUMER_EPOCH is -1, but the default here is 0. I think it would be simpler for us to either make them consistent or not assign a default value in the protocol at all.

########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java ##########
@@ -86,6 +90,13 @@ protected final Lock reentrantLock = new ReentrantLock();
 private volatile boolean isListenerHandlingMessage = false;
+ @Getter
+ protected final AtomicLong consumerEpoch = new AtomicLong(0);

Review comment:
An atomic field updater (`AtomicLongFieldUpdater`) is more efficient.
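Three related review points on the consumer epoch (it must never be reduced, it must not change for shared subscriptions or non-persistent topics, and a static `AtomicLongFieldUpdater` over a `volatile long` avoids allocating an `AtomicLong` per consumer) can be sketched together. This is a hypothetical `EpochHolder`, not Pulsar's `Consumer` or `ConsumerBase`; `epochEnabled` is an assumed flag standing in for whatever condition the real code uses to decide that the subscription type and topic support consumer epochs. The update is a compare-and-set loop that only moves the epoch forward, so a stale or reordered redelivery request cannot lower it.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical holder for a consumer epoch; not the real Pulsar class.
final class EpochHolder {
    // One static updater shared by all instances instead of an AtomicLong per instance.
    private static final AtomicLongFieldUpdater<EpochHolder> CONSUMER_EPOCH_UPDATER =
            AtomicLongFieldUpdater.newUpdater(EpochHolder.class, "consumerEpoch");

    // AtomicLongFieldUpdater requires a volatile long field.
    private volatile long consumerEpoch;
    // Hypothetical flag: false for shared subscriptions and non-persistent topics.
    private final boolean epochEnabled;

    EpochHolder(long initialEpoch, boolean epochEnabled) {
        this.consumerEpoch = initialEpoch;
        this.epochEnabled = epochEnabled;
    }

    long getConsumerEpoch() {
        return consumerEpoch;
    }

    // Raises the epoch to requestedEpoch; returns true only if the stored value changed.
    boolean advanceConsumerEpoch(long requestedEpoch) {
        if (!epochEnabled) {
            return false; // redelivery must not modify a disabled epoch
        }
        while (true) {
            long current = CONSUMER_EPOCH_UPDATER.get(this);
            if (requestedEpoch <= current) {
                return false; // never move the epoch backwards
            }
            if (CONSUMER_EPOCH_UPDATER.compareAndSet(this, current, requestedEpoch)) {
                return true;
            }
            // Another thread changed the epoch concurrently; re-read and retry.
        }
    }
}
```

If the boolean result is not needed, `CONSUMER_EPOCH_UPDATER.accumulateAndGet(this, requestedEpoch, Math::max)` expresses the same "only increase" rule in one call.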