This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-2.10.5.3-a41ecf
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d89f7c0b03744c6401c771e58cc601ffa55113ee Author: Cong Zhao <[email protected]> AuthorDate: Wed Aug 14 10:26:47 2024 +0800 [fix][client] Fix for early hit `beforeConsume` for MultiTopicConsumer (#23141) (cherry picked from commit c07b158f003c5a5623296189f0932d7058d2e75a) --- .../apache/pulsar/client/api/InterceptorsTest.java | 45 +++++++++++------ .../client/impl/MultiTopicsConsumerImpl.java | 58 ++++++++++++++++++++-- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index b3f5ed3b487..8f239aea1f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -27,8 +29,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - -import com.google.common.collect.Sets; import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; @@ -66,6 +66,12 @@ public class InterceptorsTest extends ProducerConsumerBase { return new Object[][] { { 0 }, { 1000 } }; } + @DataProvider(name = "topics") + public Object[][] getTopics() { + return new Object[][] {{Lists.newArrayList("persistent://my-property/my-ns/my-topic") }, + { Lists.newArrayList("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1") }}; + } + @Test public void testProducerInterceptor() throws Exception { Map<MessageId, List<String>> ackCallback = new HashMap<>(); @@ -390,9 +396,9 @@ public class 
InterceptorsTest extends ProducerConsumerBase { @Override public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) { - MessageImpl<String> msg = (MessageImpl<String>) message; + MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -436,13 +442,19 @@ public class InterceptorsTest extends ProducerConsumerBase { int keyCount = 0; for (int i = 0; i < 2; i++) { - Message<String> received = consumer.receive(); + Message<String> received; + if (i % 2 == 0) { + received = consumer.receive(); + } else { + received = consumer.receiveAsync().join(); + } MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage(); for (KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) { if ("beforeConsumer".equals(keyValue.getKey())) { keyCount++; } } + Assert.assertEquals(keyCount, i + 1); consumer.acknowledge(received); } Assert.assertEquals(2, keyCount); @@ -462,9 +474,9 @@ public class InterceptorsTest extends ProducerConsumerBase { @Override public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) { - MessageImpl<String> msg = (MessageImpl<String>) message; + MessageImpl<String> msg = ((MessageImpl<String>) ((TopicMessageImpl<String>) message).getMessage()); msg.getMessageBuilder().addProperty().setKey("beforeConsumer").setValue("1"); - return msg; + return message; } @Override @@ -599,8 +611,8 @@ public class InterceptorsTest extends ProducerConsumerBase { consumer.close(); } - @Test - public void testConsumerInterceptorForNegativeAcksSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForNegativeAcksSend(List<String> topics) throws PulsarClientException, InterruptedException { final int totalNumOfMessages = 100; 
CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -627,6 +639,7 @@ public class InterceptorsTest extends ProducerConsumerBase { @Override public void onNegativeAcksSend(Consumer<String> consumer, Set<MessageId> messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } @@ -637,7 +650,7 @@ public class InterceptorsTest extends ProducerConsumerBase { }; Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionType(SubscriptionType.Failover) .intercept(interceptor) .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) @@ -645,7 +658,7 @@ public class InterceptorsTest extends ProducerConsumerBase { .subscribe(); Producer<String> producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); for (int i = 0; i < totalNumOfMessages; i++) { @@ -669,8 +682,9 @@ public class InterceptorsTest extends ProducerConsumerBase { consumer.close(); } - @Test - public void testConsumerInterceptorForAckTimeoutSend() throws PulsarClientException, InterruptedException { + @Test(dataProvider = "topics") + public void testConsumerInterceptorForAckTimeoutSend(List<String> topics) throws PulsarClientException, + InterruptedException { final int totalNumOfMessages = 100; CountDownLatch latch = new CountDownLatch(totalNumOfMessages / 2); @@ -701,16 +715,17 @@ public class InterceptorsTest extends ProducerConsumerBase { @Override public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageIds) { + Assert.assertTrue(latch.getCount() > 0); messageIds.forEach(messageId -> latch.countDown()); } }; Producer<String> producer = pulsarClient.newProducer(Schema.STRING) - .topic("persistent://my-property/my-ns/my-topic") + .topic(topics.get(0)) .create(); Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING) - 
.topic("persistent://my-property/my-ns/my-topic") + .topics(topics) .subscriptionName("foo") .intercept(interceptor) .ackTimeout(2, TimeUnit.SECONDS) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index b01c25d215b..be618744180 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Lists; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -104,6 +105,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { private volatile BatchMessageIdImpl startMessageId = null; private final long startMessageRollbackDurationInSec; + private final ConsumerInterceptors<T> internalConsumerInterceptors; MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorProvider executorProvider, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) { @@ -133,6 +135,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { long startMessageRollbackDurationInSec) { super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), executorProvider, subscribeFuture, schema, interceptors); + if (interceptors != null) { + this.internalConsumerInterceptors = getInternalConsumerInterceptors(interceptors); + } else { + this.internalConsumerInterceptors = null; + } checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Topics Consumer"); @@ -315,7 +322,8 @@ public class MultiTopicsConsumerImpl<T> 
extends ConsumerBase<T> { CompletableFuture<Message<T>> receivedFuture = nextPendingReceive(); if (receivedFuture != null) { unAckedMessageTracker.add(topicMessage.getMessageId(), topicMessage.getRedeliveryCount()); - completePendingReceive(receivedFuture, topicMessage); + final Message<T> interceptMessage = beforeConsume(topicMessage); + completePendingReceive(receivedFuture, interceptMessage); } else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } @@ -366,7 +374,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - return message; + return beforeConsume(message); } catch (Exception e) { throw PulsarClientException.unwrap(e); } @@ -393,6 +401,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { } } unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); + message = beforeConsume(message); } resumeReceivingFromPausedConsumersIfNeeded(); return message; @@ -463,7 +472,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId(), message.getRedeliveryCount()); resumeReceivingFromPausedConsumersIfNeeded(); - result.complete(message); + result.complete(beforeConsume(message)); } }); return result; @@ -1119,7 +1128,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { return ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), partitionIndex, true, listener != null, subFuture, - startMessageId, schema, interceptors, + startMessageId, schema, this.internalConsumerInterceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); } @@ -1503,4 +1512,45 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { 
acknowledgeCumulativeAsync(msg); } } + + private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) { + return new ConsumerInterceptors<T>(new ArrayList<>()) { + + @Override + public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) { + return message; + } + + @Override + public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledge(consumer, messageId, exception); + } + + @Override + public void onAcknowledgeCumulative(Consumer<T> consumer, + MessageId messageId, Throwable exception) { + multiTopicInterceptors.onAcknowledgeCumulative(consumer, messageId, exception); + } + + @Override + public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> set) { + multiTopicInterceptors.onNegativeAcksSend(consumer, set); + } + + @Override + public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> set) { + multiTopicInterceptors.onAckTimeoutSend(consumer, set); + } + + @Override + public void onPartitionsChange(String topicName, int partitions) { + multiTopicInterceptors.onPartitionsChange(topicName, partitions); + } + + @Override + public void close() throws IOException { + multiTopicInterceptors.close(); + } + }; + } }
