This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 3adb59c51b3 [fix][client] Prevent retry topic and dead letter topic
producer leaks when sending of message fails (#23824)
3adb59c51b3 is described below
commit 3adb59c51b3e28ee6fa003959612ce3e914dd145
Author: Lari Hotari <[email protected]>
AuthorDate: Thu Jan 9 11:10:49 2025 -0800
[fix][client] Prevent retry topic and dead letter topic producer leaks when
sending of message fails (#23824)
(cherry picked from commit 04e89fe2d841246e655bf875ba52cda2c2de0e3d)
---
.../pulsar/client/api/DeadLetterTopicTest.java | 93 ++++++
.../apache/pulsar/client/api/RetryTopicTest.java | 166 ++++++++---
.../apache/pulsar/client/impl/ConsumerImpl.java | 315 +++++++++++++--------
.../client/impl/MultiTopicsConsumerImpl.java | 9 +-
.../src/main/resources/findbugsExclude.xml | 4 +
5 files changed, 423 insertions(+), 164 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
index e46fddeacc1..ab26949c04f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -40,9 +41,11 @@ import java.util.regex.Pattern;
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1167,4 +1170,94 @@ public class DeadLetterTopicTest extends ProducerConsumerBase {
consumerBuilder.loadConf(config);
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(),
policy);
}
+
+ @Data
+ static class Payload {
+ String number;
+
+ public Payload() {
+
+ }
+
+ public Payload(String number) {
+ this.number = number;
+ }
+ }
+
+ @Data
+ static class PayloadIncompatible {
+ long number;
+
+ public PayloadIncompatible() {
+
+ }
+
+ public PayloadIncompatible(long number) {
+ this.number = number;
+ }
+ }
+
+ // reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
+ @Test
+ public void
testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws
Exception {
+ String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+ admin.namespaces().createNamespace(namespace);
+ // don't enforce schema validation
+ admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ // set schema compatibility strategy to always compatible
+ admin.namespaces().setSchemaCompatibilityStrategy(namespace,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ Schema<Payload> schema = Schema.AVRO(Payload.class);
+ Schema<PayloadIncompatible> schemaIncompatible =
Schema.AVRO(PayloadIncompatible.class);
+ String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ +
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+ String dlqTopic = topic + "-DLQ";
+
+ // create topics
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createNonPartitionedTopic(dlqTopic);
+
+ AtomicInteger nackCounter = new AtomicInteger(0);
+ Consumer<Payload> payloadConsumer = null;
+ try {
+ payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
+
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
+
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
+ .messageListener((c, msg) -> {
+ if (nackCounter.incrementAndGet() < 10) {
+ c.negativeAcknowledge(msg);
+ }
+ }).subscribe();
+
+ // send a message to the topic with the incompatible schema
+ PayloadIncompatible payloadIncompatible = new
PayloadIncompatible(123);
+ try (Producer<PayloadIncompatible> producer =
pulsarClient.newProducer(schemaIncompatible).topic(topic)
+ .create()) {
+ producer.send(payloadIncompatible);
+ }
+
+ Thread.sleep(2000L);
+
+
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+ .describedAs("producer count of dlq topic %s should be <=
1 so that it doesn't leak producers",
+ dlqTopic)
+ .isLessThanOrEqualTo(1);
+
+ } finally {
+ if (payloadConsumer != null) {
+ try {
+ payloadConsumer.close();
+ } catch (PulsarClientException e) {
+ // ignore
+ }
+ }
+ }
+
+
assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
+ .describedAs("producer count of dlq topic %s should be 0 here",
+ dlqTopic)
+ .isEqualTo(0);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
index cd598585c8e..91b97fa4758 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java
@@ -18,12 +18,12 @@
*/
package org.apache.pulsar.client.api;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -36,11 +36,10 @@ import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.ConsumerImpl;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.reflections.ReflectionUtils;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
@@ -617,10 +616,12 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 30000L)
public void testRetryTopicException() throws Exception {
- final String topic = "persistent://my-property/my-ns/retry-topic";
+ String retryLetterTopic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 1;
// subscribe before publish
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
@@ -629,7 +630,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
-
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+ .retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -642,30 +643,16 @@ public class RetryTopicTest extends ProducerConsumerBase {
}
producer.close();
- // mock a retry producer exception when reconsumelater is called
- MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer =
(MultiTopicsConsumerImpl<byte[]>) consumer;
- List<ConsumerImpl<byte[]>> consumers =
multiTopicsConsumer.getConsumers();
- for (ConsumerImpl<byte[]> c : consumers) {
- Set<Field> deadLetterPolicyField =
- ReflectionUtils.getAllFields(c.getClass(),
ReflectionUtils.withName("deadLetterPolicy"));
-
- if (deadLetterPolicyField.size() != 0) {
- Field field = deadLetterPolicyField.iterator().next();
- field.setAccessible(true);
- DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy)
field.get(c);
-
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
- }
- }
+ admin.topics().terminateTopic(retryLetterTopic);
+
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(),
new String(message.getData()));
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
- } catch (PulsarClientException.InvalidTopicNameException e) {
- assertEquals(e.getClass(),
PulsarClientException.InvalidTopicNameException.class);
- } catch (Exception e) {
- fail("exception should be
PulsarClientException.InvalidTopicNameException");
+ fail("exception should be
PulsarClientException.TopicTerminatedException");
+ } catch (PulsarClientException.TopicTerminatedException e) {
+ // ok
}
- consumer.close();
}
@@ -718,10 +705,12 @@ public class RetryTopicTest extends ProducerConsumerBase {
@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
- final String topic = "persistent://my-property/my-ns/retry-topic";
+ String retryLetterTopic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
+ @Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
@@ -730,7 +719,7 @@ public class RetryTopicTest extends ProducerConsumerBase {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
-
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
+ .retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
@@ -739,24 +728,11 @@ public class RetryTopicTest extends ProducerConsumerBase {
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
- producer.newMessage().key("1").value(String.format("Hello Pulsar
[%d]", i).getBytes()).send();
+ producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();
- // mock a retry producer exception when reconsumelater is called
- MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer =
(MultiTopicsConsumerImpl<byte[]>) consumer;
- List<ConsumerImpl<byte[]>> consumers =
multiTopicsConsumer.getConsumers();
- for (ConsumerImpl<byte[]> c : consumers) {
- Set<Field> deadLetterPolicyField =
- ReflectionUtils.getAllFields(c.getClass(),
ReflectionUtils.withName("deadLetterPolicy"));
-
- if (deadLetterPolicyField.size() != 0) {
- Field field = deadLetterPolicyField.iterator().next();
- field.setAccessible(true);
- DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy)
field.get(c);
-
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
- }
- }
+ admin.topics().terminateTopic(retryLetterTopic);
List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < sendMessages; i++) {
@@ -769,16 +745,114 @@ public class RetryTopicTest extends ProducerConsumerBase {
new Thread(() -> {
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
- } catch (Exception ignore) {
-
- } finally {
+ } catch (PulsarClientException.TopicTerminatedException e) {
+ // ok
latch.countDown();
+ } catch (PulsarClientException e) {
+ // unexpected exception
+ fail("unexpected exception", e);
}
}).start();
}
- latch.await();
+ latch.await(sendMessages, TimeUnit.SECONDS);
consumer.close();
}
+ @Data
+ static class Payload {
+ String number;
+
+ public Payload() {
+
+ }
+
+ public Payload(String number) {
+ this.number = number;
+ }
+ }
+
+ @Data
+ static class PayloadIncompatible {
+ long number;
+
+ public PayloadIncompatible() {
+
+ }
+
+ public PayloadIncompatible(long number) {
+ this.number = number;
+ }
+ }
+
+ // reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
+ // but for retry topic
+ @Test
+ public void
testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws
Exception {
+ String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
+ admin.namespaces().createNamespace(namespace);
+ // don't enforce schema validation
+ admin.namespaces().setSchemaValidationEnforced(namespace, false);
+ // set schema compatibility strategy to always compatible
+ admin.namespaces().setSchemaCompatibilityStrategy(namespace,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ Schema<Payload> schema = Schema.AVRO(Payload.class);
+ Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
+ PayloadIncompatible.class);
+ String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ +
"/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
+ String dlqTopic = topic + "-DLQ";
+ String retryTopic = topic + "-RETRY";
+
+ // create topics
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topics().createNonPartitionedTopic(dlqTopic);
+ admin.topics().createNonPartitionedTopic(retryTopic);
+
+ Consumer<Payload> payloadConsumer = null;
+ try {
+ payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
+
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
+ .ackTimeout(1, TimeUnit.SECONDS)
+ .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
+ .enableRetry(true)
+
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
+ .deadLetterTopic(dlqTopic).build())
+ .messageListener((c, msg) -> {
+ try {
+ c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }).subscribe();
+
+ // send a message to the topic with the incompatible schema
+ PayloadIncompatible payloadIncompatible = new
PayloadIncompatible(123);
+ try (Producer<PayloadIncompatible> producer =
pulsarClient.newProducer(schemaIncompatible).topic(topic)
+ .create()) {
+ producer.send(payloadIncompatible);
+ }
+
+ Thread.sleep(2000L);
+
+
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+ .describedAs("producer count of retry topic %s should be
<= 1 so that it doesn't leak producers",
+ retryTopic)
+ .isLessThanOrEqualTo(1);
+
+ } finally {
+ if (payloadConsumer != null) {
+ try {
+ payloadConsumer.close();
+ } catch (PulsarClientException e) {
+ // ignore
+ }
+ }
+ }
+
+
assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
+ .describedAs("producer count of retry topic %s should be 0
here",
+ retryTopic)
+ .isEqualTo(0);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 86af4bdaf58..77a91a944ee 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
@@ -202,8 +203,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
private final DeadLetterPolicy deadLetterPolicy;
private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
-
+ private volatile int deadLetterProducerFailureCount;
private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer;
+ private volatile int retryLetterProducerFailureCount;
private final ReadWriteLock createProducerLock = new
ReentrantReadWriteLock();
protected volatile boolean paused;
@@ -682,9 +684,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return FutureUtil.failedFuture(exception);
}
- initRetryLetterProducerIfNeeded();
CompletableFuture<Void> result = new CompletableFuture<>();
- if (retryLetterProducer != null) {
+ if (initRetryLetterProducerIfNeeded() != null) {
try {
MessageImpl<T> retryMessage = (MessageImpl<T>)
getMessageImpl(message);
String originMessageIdStr = message.getMessageId().toString();
@@ -707,52 +708,61 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
MessageId finalMessageId = messageId;
if (reconsumeTimes >
this.deadLetterPolicy.getMaxRedeliverCount()
&&
StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
- initDeadLetterProducerIfNeeded();
- deadLetterProducer.thenAcceptAsync(dlqProducer -> {
- TypedMessageBuilder<byte[]> typedMessageBuilderNew =
-
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
- .value(retryMessage.getData())
- .properties(propertiesMap);
- copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
- typedMessageBuilderNew.sendAsync().thenAccept(msgId ->
{
- consumerDlqMessagesCounter.increment();
-
- doAcknowledge(finalMessageId, ackType,
Collections.emptyMap(), null).thenAccept(v -> {
- result.complete(null);
+
initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> {
+ try {
+ TypedMessageBuilder<byte[]> typedMessageBuilderNew
=
+ dlqProducer.newMessage(
+
Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
+ .value(retryMessage.getData())
+ .properties(propertiesMap);
+ copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
+
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
+ consumerDlqMessagesCounter.increment();
+
+ doAcknowledge(finalMessageId, ackType,
Collections.emptyMap(), null).thenAccept(v -> {
+ result.complete(null);
+ }).exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
- }).exceptionally(ex -> {
- result.completeExceptionally(ex);
- return null;
- });
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
}, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
- deadLetterProducer = null;
return null;
});
} else {
assert retryMessage != null;
- retryLetterProducer.thenAcceptAsync(rtlProducer -> {
- TypedMessageBuilder<byte[]> typedMessageBuilderNew =
rtlProducer
-
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
- .value(retryMessage.getData())
- .properties(propertiesMap);
- if (delayTime > 0) {
- typedMessageBuilderNew.deliverAfter(delayTime,
unit);
+
initRetryLetterProducerIfNeeded().thenAcceptAsync(rtlProducer -> {
+ try {
+ TypedMessageBuilder<byte[]> typedMessageBuilderNew
= rtlProducer
+
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .value(retryMessage.getData())
+ .properties(propertiesMap);
+ if (delayTime > 0) {
+ typedMessageBuilderNew.deliverAfter(delayTime,
unit);
+ }
+ copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
+ typedMessageBuilderNew.sendAsync()
+ .thenCompose(
+ __ ->
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
+ .thenAccept(v -> {
+ result.complete(null);
+ })
+ .exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
+ } catch (Exception e) {
+ result.completeExceptionally(e);
}
- copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
- typedMessageBuilderNew.sendAsync()
- .thenCompose(__ ->
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
- .thenAccept(v -> result.complete(null))
- .exceptionally(ex -> {
- result.completeExceptionally(ex);
- return null;
- });
}, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
- retryLetterProducer = null;
return null;
});
}
@@ -1099,10 +1109,29 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
public synchronized CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
+ closeFutures.add(closeFuture);
+ if (retryLetterProducer != null) {
+ closeFutures.add(retryLetterProducer.thenCompose(p ->
p.closeAsync()).whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("Exception ignored in closing retryLetterProducer
of consumer", ex);
+ }
+ }));
+ }
+ if (deadLetterProducer != null) {
+ closeFutures.add(deadLetterProducer.thenCompose(p ->
p.closeAsync()).whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("Exception ignored in closing deadLetterProducer
of consumer", ex);
+ }
+ }));
+ }
+ CompletableFuture<Void> compositeCloseFuture =
FutureUtil.waitForAll(closeFutures);
+
+
if (getState() == State.Closing || getState() == State.Closed) {
closeConsumerTasks();
failPendingReceive().whenComplete((r, t) ->
closeFuture.complete(null));
- return closeFuture;
+ return compositeCloseFuture;
}
consumersClosedCounter.increment();
@@ -1114,7 +1143,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
deregisterFromClientCnx();
client.cleanupConsumer(this);
failPendingReceive().whenComplete((r, t) ->
closeFuture.complete(null));
- return closeFuture;
+ return compositeCloseFuture;
}
stats.getStatTimeout().ifPresent(Timeout::cancel);
@@ -1141,23 +1170,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
});
}
- ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
- closeFutures.add(closeFuture);
- if (retryLetterProducer != null) {
- closeFutures.add(retryLetterProducer.thenCompose(p ->
p.closeAsync()).whenComplete((ignore, ex) -> {
- if (ex != null) {
- log.warn("Exception ignored in closing retryLetterProducer
of consumer", ex);
- }
- }));
- }
- if (deadLetterProducer != null) {
- closeFutures.add(deadLetterProducer.thenCompose(p ->
p.closeAsync()).whenComplete((ignore, ex) -> {
- if (ex != null) {
- log.warn("Exception ignored in closing deadLetterProducer
of consumer", ex);
- }
- }));
- }
- return FutureUtil.waitForAll(closeFutures);
+ return compositeCloseFuture;
}
private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable
exception) {
@@ -2216,47 +2229,54 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
CompletableFuture<Boolean> result = new CompletableFuture<>();
if (deadLetterMessages != null) {
- initDeadLetterProducerIfNeeded();
List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
- deadLetterProducer.thenAcceptAsync(producerDLQ -> {
+ initDeadLetterProducerIfNeeded().thenAcceptAsync(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
- String originMessageIdStr =
message.getMessageId().toString();
- String originTopicNameStr = getOriginTopicNameStr(message);
- TypedMessageBuilder<byte[]> typedMessageBuilderNew =
-
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
- .value(message.getData())
- .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
- copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
- typedMessageBuilderNew.sendAsync()
- .thenAccept(messageIdInDLQ -> {
-
possibleSendToDeadLetterTopicMessages.remove(messageId);
- acknowledgeAsync(messageId).whenComplete((v,
ex) -> {
- if (ex != null) {
- log.warn("[{}] [{}] [{}] Failed to
acknowledge the message {} of the original"
- + " topic but send to
the DLQ successfully.",
- topicName, subscription,
consumerName, messageId, ex);
- result.complete(false);
+ try {
+ String originMessageIdStr =
message.getMessageId().toString();
+ String originTopicNameStr =
getOriginTopicNameStr(message);
+ TypedMessageBuilder<byte[]> typedMessageBuilderNew =
+
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .value(message.getData())
+ .properties(getPropertiesMap(message,
originMessageIdStr, originTopicNameStr));
+ copyMessageKeysIfNeeded(message,
typedMessageBuilderNew);
+ typedMessageBuilderNew.sendAsync()
+ .thenAccept(messageIdInDLQ -> {
+
possibleSendToDeadLetterTopicMessages.remove(messageId);
+
acknowledgeAsync(messageId).whenComplete((v, ex) -> {
+ if (ex != null) {
+ log.warn(
+ "[{}] [{}] [{}] Failed to
acknowledge the message {} of the "
+ + "original topic
but send to the DLQ successfully.",
+ topicName, subscription,
consumerName, messageId, ex);
+ result.complete(false);
+ } else {
+ result.complete(true);
+ }
+ });
+ }).exceptionally(ex -> {
+ if (ex instanceof
PulsarClientException.ProducerQueueIsFullError) {
+ log.warn(
+ "[{}] [{}] [{}] Failed to send
DLQ message to {} for message id {}: {}",
+ topicName, subscription,
consumerName,
+
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
} else {
- result.complete(true);
+ log.warn("[{}] [{}] [{}] Failed to
send DLQ message to {} for message id {}",
+ topicName, subscription,
consumerName,
+
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
}
+ result.complete(false);
+ return null;
});
- }).exceptionally(ex -> {
- if (ex instanceof
PulsarClientException.ProducerQueueIsFullError) {
- log.warn("[{}] [{}] [{}] Failed to send
DLQ message to {} for message id {}: {}",
- topicName, subscription,
consumerName,
-
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
- } else {
- log.warn("[{}] [{}] [{}] Failed to send
DLQ message to {} for message id {}",
- topicName, subscription,
consumerName,
-
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
- }
- result.complete(false);
- return null;
- });
+ } catch (Exception e) {
+ log.warn("[{}] [{}] [{}] Failed to send DLQ message to
{} for message id {}",
+ topicName, subscription, consumerName,
deadLetterPolicy.getDeadLetterTopic(), messageId,
+ e);
+ result.complete(false);
+ }
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}",
deadLetterPolicy.getDeadLetterTopic(), ex);
- deadLetterProducer = null;
result.complete(false);
return null;
});
@@ -2266,51 +2286,112 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return result;
}
- private void initDeadLetterProducerIfNeeded() {
- if (deadLetterProducer == null) {
+ private CompletableFuture<Producer<byte[]>>
initDeadLetterProducerIfNeeded() {
+ CompletableFuture<Producer<byte[]>> p = deadLetterProducer;
+ if (p == null || p.isCompletedExceptionally()) {
createProducerLock.writeLock().lock();
try {
- if (deadLetterProducer == null) {
- deadLetterProducer =
- ((ProducerBuilderImpl<byte[]>)
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
-
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
-
.topic(this.deadLetterPolicy.getDeadLetterTopic())
-
.producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName,
this.subscription,
- this.consumerName,
RandomStringUtils.randomAlphanumeric(5)))
- .blockIfQueueFull(false)
- .enableBatching(false)
- .enableChunking(true)
- .createAsync();
- deadLetterProducer.thenAccept(dlqProducer -> {
-
stats.setDeadLetterProducerStats(dlqProducer.getStats());
- });
+ p = deadLetterProducer;
+ if (p == null || p.isCompletedExceptionally()) {
+ p = createProducerWithBackOff(() -> {
+ CompletableFuture<Producer<byte[]>> newProducer =
+ ((ProducerBuilderImpl<byte[]>)
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
+
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
+
.topic(this.deadLetterPolicy.getDeadLetterTopic())
+ .producerName(
+
String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription,
+ this.consumerName,
RandomStringUtils.randomAlphanumeric(5)))
+ .blockIfQueueFull(false)
+ .enableBatching(false)
+ .enableChunking(true)
+ .createAsync();
+ newProducer.whenComplete((producer, ex) -> {
+ if (ex != null) {
+ log.error("[{}] [{}] [{}] Failed to create
dead letter producer for topic {}",
+ topicName, subscription, consumerName,
deadLetterPolicy.getDeadLetterTopic(),
+ ex);
+ deadLetterProducerFailureCount++;
+ } else {
+ deadLetterProducerFailureCount = 0;
+
stats.setDeadLetterProducerStats(producer.getStats());
+ }
+ });
+ return newProducer;
+ }, deadLetterProducerFailureCount, () -> "dead letter
producer (topic: "
+ + deadLetterPolicy.getDeadLetterTopic() + ")");
+ deadLetterProducer = p;
}
} finally {
createProducerLock.writeLock().unlock();
}
}
+ return p;
}
- private void initRetryLetterProducerIfNeeded() {
- if (retryLetterProducer == null) {
+ private CompletableFuture<Producer<byte[]>> createProducerWithBackOff(
+ Supplier<CompletableFuture<Producer<byte[]>>> producerSupplier,
int failureCount,
+ Supplier<String> logDescription) {
+ if (failureCount == 0) {
+ return producerSupplier.get();
+ } else {
+ // calculate backoff time for given failure count
+ Backoff backoff = new BackoffBuilder()
+ .setInitialTime(100, TimeUnit.MILLISECONDS)
+
.setMandatoryStop(client.getConfiguration().getOperationTimeoutMs() * 2,
+ TimeUnit.MILLISECONDS)
+ .setMax(1, TimeUnit.MINUTES)
+ .create();
+ long backoffTimeMillis = 0;
+ for (int i = 0; i < failureCount; i++) {
+ backoffTimeMillis = backoff.next();
+ }
+ CompletableFuture<Producer<byte[]>> newProducer = new
CompletableFuture<>();
+ ScheduledExecutorService executor =
+ (ScheduledExecutorService)
client.getScheduledExecutorProvider().getExecutor(this);
+ log.info("Creating {} with backoff time of {} ms",
logDescription.get(), backoffTimeMillis);
+ executor.schedule(() -> {
+ FutureUtil.completeAfter(newProducer, producerSupplier.get());
+ }, backoffTimeMillis, TimeUnit.MILLISECONDS);
+ return newProducer;
+ }
+ }
+
+ private CompletableFuture<Producer<byte[]>>
initRetryLetterProducerIfNeeded() {
+ CompletableFuture<Producer<byte[]>> p = retryLetterProducer;
+ if (p == null || p.isCompletedExceptionally()) {
createProducerLock.writeLock().lock();
try {
- if (retryLetterProducer == null) {
- retryLetterProducer = client
- .newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
- .topic(this.deadLetterPolicy.getRetryLetterTopic())
- .enableBatching(false)
- .enableChunking(true)
- .blockIfQueueFull(false)
- .createAsync();
- retryLetterProducer.thenAccept(rtlProducer -> {
-
stats.setRetryLetterProducerStats(rtlProducer.getStats());
- });
+ p = retryLetterProducer;
+ if (p == null || p.isCompletedExceptionally()) {
+ p = createProducerWithBackOff(() -> {
+ CompletableFuture<Producer<byte[]>> newProducer =
client
+ .newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
+
.topic(this.deadLetterPolicy.getRetryLetterTopic())
+ .enableBatching(false)
+ .enableChunking(true)
+ .blockIfQueueFull(false)
+ .createAsync();
+ newProducer.whenComplete((producer, ex) -> {
+ if (ex != null) {
+ log.error("[{}] [{}] [{}] Failed to create
retry letter producer for topic {}",
+ topicName, subscription, consumerName,
deadLetterPolicy.getRetryLetterTopic(),
+ ex);
+ retryLetterProducerFailureCount++;
+ } else {
+ retryLetterProducerFailureCount = 0;
+
stats.setRetryLetterProducerStats(producer.getStats());
+ }
+ });
+ return newProducer;
+ }, retryLetterProducerFailureCount, () -> "retry letter
producer (topic: "
+ + deadLetterPolicy.getRetryLetterTopic() + ")");
+ retryLetterProducer = p;
}
} finally {
createProducerLock.writeLock().unlock();
}
}
+ return p;
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 6f9c5b47c55..341272cd69b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -638,7 +638,14 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futureList = consumers.values().stream()
- .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
+ .map(consumer -> consumer.closeAsync().exceptionally(t -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(t);
+ if (!(cause instanceof
PulsarClientException.AlreadyClosedException)) {
+ log.warn("[{}] [{}] Error closing individual consumer",
consumer.getTopic(),
+ consumer.getSubscription(), cause);
+ }
+ return null;
+ })).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.thenComposeAsync((r) -> {
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index 0e05d20cb9b..f7cf6b9cfd5 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -1043,4 +1043,8 @@
<Method name="getStats"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.ConsumerImpl"/>
+ <Bug pattern="VO_VOLATILE_INCREMENT"/>
+ </Match>
</FindBugsFilter>