This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ec8a5e  [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537)
9ec8a5e is described below

commit 9ec8a5ee4d4a80c50986cde95093b1eb4af31ef0
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Sat Sep 8 12:51:12 2018 -0700

    [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537)
    
    * [schema] enable Schema.AUTO if functions or connectors are using GenericRecord
    
    * add schema-api to functions-api
---
 pulsar-functions/api-java/pom.xml                  |  6 ++
 .../pulsar/functions/source/PulsarSource.java      |  2 +-
 .../pulsar/functions/source/TopicSchema.java       | 44 +++++--------
 pulsar-functions/java-examples/pom.xml             |  1 +
 .../functions/api/examples/AutoSchemaFunction.java | 33 ++++++++++
 tests/integration/pom.xml                          | 12 ++++
 .../integration/functions/PulsarFunctionsTest.java | 77 +++++++++++++++++++++-
 7 files changed, 143 insertions(+), 32 deletions(-)

diff --git a/pulsar-functions/api-java/pom.xml 
b/pulsar-functions/api-java/pom.xml
index 9a84206..0f21bae 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -38,6 +38,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-schema</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>net.jodah</groupId>
       <artifactId>typetools</artifactId>
       <scope>test</scope>
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index e1059f3..6eed8e0 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -78,7 +78,7 @@ public class PulsarSource<T> extends PushSource<T> implements 
MessageListener<T>
         inputConsumers = configs.entrySet().stream().map(e -> {
             String topic = e.getKey();
             ConsumerConfig<T> conf = e.getValue();
-            log.info("Creating consumers for topic : {}",  topic);
+            log.info("Creating consumers for topic : {}, schema : {}",  topic, 
conf.getSchema());
             ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
                     // consume message even if can't decrypt and deliver it 
along with encryption-ctx
                     .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 2ac5b65..76375dc 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -68,24 +69,31 @@ public class TopicSchema {
     }
 
     public Schema<?> getSchema(String topic, Class<?> clazz, SchemaType 
schemaType) {
-        return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz, 
schemaType));
+        return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(clazz, schemaType));
     }
 
     /**
      * If the topic is already created, we should be able to fetch the schema 
type (avro, json, ...)
      */
     private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
