This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 08d24455a5899145dde3d8438e2760b3959515b9
Author: lipenghui <peng...@apache.org>
AuthorDate: Sat Oct 30 01:53:27 2021 +0800

    Fix the batch message ack for WebSocket proxy. (#12530)
    
    * Fix the batch message ack for WebSocket proxy.
    
    The WebSocket proxy uses the consumer instance to process the message 
acknowledgement.
    But for the batch message acknowledgement, the batch message instance that 
deserialized from WebSocket proxy will with the same batch message acker, which 
will lead to the batch message can't been acked because of the consumer only 
send the ack request to the broker while the acker becomes empty.
    
    The fix is introduce a map to maintain an original which used for getting 
batch message acker.
    Added the test for covering the batch message ack.
    
    * Simplified the logic a bit
    
    * Using time-bound cleanup for ack-timeout cases
    
    Co-authored-by: Matteo Merli <mme...@apache.org>
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 43 ++++++++++++++++++++++
 .../pulsar/client/impl/BatchMessageAcker.java      |  2 +-
 .../apache/pulsar/websocket/ConsumerHandler.java   | 31 +++++++++++++++-
 3 files changed, 73 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 941e410..3e3cf9a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -34,6 +34,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -892,6 +893,48 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void ackBatchMessageTest() throws Exception {
+        final String subscription = "my-sub";
+        final String topic = "my-property/my-ns/ack-batch-message" + 
UUID.randomUUID();
+        final String consumerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+        final int messages = 10;
+
+        WebSocketClient consumerClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .batchingMaxPublishDelay(1, TimeUnit.SECONDS)
+                .create();
+
+        try {
+            consumerClient.start();
+            ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = 
consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);
+
+            assertTrue(consumerFuture.get().isOpen());
+            assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
+
+            for (int i = 0; i < messages; i++) {
+                
producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            }
+
+            producer.flush();
+            consumeSocket.sendPermits(messages);
+            Awaitility.await().untilAsserted(() ->
+                    
Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
+
+            // The message should not be acked since we only acked 1 message 
of the batch message
+            Awaitility.await().untilAsserted(() ->
+                    
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
+                            .get(subscription).getMsgBacklog(), 0));
+
+        } finally {
+            stopWebSocketClient(consumerClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
index 5f9e617..f99c54d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl;
 
 import java.util.BitSet;
 
-class BatchMessageAcker {
+public class BatchMessageAcker {
 
     private BatchMessageAcker() {
         this.bitSet = new BitSet();
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index d30c084..54a5a62 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -24,9 +24,14 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Enums;
 import com.google.common.base.Splitter;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.io.IOException;
 import java.util.Base64;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -45,6 +50,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.SubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -87,6 +93,12 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
     private static final AtomicLongFieldUpdater<ConsumerHandler> 
MSG_DELIVERED_COUNTER_UPDATER =
             AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, 
"msgDeliveredCounter");
 
+    // Make sure use the same BatchMessageIdImpl to acknowledge the batch 
message, otherwise the BatchMessageAcker
+    // of the BatchMessageIdImpl will not complete.
+    private Cache<String, MessageId> messageIdCache = CacheBuilder.newBuilder()
+            .expireAfterWrite(1, TimeUnit.HOURS)
+            .build();
+
     public ConsumerHandler(WebSocketService service, HttpServletRequest 
request, ServletUpgradeResponse response) {
         super(service, request, response);
 
@@ -156,6 +168,8 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
             }
             final long msgSize = msg.getData().length;
 
+            messageIdCache.put(dm.messageId, msg.getMessageId());
+
             try {
                 getSession().getRemote()
                         
.sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(dm), new 
WriteCallback() {
@@ -288,7 +302,14 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
             log.debug("[{}/{}] Received ack request of message {} from {} ", 
consumer.getTopic(),
                     subscription, msgId, 
getRemote().getInetSocketAddress().toString());
         }
-        consumer.acknowledgeAsync(msgId).thenAccept(consumer -> 
numMsgsAcked.increment());
+
+        MessageId originalMsgId = 
messageIdCache.asMap().remove(command.messageId);
+        if (originalMsgId != null) {
+            consumer.acknowledgeAsync(originalMsgId).thenAccept(consumer -> 
numMsgsAcked.increment());
+        } else {
+            consumer.acknowledgeAsync(msgId).thenAccept(consumer -> 
numMsgsAcked.increment());
+        }
+
         checkResumeReceive();
     }
 
@@ -299,7 +320,13 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
             log.debug("[{}/{}] Received negative ack request of message {} 
from {} ", consumer.getTopic(),
                     subscription, msgId, 
getRemote().getInetSocketAddress().toString());
         }
-        consumer.negativeAcknowledge(msgId);
+
+        MessageId originalMsgId = 
messageIdCache.asMap().remove(command.messageId);
+        if (originalMsgId != null) {
+            consumer.negativeAcknowledge(originalMsgId);
+        } else {
+            consumer.negativeAcknowledge(msgId);
+        }
         checkResumeReceive();
     }
 

Reply via email to