This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 2ad0e5a [Java Client] Use failPendingMessages to ensure proper cleanup (#12259) 2ad0e5a is described below commit 2ad0e5afccaaae85969d2924920a55ce95e248f6 Author: Michael Marshall <mikemars...@gmail.com> AuthorDate: Tue Oct 5 19:15:45 2021 -0500 [Java Client] Use failPendingMessages to ensure proper cleanup (#12259) * [Java Client] Use failPendingMessages to ensure proper cleanup * Update method name from code review comments * Update pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java Co-authored-by: Matteo Merli <mme...@apache.org> * Move setState into sync block; consolidate client.cleanupProducer call * Move cleanupProducer into sync block * Make method closeAndClearPendingMessages synchronized Co-authored-by: Matteo Merli <mme...@apache.org> --- .../pulsar/client/impl/ProducerCloseTest.java | 26 +++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 29 ++++++---------------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java index 0c4df15..706849f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerCloseTest.java @@ -32,6 +32,7 @@ import org.testng.annotations.Test; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @Test(groups = "broker-impl") @@ -73,6 +74,31 @@ public class ProducerCloseTest extends ProducerConsumerBase { Assert.assertEquals(completableFuture.isDone(), true); } + @Test(timeOut = 10_000) + public void testProducerCloseFailsPendingBatchWhenPreviousStateNotReadyCallback() throws Exception { + initClient(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerClose") + .maxPendingMessages(10) + .batchingMaxPublishDelay(10, TimeUnit.SECONDS) + .batchingMaxBytes(Integer.MAX_VALUE) + .enableBatching(true) + .create(); + CompletableFuture<MessageId> completableFuture = producer.newMessage() + .value("test-msg".getBytes(StandardCharsets.UTF_8)) + .sendAsync(); + // By setting the state to Failed, the close method will exit early because the previous state was not Ready. + producer.setState(HandlerState.State.Failed); + producer.closeAsync(); + Assert.assertTrue(completableFuture.isCompletedExceptionally()); + try { + completableFuture.get(); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof PulsarClientException.AlreadyClosedException); + } + } + private void initClient() throws PulsarClientException { pulsarClient = PulsarClient.builder(). serviceUrl(lookupUrl.toString()) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index d0d3db1..5177451 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -875,12 +875,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne ClientCnx cnx = cnx(); if (cnx == null || currentState != State.Ready) { log.info("[{}] [{}] Closed Producer (not connected)", topic, producerName); - synchronized (this) { - setState(State.Closed); - client.cleanupProducer(this); - clearPendingMessagesWhenClose(); - } - + closeAndClearPendingMessages(); return CompletableFuture.completedFuture(null); } @@ -893,14 +888,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne if (exception == null || !cnx.ctx().channel().isActive()) { // Either we've received the success response for the close producer command from the broker, or the // connection did break in the meantime. In any case, the producer is gone. - synchronized (ProducerImpl.this) { - log.info("[{}] [{}] Closed Producer", topic, producerName); - setState(State.Closed); - clearPendingMessagesWhenClose(); - } - + log.info("[{}] [{}] Closed Producer", topic, producerName); + closeAndClearPendingMessages(); closeFuture.complete(null); - client.cleanupProducer(this); } else { closeFuture.completeExceptionally(exception); } @@ -911,17 +901,14 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return closeFuture; } - private void clearPendingMessagesWhenClose() { + private synchronized void closeAndClearPendingMessages() { + setState(State.Closed); + client.cleanupProducer(this); PulsarClientException ex = new PulsarClientException.AlreadyClosedException( format("The producer %s of the topic %s was already closed when closing the producers", producerName, topic)); - pendingMessages.forEach(msg -> { - client.getMemoryLimitController().releaseMemory(msg.uncompressedSize); - msg.sendComplete(ex); - msg.cmd.release(); - msg.recycle(); - }); - pendingMessages.clear(); + // Use null for cnx to ensure that the pending messages are failed immediately + failPendingMessages(null, ex); } @Override