This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4aea8c  Fix validator logic to differentiate between serializer and deserializer (#2523)
e4aea8c is described below

commit e4aea8cd04f8052ff91ba50294b25d01d8943d99
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Sep 6 11:15:49 2018 -0700

    Fix validator logic to differentiate between serializer and deserializer (#2523)
    
    * Fix validator logic to differentiate between serializer and deserializer
    
    * Expand to include backend and schema
    
    * Fix build
    
    * Fix unittest
    
    * Fixed unittest
---
 .../pulsar/functions/instance/ContextImpl.java     |  4 +-
 .../pulsar/functions/instance/InstanceUtils.java   | 37 +++++++++----
 .../apache/pulsar/functions/sink/PulsarSink.java   |  4 +-
 .../pulsar/functions/source/PulsarSource.java      |  4 +-
 .../pulsar/functions/source/TopicSchema.java       | 18 +++----
 .../pulsar/functions/sink/PulsarSinkTest.java      |  2 +-
 .../pulsar/functions/source/PulsarSourceTest.java  |  2 +-
 .../functions/utils/validation/ValidatorImpls.java | 62 ++++++++++++++--------
 8 files changed, 80 insertions(+), 53 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aec52fc..c4099f4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -254,13 +254,13 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object) {
-        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object));
+        return publish(topicName, object, "");
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, 
String schemaOrSerdeClassName) {
-        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
+        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index b4a9bf9..86a9aa2 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,27 +31,42 @@ import net.jodah.typetools.TypeResolver;
 
 @UtilityClass
 public class InstanceUtils {
-    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader 
clsLoader, Class<?> typeArg) {
+    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader 
clsLoader, Class<?> typeArg,
+                                           boolean deser) {
         SerDe<?> serDe = createInstance(serdeClassName, clsLoader, 
SerDe.class);
 
         Class<?>[] inputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
-                "Inconsistent types found between function input type and 
input serde type: "
-                        + " function type = " + typeArg + " should be 
assignable from "
-                        + inputSerdeTypeArgs[0]);
+        if (deser) {
+            checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                    "Inconsistent types found between function input type and 
serde type: "
+                            + " function type = " + typeArg + " should be 
assignable from "
+                            + inputSerdeTypeArgs[0]);
+        } else {
+            checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+                    "Inconsistent types found between function input type and 
serde type: "
+                            + " serde type = " + inputSerdeTypeArgs[0] + " 
should be assignable from "
+                            + typeArg);
+        }
 
         return serDe;
     }
 
