This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 0d2154e Fix: function with multi-topic not acking on effectively-once (#2347) 0d2154e is described below commit 0d2154ed3acd67bcb651644ae13e43cdd4045e90 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Mon Aug 13 14:19:46 2018 -0700 Fix: function with multi-topic not acking on effectively-once (#2347) ### Motivation `MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync`, and therefore a function with multi-topic input and `EFFECTIVELY_ONCE` processing does not ack messages, which breaks the `EFFECTIVELY_ONCE` behavior. ### Modifications The function should ack the message on the specific topic's consumer if `inputTopicConsumer` is a multi-topic consumer. ### Result A function should be able to ack messages for a multi-topic consumer when processing-guarantee is `EFFECTIVELY_ONCE` --- .../client/api/PartitionedProducerConsumerTest.java | 9 ++------- .../java/org/apache/pulsar/io/PulsarSinkE2ETest.java | 10 ++++++---- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 17 +++++++++++------ .../apache/pulsar/functions/source/PulsarSource.java | 15 ++++++--------- 4 files changed, 25 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index a599532..ae7757d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -290,19 +290,14 @@ public class PartitionedProducerConsumerTest extends ProducerConsumerBase { } try { - producer = pulsarClient.newProducer().topic(topicName.toString()) - .enableBatching(false) - .messageRoutingMode(MessageRoutingMode.SinglePartition) - .create(); + producer = pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false) + 
.messageRoutingMode(MessageRoutingMode.SinglePartition).create(); consumer = pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe(); producer.send("message1".getBytes()); producer.send("message2".getBytes()); /* Message<byte[]> msg1 = */ consumer.receive(); Message<byte[]> msg2 = consumer.receive(); consumer.acknowledgeCumulative(msg2); - Assert.fail("should fail since ack cumulative is not supported for partitioned topic"); - } catch (PulsarClientException e) { - Assert.assertTrue(e instanceof PulsarClientException.NotSupportedException); } finally { producer.close(); consumer.unsubscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index abc4735..5db9a0a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction; import org.apache.pulsar.functions.instance.JavaInstanceRunnable; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.FunctionDetails; +import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees; import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus; @@ -356,11 +357,11 @@ public class PulsarSinkE2ETest { retryStrategically((test) -> { try { SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); - return subStats.unackedMessages == 0; + return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs; } catch (PulsarAdminException e) { return false; } - }, 5, 500); + }, 5, 200); FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager(); functionRuntimeManager.updateRates(); @@ -399,11 +400,12 @@ public class PulsarSinkE2ETest { functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA); functionDetailsBuilder.setParallelism(1); functionDetailsBuilder.setClassName(IdentityFunction.class.getName()); + functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE); // set source spec // source spec classname should be empty so that the default pulsar source will be used SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER); sourceSpecBuilder.setTypeClassName(typeArg.getName()); sourceSpecBuilder.setTopicsPattern(sourceTopicPattern); sourceSpecBuilder.setSubscriptionName(subscriptionName); @@ -484,7 +486,7 @@ public class PulsarSinkE2ETest { // set source spec // source spec classname should be empty so that the default pulsar source will be used SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder(); - sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED); + sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER); sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, DefaultSerDe.class.getName()); functionDetailsBuilder.setAutoAck(true); functionDetailsBuilder.setSource(sourceSpecBuilder); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index fc91eed..4a0c449 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -362,22 +362,27 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { protected CompletableFuture<Void> 
doAcknowledge(MessageId messageId, AckType ackType, Map<String,Long> properties) { checkArgument(messageId instanceof TopicMessageIdImpl); - TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId; + TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId; if (getState() != State.Ready) { return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed")); } if (ackType == AckType.Cumulative) { - return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException( - "Cumulative acknowledge not supported for topics consumer")); + Consumer individualConsumer = consumers.get(topicMessageId.getTopicName()); + if (individualConsumer != null) { + MessageId innerId = topicMessageId.getInnerMessageId(); + return individualConsumer.acknowledgeCumulativeAsync(innerId); + } else { + return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException()); + } } else { - ConsumerImpl<T> consumer = consumers.get(messageId1.getTopicName()); + ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicName()); - MessageId innerId = messageId1.getInnerMessageId(); + MessageId innerId = topicMessageId.getInnerMessageId(); return consumer.doAcknowledge(innerId, ackType, properties) .thenRun(() -> - unAckedMessageTracker.remove(messageId1)); + unAckedMessageTracker.remove(topicMessageId)); } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index ec71450..1b3c177 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -20,25 +20,17 @@ package org.apache.pulsar.functions.source; import static org.apache.commons.lang3.StringUtils.isNotBlank; -import com.google.common.annotations.VisibleForTesting; - import java.util.ArrayList; 
-import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; @@ -49,6 +41,10 @@ import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.SourceContext; +import com.google.common.annotations.VisibleForTesting; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import net.jodah.typetools.TypeResolver; @Slf4j @@ -142,7 +138,8 @@ public class PulsarSource<T> implements Source<T> { .message(message) .topicName(topicName) .ackFunction(() -> { - if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { + if (pulsarSourceConfig + .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) { inputConsumer.acknowledgeCumulativeAsync(message); } else { inputConsumer.acknowledgeAsync(message);