This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 993ea62 Fix NPE when ACK grouping tracker checks duplicated message id (#10586) 993ea62 is described below commit 993ea62108ed2c21bd14e0f1f7e792e0154435b9 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Fri May 14 19:44:27 2021 +0800 Fix NPE when ACK grouping tracker checks duplicated message id (#10586) --- .../src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java | 3 +++ .../client/impl/PersistentAcknowledgmentsGroupingTracker.java | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 855ee64..2d57185 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -200,6 +200,9 @@ public class MessageIdImpl implements MessageId { @Override public int compareTo(MessageId o) { + if (o == null) { + throw new UnsupportedOperationException("MessageId is null"); + } if (o instanceof MessageIdImpl) { MessageIdImpl other = (MessageIdImpl) o; int batchIndex = (o instanceof BatchMessageIdImpl) ? ((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 9eee2bf..09d6bd5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.ReentrantReadWriteLock; import io.netty.util.Recycler; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.MessageId; @@ -113,7 +114,10 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments * resent after a disconnection and for which the user has already sent an acknowledgement. */ @Override - public boolean isDuplicate(MessageId messageId) { + public boolean isDuplicate(@NonNull MessageId messageId) { + if (lastCumulativeAck.messageId == null) { + return false; + } if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) { // Already included in a cumulative ack return true;