This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 64d1c5a Fix issue producer not closed in PersistentTopicsTest (#12584) 64d1c5a is described below commit 64d1c5aacae1781053383ae20a13d09d4c5646c6 Author: Jason918 <jason....@qq.com> AuthorDate: Wed Nov 3 02:11:57 2021 +0800 Fix issue producer not closed in PersistentTopicsTest (#12584) Co-authored-by: Jiang Haiting <jianghait...@didichuxing.com> --- .../org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 4a27dbb..9c3221d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -47,6 +47,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.admin.v2.NonPersistentTopics; import org.apache.pulsar.broker.admin.v2.PersistentTopics; @@ -699,6 +700,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String topicName = "persistent://prop-xyz/ns1/testGetBacklogSize"; admin.topics().createPartitionedTopic(topicName, 1); + @Cleanup Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName) .enableBatching(false) .create(); @@ -718,6 +720,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String topicName = "persistent://prop-xyz/ns1/testGetLastMessageId"; admin.topics().createNonPartitionedTopic(topicName); + @Cleanup Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName) .enableBatching(true) .batchingMaxMessages(100) @@ -731,6 +734,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { completableFuture.get(); Assert.assertEquals(((BatchMessageIdImpl) admin.topics().getLastMessageId(topicName)).getBatchIndex(), 9); + @Cleanup Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName) .enableBatching(false) .create(); @@ -748,6 +752,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String topicName = "persistent://tenant-xyz/ns-abc/topic-123"; admin.topics().createPartitionedTopic(topicName, 2); + @Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName + "-partition-0").create(); // Check examine message not allowed on partitioned topic. @@ -786,6 +791,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String topicName = "persistent://tenant-xyz/ns-abc/topic-testExamineMessageMetadata"; admin.topics().createPartitionedTopic(topicName, 2); + @Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING) .producerName("testExamineMessageMetadataProducer") .compressionType(CompressionType.LZ4) @@ -923,11 +929,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2"; admin.topics().createNonPartitionedTopic(topicName1); admin.topics().createNonPartitionedTopic(topicName2); + @Cleanup ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1) .enableBatching(false).create(); String data1 = "test1"; MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes()); - + @Cleanup ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2) .enableBatching(false).create(); String data2 = "test2"; @@ -965,6 +972,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { admin.topics().createNonPartitionedTopic(topicName); AtomicLong publishTime = new AtomicLong(0); + @Cleanup ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName) .enableBatching(false) .intercept(new ProducerInterceptor() { @@ -1018,6 +1026,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>(); + @Cleanup ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName) .enableBatching(true) .batchingMaxPublishDelay(1, TimeUnit.MINUTES)