-        Optional<SchemaInfo> schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
-        if (schema.isPresent()) {
-            return schema.get().getType();
+        if (GenericRecord.class.isAssignableFrom(clazz)) {
+            return SchemaType.AUTO;
         } else {
-            return getDefaultSchemaType(clazz);
+            Optional<SchemaInfo> schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
+            if (schema.isPresent()) {
+                return schema.get().getType();
+            } else {
+                return getDefaultSchemaType(clazz);
+            }
         }
     }
 
     private static SchemaType getDefaultSchemaType(Class<?> clazz) {
         if (byte[].class.equals(clazz)) {
             return SchemaType.NONE;
+        } else if (GenericRecord.class.isAssignableFrom(clazz)) {
+            // the function is taking generic record, so we do auto schema 
detection
+            return SchemaType.AUTO;
         } else if (String.class.equals(clazz)) {
             // If type is String, then we use schema type string, otherwise we 
fallback on default schema
             return SchemaType.STRING;
@@ -102,6 +110,9 @@ public class TopicSchema {
         case NONE:
             return (Schema<T>) Schema.BYTES;
 
+        case AUTO:
+            return (Schema<T>) Schema.AUTO();
+
         case STRING:
             return (Schema<T>) Schema.STRING;
 
@@ -165,27 +176,4 @@ public class TopicSchema {
             return new SerDeSchema<>(serDe);
         }
     }
-
-    @SuppressWarnings("unchecked")
-    private static <T> Schema<T> extractSchema(Class<T> clazz, SchemaType 
type) {
-        switch (type) {
-        case NONE:
-            return (Schema<T>) Schema.BYTES;
-
-        case STRING:
-            return (Schema<T>) Schema.STRING;
-
-        case AVRO:
-            return AvroSchema.of(clazz);
-
-        case JSON:
-            return JSONSchema.of(clazz);
-
-        case PROTOBUF:
-            return ProtobufSchema.ofGenericClass(clazz, 
Collections.emptyMap());
-
-        default:
-            throw new RuntimeException("Unsupported schema type" + type);
-        }
-    }
 }
diff --git a/pulsar-functions/java-examples/pom.xml 
b/pulsar-functions/java-examples/pom.xml
index 747acff..b077dea 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -41,6 +41,7 @@
       <artifactId>pulsar-functions-api</artifactId>
       <version>${project.version}</version>
     </dependency>
+
   </dependencies>
 
 </project>
diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java
new file mode 100644
index 0000000..03abc93
--- /dev/null
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Example function that takes {@link GenericRecord} input, so its input topic is consumed with Schema.AUTO.
+ */
+public class AutoSchemaFunction implements Function<GenericRecord, String> {
+    @Override
+    public String process(GenericRecord input, Context context) {
+        return "value-" + input.getField("value");
+    }
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 6d3fdc4..7a89159 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -46,6 +46,18 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-api-examples</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-client-schema</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
       <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 17634a1..0bf9ded 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -38,6 +38,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
+import org.apache.pulsar.functions.api.examples.serde.CustomObject;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -45,7 +47,6 @@ import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runt
 import org.apache.pulsar.tests.integration.io.*;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -594,8 +595,23 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
                                                   String inputTopicName,
                                                   String outputTopicName,
                                                   String functionName) throws 
Exception {
+        submitFunction(
+            runtime,
+            inputTopicName,
+            outputTopicName,
+            functionName,
+            getExclamationClass(runtime),
+            Schema.STRING);
+    }
+
+    private static <T> void submitFunction(Runtime runtime,
+                                           String inputTopicName,
+                                           String outputTopicName,
+                                           String functionName,
+                                           String functionClass,
+                                           Schema<T> inputTopicSchema) throws 
Exception {
         CommandGenerator generator;
-        generator = CommandGenerator.createDefaultGenerator(inputTopicName, 
getExclamationClass(runtime));
+        generator = CommandGenerator.createDefaultGenerator(inputTopicName, 
functionClass);
         generator.setSinkTopic(outputTopicName);
         generator.setFunctionName(functionName);
         String command;
@@ -619,7 +635,7 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         try (PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build()) {
-            try (Consumer<String> ignored = client.newConsumer(Schema.STRING)
+            try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
                 .topic(inputTopicName)
                 .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName(String.format("public/default/%s", 
functionName))
@@ -707,4 +723,59 @@ public abstract class PulsarFunctionsTest extends 
PulsarFunctionsTestBase {
         assertTrue(result.getStderr().isEmpty());
     }
 
+    @Test
+    public void testAutoSchemaFunction() throws Exception {
+        String inputTopicName = "test-autoschema-input-" + randomName(8);
+        String outputTopicName = "test-autoshcema-output-" + randomName(8);
+        String functionName = "test-autoschema-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the auto-schema function
+        submitFunction(
+            Runtime.JAVA, inputTopicName, outputTopicName, functionName,
+            AutoSchemaFunction.class.getName(),
+            Schema.AVRO(CustomObject.class));
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // publish and consume result
+        publishAndConsumeAvroMessages(inputTopicName, outputTopicName, 
numMessages);
+
+        // get function status
+        getFunctionStatus(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+    }
+
+    private static void publishAndConsumeAvroMessages(String inputTopic,
+                                                      String outputTopic,
+                                                      int numMessages) throws 
Exception {
+        @Cleanup PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+            .topic(outputTopic)
+            .subscriptionType(SubscriptionType.Exclusive)
+            .subscriptionName("test-sub")
+            .subscribe();
+        @Cleanup Producer<CustomObject> producer = 
client.newProducer(Schema.AVRO(CustomObject.class))
+            .topic(inputTopic)
+            .create();
+
+        for (int i = 0; i < numMessages; i++) {
+            CustomObject co = new CustomObject(i);
+            producer.send(co);
+        }
+
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive();
+            assertEquals("value-" + i, msg.getValue());
+        }
+    }
+
 }

Reply via email to