This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 993ea62  Fix NPE when ACK grouping tracker checks duplicated message 
id (#10586)
993ea62 is described below

commit 993ea62108ed2c21bd14e0f1f7e792e0154435b9
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Fri May 14 19:44:27 2021 +0800

    Fix NPE when ACK grouping tracker checks duplicated message id (#10586)
---
 .../src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java  | 3 +++
 .../client/impl/PersistentAcknowledgmentsGroupingTracker.java       | 6 +++++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 855ee64..2d57185 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -200,6 +200,9 @@ public class MessageIdImpl implements MessageId {
 
     @Override
     public int compareTo(MessageId o) {
+        if (o == null) {
+            throw new UnsupportedOperationException("MessageId is null");
+        }
         if (o instanceof MessageIdImpl) {
             MessageIdImpl other = (MessageIdImpl) o;
             int batchIndex = (o instanceof BatchMessageIdImpl) ? 
((BatchMessageIdImpl) o).getBatchIndex() : NO_BATCH;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
index 9eee2bf..09d6bd5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
@@ -36,6 +36,7 @@ import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import io.netty.util.Recycler;
+import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.client.api.MessageId;
@@ -113,7 +114,10 @@ public class PersistentAcknowledgmentsGroupingTracker 
implements Acknowledgments
      * resent after a disconnection and for which the user has already sent an 
acknowledgement.
      */
     @Override
-    public boolean isDuplicate(MessageId messageId) {
+    public boolean isDuplicate(@NonNull MessageId messageId) {
+        if (lastCumulativeAck.messageId == null) {
+            return false;
+        }
         if (messageId.compareTo(lastCumulativeAck.messageId) <= 0) {
             // Already included in a cumulative ack
             return true;

Reply via email to