This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new bd3de15d8ca [fix][broker] Intercept REDELIVER_UNACKNOWLEDGED_MESSAGES command (#21265) bd3de15d8ca is described below commit bd3de15d8ca2bf3659d4133d072c3bc7cb9e6f5d Author: kecona <145982269+kec...@users.noreply.github.com> AuthorDate: Wed Oct 25 22:49:18 2023 +0800 [fix][broker] Intercept REDELIVER_UNACKNOWLEDGED_MESSAGES command (#21265) --- .../broker/intercept/BrokerInterceptorTest.java | 19 +++++++++++++++++++ .../broker/intercept/CounterBrokerInterceptor.java | 7 +++++++ .../apache/pulsar/common/protocol/PulsarDecoder.java | 1 + 3 files changed, 27 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java index 405d7caa1fa..d211de62963 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import okhttp3.Call; import okhttp3.Callback; @@ -306,4 +307,22 @@ public class BrokerInterceptorTest extends ProducerConsumerBase { } } + @Test + public void testInterceptNack() throws Exception { + BrokerInterceptor interceptor = pulsar.getBrokerInterceptor(); + Assert.assertTrue(interceptor instanceof CounterBrokerInterceptor); + + final String topic = "test-intercept-nack" + UUID.randomUUID(); + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + @Cleanup + Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .topic(topic) + .subscriptionName("test-sub").subscribe(); + producer.send("test intercept nack message"); + Message<String> message = consumer.receive(); + consumer.negativeAcknowledge(message); + Awaitility.await().until(() -> ((CounterBrokerInterceptor) interceptor).getHandleNackCount().get() == 1); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java index 9c327a0ea6e..34fa4932da9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java @@ -31,6 +31,7 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; import org.apache.http.HttpStatus; @@ -62,6 +63,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { private final AtomicInteger messageDispatchCount = new AtomicInteger(); private final AtomicInteger messageAckCount = new AtomicInteger(); private final AtomicInteger handleAckCount = new AtomicInteger(); + @Getter + private final AtomicInteger handleNackCount = new AtomicInteger(); private final AtomicInteger txnCount = new AtomicInteger(); private final AtomicInteger committedTxnCount = new AtomicInteger(); private final AtomicInteger abortedTxnCount = new AtomicInteger(); @@ -79,6 +82,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { txnCount.set(0); committedTxnCount.set(0); abortedTxnCount.set(0); + handleNackCount.set(0); } private final List<ResponseEvent> responseList = new CopyOnWriteArrayList<>(); @@ -209,6 +213,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor { if (command.getType().equals(BaseCommand.Type.ACK)) { handleAckCount.incrementAndGet(); } + if(command.getType().equals(BaseCommand.Type.REDELIVER_UNACKNOWLEDGED_MESSAGES)) { + handleNackCount.incrementAndGet(); + } count.incrementAndGet(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java index 496652fed0b..c1c1ebe355b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java @@ -291,6 +291,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter { case REDELIVER_UNACKNOWLEDGED_MESSAGES: checkArgument(cmd.hasRedeliverUnacknowledgedMessages()); + safeInterceptCommand(cmd); handleRedeliverUnacknowledged(cmd.getRedeliverUnacknowledgedMessages()); break;