This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 948b33a3ff0cbb799d2ad39b3f2645ee9fe32c06 Author: pengxiangrui127 <[email protected]> AuthorDate: Fri Dec 20 19:17:13 2024 +0800 [Fix][Client] Fix pending message not complete when closeAsync (#23761) (cherry picked from commit e0a9e4c7b5d3533f2a1e5b7757b180168412c35e) --- .../apache/pulsar/client/impl/ProducerImpl.java | 5 ++- .../pulsar/client/impl/ProducerImplTest.java | 43 ++++++++++++++++++++-- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 10e0ee2ee3d..54d337925dc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1177,11 +1177,11 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne CompletableFuture<Void> closeFuture = new CompletableFuture<>(); cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> { cnx.removeProducer(producerId); + closeAndClearPendingMessages(); if (exception == null || !cnx.ctx().channel().isActive()) { // Either we've received the success response for the close producer command from the broker, or the // connection did break in the meantime. In any case, the producer is gone. log.info("[{}] [{}] Closed Producer", topic, producerName); - closeAndClearPendingMessages(); closeFuture.complete(null); } else { closeFuture.completeExceptionally(exception); @@ -1193,7 +1193,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return closeFuture; } - private synchronized void closeAndClearPendingMessages() { + @VisibleForTesting + protected synchronized void closeAndClearPendingMessages() { setState(State.Closed); client.cleanupProducer(this); PulsarClientException ex = new PulsarClientException.AlreadyClosedException( diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index f9df6375939..5f690ead6c5 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -22,12 +22,19 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.*; + import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import io.netty.util.HashedWheelTimer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.metrics.LatencyHistogram; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -68,4 +75,34 @@ public class ProducerImplTest { verify(msg).setSchemaState(MessageImpl.SchemaState.Ready); } + @Test + public void testClearPendingMessageWhenCloseAsync() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + Mockito.doReturn(1L).when(client).newProducerId(); + ClientConfigurationData clientConf = new ClientConfigurationData(); + clientConf.setStatsIntervalSeconds(-1); + Mockito.doReturn(clientConf).when(client).getConfiguration(); + Mockito.doReturn(new InstrumentProvider(null)).when(client).instrumentProvider(); + ConnectionPool connectionPool = mock(ConnectionPool.class); + Mockito.doReturn(1).when(connectionPool).genRandomKeyToSelectCon(); + Mockito.doReturn(connectionPool).when(client).getCnxPool(); + HashedWheelTimer timer = mock(HashedWheelTimer.class); + Mockito.doReturn(null).when(timer).newTimeout(Mockito.any(), Mockito.anyLong(), Mockito.any()); + Mockito.doReturn(timer).when(client).timer(); + ProducerConfigurationData producerConf = new ProducerConfigurationData(); + producerConf.setSendTimeoutMs(-1); + ProducerImpl<?> producer = Mockito.spy(new ProducerImpl<>(client, "topicName", producerConf, null, 0, null, null, Optional.empty())); + + // make sure throw exception when send request to broker + ClientCnx clientCnx = mock(ClientCnx.class); + CompletableFuture<ProducerResponse> tCompletableFuture = new CompletableFuture<>(); + tCompletableFuture.completeExceptionally(new PulsarClientException("error")); + when(clientCnx.sendRequestWithId(Mockito.any(), Mockito.anyLong())).thenReturn(tCompletableFuture); + Mockito.doReturn(clientCnx).when(producer).cnx(); + + // run closeAsync and verify + CompletableFuture<Void> voidCompletableFuture = producer.closeAsync(); + verify(producer).closeAndClearPendingMessages(); + } + }
