This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6903b291e17 [improve][client] Add null checks for MessageAcknowledger 
methods to prevent NullPointerException (#25036)
6903b291e17 is described below

commit 6903b291e17b50121de914f06073e39df02d0137
Author: Ruimin MA <[email protected]>
AuthorDate: Fri Dec 5 10:00:56 2025 +0800

    [improve][client] Add null checks for MessageAcknowledger methods to 
prevent NullPointerException (#25036)
---
 .../apache/pulsar/client/impl/ConsumerAckTest.java | 46 +++++++++++++++++++++
 .../pulsar/client/api/MessageAcknowledger.java     | 47 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 34 ++++++++++++++++
 3 files changed, 127 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index 6148894c230..41a3701cb7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -42,6 +43,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerInterceptor;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -170,6 +172,50 @@ public class ConsumerAckTest extends ProducerConsumerBase {
         assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty());
     }
 
+    @Test(timeOut = 10000)
+    public void testAcknowledgeWithNullMessageId() throws Exception {
+        final String topic = "testAcknowledgeWithNullMessageId";
+        @Cleanup final Consumer<String> consumer = 
pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .subscribe();
+        List<MessageId> messageIdList = null;
+
+        // 1. pass null messageIdList to acknowledgeAsync(messageIdList, txn) 
will trigger
+        // PulsarClientException.InvalidMessageException
+        assertThatThrownBy(
+                () -> consumer.acknowledgeAsync(messageIdList, null).get()
+        )
+                .isInstanceOf(ExecutionException.class)
+                .hasMessageContaining("Cannot handle messages with null 
messageIdList")
+                
.hasCauseInstanceOf(PulsarClientException.InvalidMessageException.class);
+
+        // 2. pass null messageIdList to acknowledge(messageIdList) will 
trigger PulsarClientException
+        assertThatThrownBy(
+                () -> consumer.acknowledge(messageIdList)
+        ).isInstanceOf(PulsarClientException.class)
+                .hasMessage("Cannot handle messages with null messageIdList");
+
+        // 3. pass null messages to acknowledge(messages) will trigger 
PulsarClientException
+        Messages<?> messages = null;
+        assertThatThrownBy(
+                () -> consumer.acknowledge(messages)
+        ).isInstanceOf(PulsarClientException.class)
+                .hasMessage("Cannot handle messages with null messages");
+
+        // 4. pass null messageId to acknowledgeCumulativeAsync(messageId, 
txn) will trigger
+        // PulsarClientException.InvalidMessageException
+        MessageId messageId = null;
+        assertThatThrownBy(
+                () -> consumer.acknowledgeCumulativeAsync(messageId, 
null).get()
+        )
+                .isInstanceOf(ExecutionException.class)
+                .hasMessageContaining("Cannot handle message with null 
messageId")
+                
.hasCauseInstanceOf(PulsarClientException.InvalidMessageException.class);
+
+    }
+
+
     @Test
     public void testCumulativeAck() throws Exception {
         @Cleanup AckTestData data = prepareDataForAck("test-cumulative-ack");
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
index d1bab3abb5b..cbf6293f324 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
@@ -51,6 +51,8 @@ public interface MessageAcknowledger {
      *             if the consumer was already closed
      * @throws PulsarClientException.NotAllowedException
      *             if `messageId` is not a {@link TopicMessageId} when 
multiple topics are subscribed
+     * @throws PulsarClientException.InvalidMessageException
+     *             if `messageId` is {@code null}
      */
     void acknowledge(MessageId messageId) throws PulsarClientException;
 
@@ -63,6 +65,8 @@ public interface MessageAcknowledger {
      * @param messageIdList the list of message IDs.
      * @throws PulsarClientException.NotAllowedException
      *     if any message id in the list is not a {@link TopicMessageId} when 
multiple topics are subscribed
+     * @throws PulsarClientException.InvalidMessageException
+     *     if `messageIdList` is {@code null} or contains any {@code null} 
element
      */
     void acknowledge(List<MessageId> messageIdList) throws 
PulsarClientException;
 
@@ -88,6 +92,8 @@ public interface MessageAcknowledger {
      *             if the consumer was already closed
      * @throws PulsarClientException.NotAllowedException
      *             if `messageId` is not a {@link TopicMessageId} when 
multiple topics are subscribed
+     * @throws PulsarClientException.InvalidMessageException
+     *             if `messageId` is {@code null}
      */
     void acknowledgeCumulative(MessageId messageId) throws 
PulsarClientException;
 
@@ -97,11 +103,22 @@ public interface MessageAcknowledger {
 
     /**
      * The asynchronous version of {@link #acknowledge(MessageId)} with 
transaction support.
+     *
+     * @param messageId {@link MessageId} to be individually acknowledged
+     * @param txn {@link Transaction} the transaction to ack with, or {@code 
null} for non-transactional ack
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messageId` is {@code null}.
      */
     CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction 
txn);
 
     /**
      * The asynchronous version of {@link #acknowledge(MessageId)}.
+     *
+     * @param messageId {@link MessageId} to be individually acknowledged
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messageId` is {@code null}.
      */
     default CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
         return acknowledgeAsync(messageId, null);
@@ -109,26 +126,53 @@ public interface MessageAcknowledger {
 
     /**
      * The asynchronous version of {@link #acknowledge(List)} with transaction 
support.
+     *
+     * @param messageIdList the list of message IDs.
+     * @param txn {@link Transaction} the transaction to ack with, or {@code 
null} for non-transactional ack
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messageIdList` is {@code null} or contains any {@code null} 
element.
      */
     CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, 
Transaction txn);
 
     /**
      * The asynchronous version of {@link #acknowledge(List)}.
+     *
+     * @param messageIdList the list of message IDs.
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messageIdList` is {@code null} or contains any {@code null} 
element.
      */
     CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
 
     /**
      * The asynchronous version of {@link #acknowledge(Message)}.
+     *
+     * @param message {@link Message} to be individually acknowledged
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `message` is {@code null} or its {@link 
Message#getMessageId()} returns {@code null}.
      */
     CompletableFuture<Void> acknowledgeAsync(Message<?> message);
 
     /**
      * The asynchronous version of {@link #acknowledge(Messages)}.
+     *
+     * @param messages {@link Messages} to be acknowledged
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messages` is {@code null} or contains any {@code null} 
message.
      */
     CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
 
     /**
      * The asynchronous version of {@link #acknowledge(Messages)} with 
transaction support.
+     *
+     * @param messages {@link Messages} to be acknowledged
+     * @param txn {@link Transaction} the transaction to ack with, or {@code 
null} for non-transactional ack
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messages` is {@code null} or contains any {@code null} 
message.
      */
     CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction 
txn);
 
@@ -142,6 +186,9 @@ public interface MessageAcknowledger {
      * @param messageId
      *            The {@code MessageId} to be cumulatively acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
+     * @return a future that completes when the acknowledge operation is sent.
+     *         The future will complete exceptionally with {@link 
PulsarClientException.InvalidMessageException}
+     *         if `messageId` is {@code null}.
      */
     CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
                                                        Transaction txn);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f3f3a6bf116..749d23651aa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -416,6 +416,23 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
         }
     }
 
+    private static void validateMessageIds(List<MessageId> messageIdList) 
throws PulsarClientException {
+        if (messageIdList == null) {
+            throw new PulsarClientException.InvalidMessageException("Cannot 
handle messages with null messageIdList");
+        }
+        for (MessageId messageId : messageIdList) {
+            validateMessageId(messageId);
+        }
+    }
+
+    private static void validateMessages(Messages<?> messages) throws 
PulsarClientException {
+        if (messages == null) {
+            throw new PulsarClientException.InvalidMessageException("Cannot 
handle messages with null messages");
+        }
+        for (Message<?> message : messages) {
+            validateMessageId(message);
+        }
+    }
     @Override
     public void acknowledge(Message<?> message) throws PulsarClientException {
         validateMessageId(message);
@@ -438,6 +455,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     @Override
     public void acknowledge(List<MessageId> messageIdList) throws 
PulsarClientException {
         try {
+            validateMessageIds(messageIdList);
             acknowledgeAsync(messageIdList).get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -450,6 +468,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     @Override
     public void acknowledge(Messages<?> messages) throws PulsarClientException 
{
         try {
+            validateMessages(messages);
             acknowledgeAsync(messages).get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -635,6 +654,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     @Override
     public CompletableFuture<Void> acknowledgeAsync(MessageId messageId,
                                                     Transaction txn) {
+        try {
+            validateMessageId(messageId);
+        } catch (PulsarClientException e) {
+            return FutureUtil.failedFuture(e);
+        }
         TransactionImpl txnImpl = null;
         if (null != txn) {
             checkArgument(txn instanceof TransactionImpl);
@@ -654,6 +678,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
 
     @Override
     public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId 
messageId, Transaction txn) {
+        try {
+            validateMessageId(messageId);
+        } catch (PulsarClientException e) {
+            return FutureUtil.failedFuture(e);
+        }
         if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
             return FutureUtil.failedFuture(new 
PulsarClientException.InvalidConfigurationException(
                     "Cannot use cumulative acks on a 
non-exclusive/non-failover subscription"));
@@ -675,6 +704,11 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
     protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> 
messageIdList, AckType ackType,
                                                            Map<String, Long> 
properties,
                                                            TransactionImpl 
txn) {
+        try {
+            validateMessageIds(messageIdList);
+        } catch (PulsarClientException e) {
+            return FutureUtil.failedFuture(e);
+        }
         CompletableFuture<Void> ackFuture;
         if (txn != null && this instanceof ConsumerImpl) {
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)

Reply via email to