This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 9184c34  Fix: Bug in function triggering (#3406)
9184c34 is described below

commit 9184c34b103e874502426d2b7d42779326b28b46
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Jan 24 15:40:02 2019 -0600

    Fix: Bug in function triggering (#3406)

    * fix function trigger

    * simplifying code
---
 .../java/org/apache/pulsar/functions/source/PulsarSource.java | 11 +----------
 .../pulsar/functions/worker/rest/api/ComponentImpl.java       |  9 ++++++---
 2 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 869c706..baf8dcf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -107,19 +107,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
 
     @Override
     public void received(Consumer<T> consumer, Message<T> message) {
-        String topicName;
-
-        // If more than one topics are being read than the Message return by the consumer will be TopicMessageImpl
-        // If there is only topic being read then the Message returned by the consumer wil be MessageImpl
-        if (message instanceof TopicMessageImpl) {
-            topicName = ((TopicMessageImpl<?>) message).getTopicName();
-        } else {
-            topicName = consumer.getTopic();
-        }
 
         Record<T> record = PulsarRecord.<T>builder()
                 .message(message)
-                .topicName(topicName)
+                .topicName(message.getTopicName())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig
                             .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 50afdc6..71825a6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -89,6 +90,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1037,15 +1039,16 @@ public abstract class ComponentImpl {
                         && msg.getProperties().containsKey("__pfn_input_topic__")) {
                     MessageId newMsgId = MessageId.fromByteArray(
                             Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
+
                     if (msgId.equals(newMsgId)
-                            && msg.getProperties().get("__pfn_input_topic__").equals(inputTopicToWrite)) {
+                            && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
                         return new String(msg.getData());
                     }
                 }
                 curTime = System.currentTimeMillis();
             }
-            throw new RestException(Status.REQUEST_TIMEOUT, "Requeste Timed Out");
-        } catch (Exception e) {
+            throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
+        } catch (IOException e) {
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         } finally {
             if (reader != null) {