-    public static Schema<?> initializeCustomSchema(String schemaClassName, 
ClassLoader clsLoader, Class<?> typeArg) {
+    public static Schema<?> initializeCustomSchema(String schemaClassName, 
ClassLoader clsLoader, Class<?> typeArg,
+                                                   boolean input) {
         Schema<?> schema = createInstance(schemaClassName, clsLoader, 
Schema.class);
 
         Class<?>[] inputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
-        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
-                "Inconsistent types found between function input type and 
input schema type: "
-                        + " function type = " + typeArg + " should be 
assignable from "
-                        + inputSerdeTypeArgs[0]);
-
+        if (input) {
+            checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                    "Inconsistent types found between function type and schema 
type: "
+                            + " function type = " + typeArg + " should be 
assignable from "
+                            + inputSerdeTypeArgs[0]);
+        } else {
+            checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+                    "Inconsistent types found between function type and schema 
type: "
+                            + " schema type = " + inputSerdeTypeArgs[0] + " 
should be assignable from "
+                            + typeArg);
+        }
         return schema;
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 5ec725c..835e288 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -277,10 +277,10 @@ public class PulsarSink<T> implements Sink<T> {
 
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
             return (Schema<T>) 
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
-                    pulsarSinkConfig.getSchemaType());
+                    pulsarSinkConfig.getSchemaType(), false);
         } else {
             return (Schema<T>) 
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
-                    pulsarSinkConfig.getSerdeClassName());
+                    pulsarSinkConfig.getSerdeClassName(), false);
         }
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 244ab70..e1059f3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -162,9 +162,9 @@ public class PulsarSource<T> extends PushSource<T> 
implements MessageListener<T>
         pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
             Schema<T> schema;
             if (conf.getSerdeClassName() != null && 
!conf.getSerdeClassName().isEmpty()) {
-                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSerdeClassName());
+                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSerdeClassName(), true);
             } else {
-                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSchemaType());
+                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSchemaType(), true);
             }
             configs.put(topic,
                     ConsumerConfig.<T> 
builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 1802ee5..2ac5b65 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -51,16 +51,12 @@ public class TopicSchema {
 
     public static final String DEFAULT_SERDE = 
"org.apache.pulsar.functions.api.utils.DefaultSerDe";
 
-    public Schema<?> getSchema(String topic, Object object) {
-        return getSchema(topic, object.getClass(), "");
+    public Schema<?> getSchema(String topic, Object object, String 
schemaTypeOrClassName, boolean input) {
+        return getSchema(topic, object.getClass(), schemaTypeOrClassName, 
input);
     }
 
-    public Schema<?> getSchema(String topic, Object object, String 
schemaTypeOrClassName) {
-        return getSchema(topic, object.getClass(), schemaTypeOrClassName);
-    }
-
-    public Schema<?> getSchema(String topic, Class<?> clazz, String 
schemaTypeOrClassName) {
-        return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(topic, clazz, schemaTypeOrClassName));
+    public Schema<?> getSchema(String topic, Class<?> clazz, String 
schemaTypeOrClassName, boolean input) {
+        return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(topic, clazz, schemaTypeOrClassName, input));
     }
 
     public Schema<?> getSchema(String topic, Class<?> clazz, 
Optional<SchemaType> schemaType) {
@@ -134,7 +130,7 @@ public class TopicSchema {
     }
 
     @SuppressWarnings("unchecked")
-    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, 
String schemaTypeOrClassName) {
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, 
String schemaTypeOrClassName, boolean input) {
         // The schemaTypeOrClassName can represent multiple thing, either a 
schema type, a schema class name or a ser-de
         // class name.
 
@@ -161,11 +157,11 @@ public class TopicSchema {
         // First try with Schema
         try {
             return (Schema<T>) 
InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
-                    Thread.currentThread().getContextClassLoader(), clazz);
+                    Thread.currentThread().getContextClassLoader(), clazz, 
input);
         } catch (Throwable t) {
             // Now try with Serde or just fail
             SerDe<T> serDe = (SerDe<T>) 
InstanceUtils.initializeSerDe(schemaTypeOrClassName,
-                    Thread.currentThread().getContextClassLoader(), clazz);
+                    Thread.currentThread().getContextClassLoader(), clazz, 
input);
             return new SerDeSchema<>(serDe);
         }
     }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 766dee3..72e3c56 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -140,7 +140,7 @@ public class PulsarSinkTest {
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 60f684f..e4825f2 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -159,7 +159,7 @@ public class PulsarSourceTest {
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index f264d2a..f60f3c0 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -381,7 +381,7 @@ public class ValidatorImpls {
             // implements SerDe class
             if (functionConfig.getCustomSerdeInputs() != null) {
                 functionConfig.getCustomSerdeInputs().forEach((topicName, 
inputSerializer) -> {
-                    validateSerde(inputSerializer, typeArgs[0], name, 
clsLoader);
+                    validateSerde(inputSerializer, typeArgs[0], name, 
clsLoader, true);
                 });
             }
 
@@ -389,7 +389,7 @@ public class ValidatorImpls {
             // implements SerDe class
             if (functionConfig.getCustomSchemaInputs() != null) {
                 functionConfig.getCustomSchemaInputs().forEach((topicName, 
schemaType) -> {
-                    validateSchema(schemaType, typeArgs[0], name, clsLoader);
+                    validateSchema(schemaType, typeArgs[0], name, clsLoader, 
true);
                 });
             }
 
@@ -405,10 +405,10 @@ public class ValidatorImpls {
                                 String.format("Only one of schemaType or 
serdeClassName should be set in inputSpec"));
                     }
                     if (conf.getSerdeClassName() != null && 
!conf.getSerdeClassName().isEmpty()) {
-                        validateSerde(conf.getSerdeClassName(), typeArgs[0], 
name, clsLoader);
+                        validateSerde(conf.getSerdeClassName(), typeArgs[0], 
name, clsLoader, true);
                     }
                     if (conf.getSchemaType() != null && 
!conf.getSchemaType().isEmpty()) {
-                        validateSchema(conf.getSchemaType(), typeArgs[0], 
name, clsLoader);
+                        validateSchema(conf.getSchemaType(), typeArgs[0], 
name, clsLoader, true);
                     }
                 });
             }
@@ -425,16 +425,17 @@ public class ValidatorImpls {
             }
 
             if (functionConfig.getOutputSchemaType() != null && 
!functionConfig.getOutputSchemaType().isEmpty()) {
-                validateSchema(functionConfig.getOutputSchemaType(), 
typeArgs[1], name, clsLoader);
+                validateSchema(functionConfig.getOutputSchemaType(), 
typeArgs[1], name, clsLoader, false);
             }
 
             if (functionConfig.getOutputSerdeClassName() != null && 
!functionConfig.getOutputSerdeClassName().isEmpty()) {
-                validateSerde(functionConfig.getOutputSerdeClassName(), 
typeArgs[1], name, clsLoader);
+                validateSerde(functionConfig.getOutputSerdeClassName(), 
typeArgs[1], name, clsLoader, false);
             }
 
         }
 
-        private static void validateSchema(String schemaType, Class<?> 
typeArg, String name, ClassLoader clsLoader) {
+        private static void validateSchema(String schemaType, Class<?> 
typeArg, String name, ClassLoader clsLoader,
+                                           boolean input) {
             if (StringUtils.isEmpty(schemaType) || 
getBuiltinSchemaType(schemaType) != null) {
                 // If it's empty, we use the default schema and no need to 
validate
                 // If it's built-in, no need to validate
@@ -447,11 +448,12 @@ public class ValidatorImpls {
                                     schemaType, 
Schema.class.getCanonicalName()));
                 }
 
-                validateSchemaType(schemaType, typeArg, clsLoader);
+                validateSchemaType(schemaType, typeArg, clsLoader, input);
             }
         }
 
