This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9184c34  Fix: Bug in function triggering (#3406)
9184c34 is described below

commit 9184c34b103e874502426d2b7d42779326b28b46
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Jan 24 15:40:02 2019 -0600

    Fix: Bug in function triggering (#3406)
    
    * fix function trigger
    
    * simplifying code
---
 .../java/org/apache/pulsar/functions/source/PulsarSource.java | 11 +----------
 .../pulsar/functions/worker/rest/api/ComponentImpl.java       |  9 ++++++---
 2 files changed, 7 insertions(+), 13 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 869c706..baf8dcf 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -107,19 +107,10 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
 
     @Override
     public void received(Consumer<T> consumer, Message<T> message) {
-        String topicName;
-
-        // If more than one topics are being read than the Message return by the consumer will be TopicMessageImpl
-        // If there is only topic being read then the Message returned by the consumer wil be MessageImpl
-        if (message instanceof TopicMessageImpl) {
-            topicName = ((TopicMessageImpl<?>) message).getTopicName();
-        } else {
-            topicName = consumer.getTopic();
-        }
 
         Record<T> record = PulsarRecord.<T>builder()
                 .message(message)
-                .topicName(topicName)
+                .topicName(message.getTopicName())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig
                             .getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 50afdc6..71825a6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -89,6 +90,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -1037,15 +1039,16 @@ public abstract class ComponentImpl {
                         && msg.getProperties().containsKey("__pfn_input_topic__")) {
                     MessageId newMsgId = MessageId.fromByteArray(
                             Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
+
                     if (msgId.equals(newMsgId)
-                            && msg.getProperties().get("__pfn_input_topic__").equals(inputTopicToWrite)) {
+                            && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
                        return new String(msg.getData());
                     }
                 }
                 curTime = System.currentTimeMillis();
             }
-            throw new RestException(Status.REQUEST_TIMEOUT, "Requeste Timed Out");
-        } catch (Exception e) {
+            throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out");
+        } catch (IOException e) {
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         } finally {
             if (reader != null) {

Reply via email to