This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ad0e5a  [Java Client] Use failPendingMessages to ensure proper 
cleanup (#12259)
2ad0e5a is described below

commit 2ad0e5afccaaae85969d2924920a55ce95e248f6
Author: Michael Marshall <mikemars...@gmail.com>
AuthorDate: Tue Oct 5 19:15:45 2021 -0500

    [Java Client] Use failPendingMessages to ensure proper cleanup (#12259)
    
    * [Java Client] Use failPendingMessages to ensure proper cleanup
    
    * Update method name from code review comments
    
    * Update 
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
    
    Co-authored-by: Matteo Merli <mme...@apache.org>
    
    * Move setState into sync block; consolidate client.cleanupProducer call
    
    * Move cleanupProducer into sync block
    
    * Make method closeAndClearPendingMessages synchronized
    
    Co-authored-by: Matteo Merli <mme...@apache.org>
---
 .../pulsar/client/impl/ProducerCloseTest.java      | 26 +++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 29 ++++++----------------
 2 files changed, 34 insertions(+), 21 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
index 0c4df15..706849f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java
@@ -32,6 +32,7 @@ import org.testng.annotations.Test;
 
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 @Test(groups = "broker-impl")
@@ -73,6 +74,31 @@ public class ProducerCloseTest extends ProducerConsumerBase {
         Assert.assertEquals(completableFuture.isDone(), true);
     }
 
+    @Test(timeOut = 10_000)
+    public void 
testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws 
Exception {
+        initClient();
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                .topic("testProducerClose")
+                .maxPendingMessages(10)
+                .batchingMaxPublishDelay(10, TimeUnit.SECONDS)
+                .batchingMaxBytes(Integer.MAX_VALUE)
+                .enableBatching(true)
+                .create();
+        CompletableFuture<MessageId> completableFuture = producer.newMessage()
+            .value("test-msg".getBytes(StandardCharsets.UTF_8))
+            .sendAsync();
+        // By setting the state to Failed, the close method will exit early 
because the previous state was not Ready.
+        producer.setState(HandlerState.State.Failed);
+        producer.closeAsync();
+        Assert.assertTrue(completableFuture.isCompletedExceptionally());
+        try {
+            completableFuture.get();
+        } catch (ExecutionException e) {
+            Assert.assertTrue(e.getCause() instanceof 
PulsarClientException.AlreadyClosedException);
+        }
+    }
+
     private void initClient() throws PulsarClientException {
         pulsarClient = PulsarClient.builder().
                 serviceUrl(lookupUrl.toString())
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index d0d3db1..5177451 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -875,12 +875,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         ClientCnx cnx = cnx();
         if (cnx == null || currentState != State.Ready) {
             log.info("[{}] [{}] Closed Producer (not connected)", topic, 
producerName);
-            synchronized (this) {
-                setState(State.Closed);
-                client.cleanupProducer(this);
-                clearPendingMessagesWhenClose();
-            }
-
+            closeAndClearPendingMessages();
             return CompletableFuture.completedFuture(null);
         }
 
@@ -893,14 +888,9 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             if (exception == null || !cnx.ctx().channel().isActive()) {
                 // Either we've received the success response for the close 
producer command from the broker, or the
                 // connection did break in the meantime. In any case, the 
producer is gone.
-                synchronized (ProducerImpl.this) {
-                    log.info("[{}] [{}] Closed Producer", topic, producerName);
-                    setState(State.Closed);
-                    clearPendingMessagesWhenClose();
-                }
-
+                log.info("[{}] [{}] Closed Producer", topic, producerName);
+                closeAndClearPendingMessages();
                 closeFuture.complete(null);
-                client.cleanupProducer(this);
             } else {
                 closeFuture.completeExceptionally(exception);
             }
@@ -911,17 +901,14 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         return closeFuture;
     }
 
-    private void clearPendingMessagesWhenClose() {
+    private synchronized void closeAndClearPendingMessages() {
+        setState(State.Closed);
+        client.cleanupProducer(this);
         PulsarClientException ex = new 
PulsarClientException.AlreadyClosedException(
                 format("The producer %s of the topic %s was already closed 
when closing the producers",
                         producerName, topic));
-        pendingMessages.forEach(msg -> {
-            
client.getMemoryLimitController().releaseMemory(msg.uncompressedSize);
-            msg.sendComplete(ex);
-            msg.cmd.release();
-            msg.recycle();
-        });
-        pendingMessages.clear();
+        // Use null for cnx to ensure that the pending messages are failed 
immediately
+        failPendingMessages(null, ex);
     }
 
     @Override

Reply via email to