gaoran10 commented on code in PR #22393:
URL: https://github.com/apache/pulsar/pull/22393#discussion_r1548082039


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -378,80 +378,93 @@ CompletableFuture<MessageId> internalSendAsync(Message<?> 
message) {
         pendingMessagesUpDownCounter.increment();
         pendingBytesUpDownCounter.add(msgSize);
 
-        sendAsync(interceptorMessage, new SendCallback() {
-            SendCallback nextCallback = null;
-            MessageImpl<?> nextMsg = null;
-            long createdAt = System.nanoTime();
+        sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, 
interceptorMessage, msgSize));
+        return future;
+    }
 
-            @Override
-            public CompletableFuture<MessageId> getFuture() {
-                return future;
-            }
+    private class DefaultSendMessageCallback implements SendCallback {
 
-            @Override
-            public SendCallback getNextSendCallback() {
-                return nextCallback;
-            }
+        CompletableFuture<MessageId> sendFuture;
+        MessageImpl<?> currentMsg;
+        int msgSize;
+        long createdAt = System.nanoTime();
+        SendCallback nextCallback = null;
+        MessageImpl<?> nextMsg = null;
 
-            @Override
-            public MessageImpl<?> getNextMessage() {
-                return nextMsg;
-            }
+        DefaultSendMessageCallback(CompletableFuture<MessageId> sendFuture, 
MessageImpl<?> currentMsg, int msgSize) {
+            this.sendFuture = sendFuture;
+            this.currentMsg = currentMsg;
+            this.msgSize = msgSize;
+        }
 
-            @Override
-            public void sendComplete(Exception e) {
-                long latencyNanos = System.nanoTime() - createdAt;
-                pendingMessagesUpDownCounter.decrement();
-                pendingBytesUpDownCounter.subtract(msgSize);
+        @Override
+        public CompletableFuture<MessageId> getFuture() {
+            return sendFuture;
+        }
+
+        @Override
+        public SendCallback getNextSendCallback() {
+            return nextCallback;
+        }
+
+        @Override
+        public MessageImpl<?> getNextMessage() {
+            return nextMsg;
+        }
 
+        @Override
+        public void sendComplete(Exception e) {
+            SendCallback loopingCallback = this;
+            MessageImpl<?> loopingMsg = currentMsg;
+            while (loopingCallback != null) {
+                onSendComplete(e, loopingCallback, loopingMsg);
+                loopingMsg = loopingCallback.getNextMessage();
+                loopingCallback = loopingCallback.getNextSendCallback();
+            }
+        }
+
+        private void onSendComplete(Exception e, SendCallback sendCallback, 
MessageImpl<?> msg) {
+            long createdAt = (sendCallback instanceof 
ProducerImpl.DefaultSendMessageCallback)
+                    ? ((DefaultSendMessageCallback) sendCallback).createdAt : 
this.createdAt;
+            long latencyNanos = System.nanoTime() - createdAt;
+            pendingMessagesUpDownCounter.decrement();
+            pendingBytesUpDownCounter.subtract(msgSize);
+            ByteBuf payload = msg.getDataBuffer();
+            if (e != null) {
+                latencyHistogram.recordFailure(latencyNanos);
+                stats.incrementSendFailed();
                 try {
-                    if (e != null) {
-                        latencyHistogram.recordFailure(latencyNanos);
-                        stats.incrementSendFailed();
-                        onSendAcknowledgement(interceptorMessage, null, e);
-                        future.completeExceptionally(e);
-                    } else {
-                        latencyHistogram.recordSuccess(latencyNanos);
-                        publishedBytesCounter.add(msgSize);
-                        onSendAcknowledgement(interceptorMessage, 
interceptorMessage.getMessageId(), null);
-                        future.complete(interceptorMessage.getMessageId());
-                        stats.incrementNumAcksReceived(latencyNanos);
-                    }
+                    onSendAcknowledgement(msg, null, e);
+                    sendCallback.getFuture().completeExceptionally(e);
                 } finally {
-                    interceptorMessage.getDataBuffer().release();
+                    if (payload == null) {
+                        log.error("[{}] [{}] Payload is null when calling a 
failed onSendComplete, which is not"
+                                + " expected.", topic, producerName);

Review Comment:
   Maybe we should return right after logging this error, instead of continuing with a null payload.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to