-        private static void validateSerde(String inputSerializer, Class<?> 
typeArg, String name, ClassLoader clsLoader) {
+        private static void validateSerde(String inputSerializer, Class<?> 
typeArg, String name, ClassLoader clsLoader,
+                                          boolean deser) {
             if (StringUtils.isEmpty(inputSerializer)) return;
             Class<?> serdeClass;
             try {
@@ -492,8 +494,14 @@ public class ValidatorImpls {
                     throw new IllegalArgumentException("Failed to load type 
class", e);
                 }
 
-                if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                    throw new IllegalArgumentException("Serializer type 
mismatch " + typeArg + " vs " + serDeTypes[0]);
+                if (deser) {
+                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                        throw new IllegalArgumentException("Serializer type 
mismatch " + typeArg + " vs " + serDeTypes[0]);
+                    }
+                } else {
+                    if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+                        throw new IllegalArgumentException("Serializer type 
mismatch " + typeArg + " vs " + serDeTypes[0]);
+                    }
                 }
             }
         }
@@ -734,10 +742,10 @@ public class ValidatorImpls {
                 }
 
                 if (sourceConfig.getSerdeClassName() != null && 
!sourceConfig.getSerdeClassName().isEmpty()) {
-                    
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, 
name, clsLoader);
+                    
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, 
name, clsLoader, false);
                 }
                 if (sourceConfig.getSchemaType() != null && 
!sourceConfig.getSchemaType().isEmpty()) {
-                    
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, 
name, clsLoader);
+                    
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, 
name, clsLoader, false);
                 }
             } catch (IOException e) {
                 throw new IllegalArgumentException(e);
@@ -774,13 +782,13 @@ public class ValidatorImpls {
 
                 if (sinkConfig.getTopicToSerdeClassName() != null) {
                     sinkConfig.getTopicToSerdeClassName().forEach((topicName, 
serdeClassName) -> {
-                        FunctionConfigValidator.validateSerde(serdeClassName, 
typeArg, name, clsLoader);
+                        FunctionConfigValidator.validateSerde(serdeClassName, 
typeArg, name, clsLoader, true);
                     });
                 }
 
                 if (sinkConfig.getTopicToSchemaType() != null) {
                     sinkConfig.getTopicToSchemaType().forEach((topicName, 
schemaType) -> {
-                        FunctionConfigValidator.validateSchema(schemaType, 
typeArg, name, clsLoader);
+                        FunctionConfigValidator.validateSchema(schemaType, 
typeArg, name, clsLoader, true);
                     });
                 }
 
@@ -794,10 +802,10 @@ public class ValidatorImpls {
                             throw new IllegalArgumentException("Only one of 
serdeClassName or schemaType should be set");
                         }
                         if (consumerSpec.getSerdeClassName() != null && 
!consumerSpec.getSerdeClassName().isEmpty()) {
-                            
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), 
typeArg, name, clsLoader);
+                            
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), 
typeArg, name, clsLoader, true);
                         }
                         if (consumerSpec.getSchemaType() != null && 
!consumerSpec.getSchemaType().isEmpty()) {
-                            
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, 
name, clsLoader);
+                            
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, 
name, clsLoader, true);
                         }
                     });
                 }
@@ -900,8 +908,8 @@ public class ValidatorImpls {
         }
     }
 
-    private static void validateSchemaType(String scheamType, Class<?> 
typeArg, ClassLoader clsLoader) {
-        validateCustomSchemaType(scheamType, typeArg, clsLoader);
+    private static void validateSchemaType(String scheamType, Class<?> 
typeArg, ClassLoader clsLoader, boolean input) {
+        validateCustomSchemaType(scheamType, typeArg, clsLoader, input);
     }
 
     private static void validateSerDeType(String serdeClassName, Class<?> 
typeArg, ClassLoader clsLoader) {
@@ -929,7 +937,8 @@ public class ValidatorImpls {
         }
     }
 
-    private static void validateCustomSchemaType(String schemaClassName, 
Class<?> typeArg, ClassLoader clsLoader) {
+    private static void validateCustomSchemaType(String schemaClassName, 
Class<?> typeArg, ClassLoader clsLoader,
+                                                 boolean input) {
         Schema<?> schema = (Schema<?>) 
Reflections.createInstance(schemaClassName, clsLoader);
         if (schema == null) {
             throw new IllegalArgumentException(String.format("The Schema class 
%s does not exist",
@@ -948,9 +957,16 @@ public class ValidatorImpls {
             throw new IllegalArgumentException("Failed to load type class", e);
         }
 
-        if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
-            throw new IllegalArgumentException(
-                    "Schema type mismatch " + typeArg + " vs " + 
schemaTypes[0]);
+        if (input) {
+            if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + 
schemaTypes[0]);
+            }
+        } else {
+            if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + 
schemaTypes[0]);
+            }
         }
     }
 }
\ No newline at end of file

Reply via email to