This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b648f514816caffb64d24943d4fbf123cb25e4b6 Author: 道君- Tao Jiuming <[email protected]> AuthorDate: Mon Jul 21 14:16:04 2025 +0800 [fix][client] Fix ClientCnx handleSendError NPE (#24517) (cherry picked from commit bb3b6a4e081c63e13afaff6c05169219c0002011) --- .../java/org/apache/pulsar/client/impl/ClientCnx.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index d0361069aa7..c737a2e9a78 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -788,20 +788,25 @@ public class ClientCnx extends PulsarHandler { long producerId = sendError.getProducerId(); long sequenceId = sendError.getSequenceId(); + ProducerImpl<?> producer = producers.get(producerId); + if (producer == null) { + log.warn("{} Producer with id {} not found while handling send error", ctx.channel(), producerId); + return; + } + switch (sendError.getError()) { case ChecksumError: - producers.get(producerId).recoverChecksumError(this, sequenceId); + producer.recoverChecksumError(this, sequenceId); break; - case TopicTerminatedError: - producers.get(producerId).terminated(this); + producer.terminated(this); break; case NotAllowedError: - producers.get(producerId).recoverNotAllowedError(sequenceId, sendError.getMessage()); + producer.recoverNotAllowedError(sequenceId, sendError.getMessage()); break; default: // don't close this ctx, otherwise it will close all consumers and producers which use this ctx - producers.get(producerId).connectionClosed(this, Optional.empty(), Optional.empty()); + producer.connectionClosed(this, Optional.empty(), Optional.empty()); } }
