This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6903b291e17 [improve][client] Add null checks for MessageAcknowledger
methods to prevent NullPointerException (#25036)
6903b291e17 is described below
commit 6903b291e17b50121de914f06073e39df02d0137
Author: Ruimin MA <[email protected]>
AuthorDate: Fri Dec 5 10:00:56 2025 +0800
[improve][client] Add null checks for MessageAcknowledger methods to
prevent NullPointerException (#25036)
---
.../apache/pulsar/client/impl/ConsumerAckTest.java | 46 +++++++++++++++++++++
.../pulsar/client/api/MessageAcknowledger.java | 47 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerBase.java | 34 ++++++++++++++++
3 files changed, 127 insertions(+)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index 6148894c230..41a3701cb7a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -42,6 +43,7 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerInterceptor;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
@@ -170,6 +172,50 @@ public class ConsumerAckTest extends ProducerConsumerBase {
assertTrue(data.consumer.getUnAckedMessageTracker().isEmpty());
}
+ @Test(timeOut = 10000)
+ public void testAcknowledgeWithNullMessageId() throws Exception {
+ final String topic = "testAcknowledgeWithNullMessageId";
+ @Cleanup final Consumer<String> consumer =
pulsarClient.newConsumer(Schema.STRING)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .subscribe();
+ List<MessageId> messageIdList = null;
+
+ // 1.pass null messageIdList to acknowledgeAsync(messageIdList, txn)
will trigger
+ // PulsarClientException.InvalidMessageException
+ assertThatThrownBy(
+ () -> consumer.acknowledgeAsync(messageIdList, null).get()
+ )
+ .isInstanceOf(ExecutionException.class)
+ .hasMessageContaining("Cannot handle messages with null
messageIdList")
+
.hasCauseInstanceOf(PulsarClientException.InvalidMessageException.class);
+
+ // 2. pass null messageIdList to acknowledge(messageIdList) will
trigger PulsarClientException
+ assertThatThrownBy(
+ () -> consumer.acknowledge(messageIdList)
+ ).isInstanceOf(PulsarClientException.class)
+ .hasMessage("Cannot handle messages with null messageIdList");
+
+ // 3. pass null messages to acknowledge(messages) will trigger
PulsarClientException
+ Messages<?> messages = null;
+ assertThatThrownBy(
+ () -> consumer.acknowledge(messages)
+ ).isInstanceOf(PulsarClientException.class)
+ .hasMessage("Cannot handle messages with null messages");
+
+ // 4. pass null messageId to acknowledgeCumulativeAsync(messageId,
txn) will trigger
+ // PulsarClientException.InvalidMessageException
+ MessageId messageId = null;
+ assertThatThrownBy(
+ () -> consumer.acknowledgeCumulativeAsync(messageId,
null).get()
+ )
+ .isInstanceOf(ExecutionException.class)
+ .hasMessageContaining("Cannot handle message with null
messageId")
+
.hasCauseInstanceOf(PulsarClientException.InvalidMessageException.class);
+
+ }
+
+
@Test
public void testCumulativeAck() throws Exception {
@Cleanup AckTestData data = prepareDataForAck("test-cumulative-ack");
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
index d1bab3abb5b..cbf6293f324 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageAcknowledger.java
@@ -51,6 +51,8 @@ public interface MessageAcknowledger {
* if the consumer was already closed
* @throws PulsarClientException.NotAllowedException
* if `messageId` is not a {@link TopicMessageId} when
multiple topics are subscribed
+ * @throws PulsarClientException.InvalidMessageException
+ * if `messageId` is {@code null}
*/
void acknowledge(MessageId messageId) throws PulsarClientException;
@@ -63,6 +65,8 @@ public interface MessageAcknowledger {
* @param messageIdList the list of message IDs.
* @throws PulsarClientException.NotAllowedException
* if any message id in the list is not a {@link TopicMessageId} when
multiple topics are subscribed
+ * @throws PulsarClientException.InvalidMessageException
+ * if `messageIdList` is {@code null} or contains any {@code null}
element
*/
void acknowledge(List<MessageId> messageIdList) throws
PulsarClientException;
@@ -88,6 +92,8 @@ public interface MessageAcknowledger {
* if the consumer was already closed
* @throws PulsarClientException.NotAllowedException
* if `messageId` is not a {@link TopicMessageId} when
multiple topics are subscribed
+ * @throws PulsarClientException.InvalidMessageException
+ * if `messageId` is {@code null}
*/
void acknowledgeCumulative(MessageId messageId) throws
PulsarClientException;
@@ -97,11 +103,22 @@ public interface MessageAcknowledger {
/**
* The asynchronous version of {@link #acknowledge(MessageId)} with
transaction support.
+ *
+ * @param messageId {@link MessageId} to be individual acknowledged
+ * @param txn {@link Transaction} the transaction to ack with, or {@code
null} for non-transactional ack
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messageId` is {@code null}.
*/
CompletableFuture<Void> acknowledgeAsync(MessageId messageId, Transaction
txn);
/**
* The asynchronous version of {@link #acknowledge(MessageId)}.
+ *
+ * @param messageId {@link MessageId} to be individual acknowledged
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messageId` is {@code null}.
*/
default CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
return acknowledgeAsync(messageId, null);
@@ -109,26 +126,53 @@ public interface MessageAcknowledger {
/**
* The asynchronous version of {@link #acknowledge(List)} with transaction
support.
+ *
+ * @param messageIdList the list of message IDs.
+ * @param txn {@link Transaction} the transaction to ack with, or {@code
null} for non-transactional ack
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messageIdList` is {@code null} or contains any {@code null}
element.
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList,
Transaction txn);
/**
* The asynchronous version of {@link #acknowledge(List)}.
+ *
+ * @param messageIdList the list of message IDs.
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messageIdList` is {@code null} or contains any {@code null}
element.
*/
CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList);
/**
* The asynchronous version of {@link #acknowledge(Message)}.
+ *
+ * @param message {@link Message} to be individual acknowledged
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `message` is {@code null} or its {@link
Message#getMessageId()} returns {@code null}.
*/
CompletableFuture<Void> acknowledgeAsync(Message<?> message);
/**
* The asynchronous version of {@link #acknowledge(Messages)}.
+ *
+ * @param messages {@link Messages} to be acknowledged
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messages` is {@code null} or contains any {@code null}
message.
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages);
/**
* The asynchronous version of {@link #acknowledge(Messages)} with
transaction support.
+ *
+ * @param messages {@link Messages} to be acknowledged
+ * @param txn {@link Transaction} the transaction to ack with, or {@code
null} for non-transactional ack
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messages` is {@code null} or contains any {@code null}
message.
*/
CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction
txn);
@@ -142,6 +186,9 @@ public interface MessageAcknowledger {
* @param messageId
* The {@code MessageId} to be cumulatively acknowledged
* @param txn {@link Transaction} the transaction to cumulative ack
+ * @return a future that completes when the acknowledge operation is sent.
+ * The future will complete exceptionally with {@link
PulsarClientException.InvalidMessageException}
+ * if `messageId` is {@code null}.
*/
CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
Transaction txn);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index f3f3a6bf116..749d23651aa 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -416,6 +416,23 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
}
}
+ private static void validateMessageIds(List<MessageId> messageIdList)
throws PulsarClientException {
+ if (messageIdList == null) {
+ throw new PulsarClientException.InvalidMessageException("Cannot
handle messages with null messageIdList");
+ }
+ for (MessageId messageId : messageIdList) {
+ validateMessageId(messageId);
+ }
+ }
+
+ private static void validateMessages(Messages<?> messages) throws
PulsarClientException {
+ if (messages == null) {
+ throw new PulsarClientException.InvalidMessageException("Cannot
handle messages with null messages");
+ }
+ for (Message<?> message : messages) {
+ validateMessageId(message);
+ }
+ }
@Override
public void acknowledge(Message<?> message) throws PulsarClientException {
validateMessageId(message);
@@ -438,6 +455,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public void acknowledge(List<MessageId> messageIdList) throws
PulsarClientException {
try {
+ validateMessageIds(messageIdList);
acknowledgeAsync(messageIdList).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -450,6 +468,7 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public void acknowledge(Messages<?> messages) throws PulsarClientException
{
try {
+ validateMessages(messages);
acknowledgeAsync(messages).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -635,6 +654,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public CompletableFuture<Void> acknowledgeAsync(MessageId messageId,
Transaction txn) {
+ try {
+ validateMessageId(messageId);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
+ }
TransactionImpl txnImpl = null;
if (null != txn) {
checkArgument(txn instanceof TransactionImpl);
@@ -654,6 +678,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
@Override
public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId
messageId, Transaction txn) {
+ try {
+ validateMessageId(messageId);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
+ }
if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
return FutureUtil.failedFuture(new
PulsarClientException.InvalidConfigurationException(
"Cannot use cumulative acks on a
non-exclusive/non-failover subscription"));
@@ -675,6 +704,11 @@ public abstract class ConsumerBase<T> extends HandlerState
implements Consumer<T
protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId>
messageIdList, AckType ackType,
Map<String, Long>
properties,
TransactionImpl
txn) {
+ try {
+ validateMessageIds(messageIdList);
+ } catch (PulsarClientException e) {
+ return FutureUtil.failedFuture(e);
+ }
CompletableFuture<Void> ackFuture;
if (txn != null && this instanceof ConsumerImpl) {
ackFuture = txn.registerAckedTopic(getTopic(), subscription)