This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bd3de15d8ca [fix][broker] Intercept REDELIVER_UNACKNOWLEDGED_MESSAGES command (#21265)
bd3de15d8ca is described below

commit bd3de15d8ca2bf3659d4133d072c3bc7cb9e6f5d
Author: kecona <145982269+kec...@users.noreply.github.com>
AuthorDate: Wed Oct 25 22:49:18 2023 +0800

    [fix][broker] Intercept REDELIVER_UNACKNOWLEDGED_MESSAGES command (#21265)
---
 .../broker/intercept/BrokerInterceptorTest.java       | 19 +++++++++++++++++++
 .../broker/intercept/CounterBrokerInterceptor.java    |  7 +++++++
 .../apache/pulsar/common/protocol/PulsarDecoder.java  |  1 +
 3 files changed, 27 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 405d7caa1fa..d211de62963 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import okhttp3.Call;
 import okhttp3.Callback;
@@ -306,4 +307,22 @@ public class BrokerInterceptorTest extends ProducerConsumerBase {
         }
     }
 
+    @Test
+    public void testInterceptNack() throws Exception {
+        BrokerInterceptor interceptor = pulsar.getBrokerInterceptor();
+        Assert.assertTrue(interceptor instanceof CounterBrokerInterceptor);
+
+        final String topic = "test-intercept-nack" + UUID.randomUUID();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
+                .topic(topic)
+                .subscriptionName("test-sub").subscribe();
+        producer.send("test intercept nack message");
+        Message<String> message = consumer.receive();
+        consumer.negativeAcknowledge(message);
+        Awaitility.await().until(() -> ((CounterBrokerInterceptor) interceptor).getHandleNackCount().get() == 1);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index 9c327a0ea6e..34fa4932da9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -31,6 +31,7 @@ import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.http.HttpStatus;
@@ -62,6 +63,8 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
     private final AtomicInteger messageDispatchCount = new AtomicInteger();
     private final AtomicInteger messageAckCount = new AtomicInteger();
     private final AtomicInteger handleAckCount = new AtomicInteger();
+    @Getter
+    private final AtomicInteger handleNackCount = new AtomicInteger();
     private final AtomicInteger txnCount = new AtomicInteger();
     private final AtomicInteger committedTxnCount = new AtomicInteger();
     private final AtomicInteger abortedTxnCount = new AtomicInteger();
@@ -79,6 +82,7 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         txnCount.set(0);
         committedTxnCount.set(0);
         abortedTxnCount.set(0);
+        handleNackCount.set(0);
     }
 
     private final List<ResponseEvent> responseList = new CopyOnWriteArrayList<>();
@@ -209,6 +213,9 @@ public class CounterBrokerInterceptor implements BrokerInterceptor {
         if (command.getType().equals(BaseCommand.Type.ACK)) {
             handleAckCount.incrementAndGet();
         }
+        if(command.getType().equals(BaseCommand.Type.REDELIVER_UNACKNOWLEDGED_MESSAGES)) {
+            handleNackCount.incrementAndGet();
+        }
         count.incrementAndGet();
     }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 496652fed0b..c1c1ebe355b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -291,6 +291,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
             case REDELIVER_UNACKNOWLEDGED_MESSAGES:
                 checkArgument(cmd.hasRedeliverUnacknowledgedMessages());
+                safeInterceptCommand(cmd);
                 handleRedeliverUnacknowledged(cmd.getRedeliverUnacknowledgedMessages());
                 break;
 

Reply via email to