This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 08d24455a5899145dde3d8438e2760b3959515b9 Author: lipenghui <peng...@apache.org> AuthorDate: Sat Oct 30 01:53:27 2021 +0800 Fix the batch message ack for WebSocket proxy. (#12530) * Fix the batch message ack for WebSocket proxy. The WebSocket proxy uses the consumer instance to process message acknowledgements. For batch messages, however, the message ID that the WebSocket proxy deserializes from an ack request does not share the batch message acker of the originally received message ID, so the batch message can never be acked: the consumer only sends the ack request to the broker once the acker becomes empty. The fix introduces a map that retains the original message ID, which is used to obtain the original batch message acker. Added a test covering the batch message ack. * Simplified the logic a bit * Using time-bound cleanup for ack-timeout cases Co-authored-by: Matteo Merli <mme...@apache.org> --- .../websocket/proxy/ProxyPublishConsumeTest.java | 43 ++++++++++++++++++++++ .../pulsar/client/impl/BatchMessageAcker.java | 2 +- .../apache/pulsar/websocket/ConsumerHandler.java | 31 +++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 941e410..3e3cf9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -34,6 +34,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -892,6 +893,48 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } } + @Test(timeOut = 20000) + public void ackBatchMessageTest() throws Exception { + final String subscription = 
"my-sub"; + final String topic = "my-property/my-ns/ack-batch-message" + UUID.randomUUID(); + final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; + final int messages = 10; + + WebSocketClient consumerClient = new WebSocketClient(); + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .create(); + + try { + consumerClient.start(); + ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest(); + Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest); + + assertTrue(consumerFuture.get().isOpen()); + assertEquals(consumeSocket.getReceivedMessagesCount(), 0); + + for (int i = 0; i < messages; i++) { + producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + + producer.flush(); + consumeSocket.sendPermits(messages); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages)); + + // The message should not be acked since we only acked 1 message of the batch message + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions() + .get(subscription).getMsgBacklog(), 0)); + + } finally { + stopWebSocketClient(consumerClient); + } + } + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java index 5f9e617..f99c54d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageAcker.java @@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl; import java.util.BitSet; -class BatchMessageAcker { +public class BatchMessageAcker { private BatchMessageAcker() { this.bitSet = new BitSet(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index d30c084..54a5a62 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -24,9 +24,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Enums; import com.google.common.base.Splitter; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.io.IOException; import java.util.Base64; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -45,6 +50,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; @@ -87,6 +93,12 @@ public class ConsumerHandler extends AbstractWebSocketHandler { private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter"); + // Make sure use the same 
BatchMessageIdImpl to acknowledge the batch message, otherwise the BatchMessageAcker + // of the BatchMessageIdImpl will not complete. + private Cache<String, MessageId> messageIdCache = CacheBuilder.newBuilder() + .expireAfterWrite(1, TimeUnit.HOURS) + .build(); + public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { super(service, request, response); @@ -156,6 +168,8 @@ public class ConsumerHandler extends AbstractWebSocketHandler { } final long msgSize = msg.getData().length; + messageIdCache.put(dm.messageId, msg.getMessageId()); + try { getSession().getRemote() .sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(dm), new WriteCallback() { @@ -288,7 +302,14 @@ public class ConsumerHandler extends AbstractWebSocketHandler { log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), subscription, msgId, getRemote().getInetSocketAddress().toString()); } - consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); + + MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId); + if (originalMsgId != null) { + consumer.acknowledgeAsync(originalMsgId).thenAccept(consumer -> numMsgsAcked.increment()); + } else { + consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); + } + checkResumeReceive(); } @@ -299,7 +320,13 @@ public class ConsumerHandler extends AbstractWebSocketHandler { log.debug("[{}/{}] Received negative ack request of message {} from {} ", consumer.getTopic(), subscription, msgId, getRemote().getInetSocketAddress().toString()); } - consumer.negativeAcknowledge(msgId); + + MessageId originalMsgId = messageIdCache.asMap().remove(command.messageId); + if (originalMsgId != null) { + consumer.negativeAcknowledge(originalMsgId); + } else { + consumer.negativeAcknowledge(msgId); + } checkResumeReceive(); }