srkukarni closed pull request #2523: Fix validator logic to differentiate 
between serializer and deserializer
URL: https://github.com/apache/incubator-pulsar/pull/2523
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index aec52fc0c8..c4099f4ff7 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -254,13 +254,13 @@ public ByteBuffer getState(String key) {
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object) {
-        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object));
+        return publish(topicName, object, "");
     }
 
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, 
String schemaOrSerdeClassName) {
-        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName));
+        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index b4a9bf9cf3..86a9aa211b 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -31,27 +31,42 @@
 
 @UtilityClass
 public class InstanceUtils {
-    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader 
clsLoader, Class<?> typeArg) {
+    public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader 
clsLoader, Class<?> typeArg,
+                                           boolean deser) {
         SerDe<?> serDe = createInstance(serdeClassName, clsLoader, 
SerDe.class);
 
         Class<?>[] inputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass());
-        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
-                "Inconsistent types found between function input type and 
input serde type: "
-                        + " function type = " + typeArg + " should be 
assignable from "
-                        + inputSerdeTypeArgs[0]);
+        if (deser) {
+            checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                    "Inconsistent types found between function input type and 
serde type: "
+                            + " function type = " + typeArg + " should be 
assignable from "
+                            + inputSerdeTypeArgs[0]);
+        } else {
+            checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+                    "Inconsistent types found between function input type and 
serde type: "
+                            + " serde type = " + inputSerdeTypeArgs[0] + " 
should be assignable from "
+                            + typeArg);
+        }
 
         return serDe;
     }
 
-    public static Schema<?> initializeCustomSchema(String schemaClassName, 
ClassLoader clsLoader, Class<?> typeArg) {
+    public static Schema<?> initializeCustomSchema(String schemaClassName, 
ClassLoader clsLoader, Class<?> typeArg,
+                                                   boolean input) {
         Schema<?> schema = createInstance(schemaClassName, clsLoader, 
Schema.class);
 
         Class<?>[] inputSerdeTypeArgs = 
TypeResolver.resolveRawArguments(Schema.class, schema.getClass());
-        checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
-                "Inconsistent types found between function input type and 
input schema type: "
-                        + " function type = " + typeArg + " should be 
assignable from "
-                        + inputSerdeTypeArgs[0]);
-
+        if (input) {
+            checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]),
+                    "Inconsistent types found between function type and schema 
type: "
+                            + " function type = " + typeArg + " should be 
assignable from "
+                            + inputSerdeTypeArgs[0]);
+        } else {
+            checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg),
+                    "Inconsistent types found between function type and schema 
type: "
+                            + " schema type = " + inputSerdeTypeArgs[0] + " 
should be assignable from "
+                            + typeArg);
+        }
         return schema;
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 5ec725cdc4..835e288075 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -277,10 +277,10 @@ public void close() throws Exception {
 
         if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) {
             return (Schema<T>) 
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
-                    pulsarSinkConfig.getSchemaType());
+                    pulsarSinkConfig.getSchemaType(), false);
         } else {
             return (Schema<T>) 
topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg,
-                    pulsarSinkConfig.getSerdeClassName());
+                    pulsarSinkConfig.getSerdeClassName(), false);
         }
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 244ab70d4a..e1059f390e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -162,9 +162,9 @@ public void close() throws Exception {
         pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> {
             Schema<T> schema;
             if (conf.getSerdeClassName() != null && 
!conf.getSerdeClassName().isEmpty()) {
-                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSerdeClassName());
+                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSerdeClassName(), true);
             } else {
-                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSchemaType());
+                schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, 
conf.getSchemaType(), true);
             }
             configs.put(topic,
                     ConsumerConfig.<T> 
builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build());
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 1802ee5e61..2ac5b65764 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -51,16 +51,12 @@ public TopicSchema(PulsarClient client) {
 
     public static final String DEFAULT_SERDE = 
"org.apache.pulsar.functions.api.utils.DefaultSerDe";
 
-    public Schema<?> getSchema(String topic, Object object) {
-        return getSchema(topic, object.getClass(), "");
+    public Schema<?> getSchema(String topic, Object object, String 
schemaTypeOrClassName, boolean input) {
+        return getSchema(topic, object.getClass(), schemaTypeOrClassName, 
input);
     }
 
-    public Schema<?> getSchema(String topic, Object object, String 
schemaTypeOrClassName) {
-        return getSchema(topic, object.getClass(), schemaTypeOrClassName);
-    }
-
-    public Schema<?> getSchema(String topic, Class<?> clazz, String 
schemaTypeOrClassName) {
-        return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(topic, clazz, schemaTypeOrClassName));
+    public Schema<?> getSchema(String topic, Class<?> clazz, String 
schemaTypeOrClassName, boolean input) {
+        return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(topic, clazz, schemaTypeOrClassName, input));
     }
 
     public Schema<?> getSchema(String topic, Class<?> clazz, 
Optional<SchemaType> schemaType) {
@@ -134,7 +130,7 @@ private static boolean isProtobufClass(Class<?> pojoClazz) {
     }
 
     @SuppressWarnings("unchecked")
-    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, 
String schemaTypeOrClassName) {
+    private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, 
String schemaTypeOrClassName, boolean input) {
         // The schemaTypeOrClassName can represent multiple thing, either a 
schema type, a schema class name or a ser-de
         // class name.
 
@@ -161,11 +157,11 @@ private static boolean isProtobufClass(Class<?> 
pojoClazz) {
         // First try with Schema
         try {
             return (Schema<T>) 
InstanceUtils.initializeCustomSchema(schemaTypeOrClassName,
-                    Thread.currentThread().getContextClassLoader(), clazz);
+                    Thread.currentThread().getContextClassLoader(), clazz, 
input);
         } catch (Throwable t) {
             // Now try with Serde or just fail
             SerDe<T> serDe = (SerDe<T>) 
InstanceUtils.initializeSerDe(schemaTypeOrClassName,
-                    Thread.currentThread().getContextClassLoader(), clazz);
+                    Thread.currentThread().getContextClassLoader(), clazz, 
input);
             return new SerDeSchema<>(serDe);
         }
     }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 766dee389c..72e3c56a1c 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -140,7 +140,7 @@ public void testInconsistentOutputType() throws IOException 
{
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 60f684fee2..e4825f200f 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -159,7 +159,7 @@ public void testInconsistentInputType() throws IOException {
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
-            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and input serde type:"));
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found 
between function input type and serde type:"));
         } catch (Exception ex) {
             log.error("Exception: {}", ex, ex);
             assertTrue(false);
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index cbeb970f77..40b59adeb8 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -381,7 +381,7 @@ private static void doJavaChecks(FunctionConfig 
functionConfig, String name) {
             // implements SerDe class
             if (functionConfig.getCustomSerdeInputs() != null) {
                 functionConfig.getCustomSerdeInputs().forEach((topicName, 
inputSerializer) -> {
-                    validateSerde(inputSerializer, typeArgs[0], name, 
clsLoader);
+                    validateSerde(inputSerializer, typeArgs[0], name, 
clsLoader, true);
                 });
             }
 
@@ -389,7 +389,7 @@ private static void doJavaChecks(FunctionConfig 
functionConfig, String name) {
             // implements SerDe class
             if (functionConfig.getCustomSchemaInputs() != null) {
                 functionConfig.getCustomSchemaInputs().forEach((topicName, 
schemaType) -> {
-                    validateSchema(schemaType, typeArgs[0], name, clsLoader);
+                    validateSchema(schemaType, typeArgs[0], name, clsLoader, 
true);
                 });
             }
 
@@ -405,10 +405,10 @@ private static void doJavaChecks(FunctionConfig 
functionConfig, String name) {
                                 String.format("Only one of schemaType or 
serdeClassName should be set in inputSpec"));
                     }
                     if (conf.getSerdeClassName() != null && 
!conf.getSerdeClassName().isEmpty()) {
-                        validateSerde(conf.getSerdeClassName(), typeArgs[0], 
name, clsLoader);
+                        validateSerde(conf.getSerdeClassName(), typeArgs[0], 
name, clsLoader, true);
                     }
                     if (conf.getSchemaType() != null && 
!conf.getSchemaType().isEmpty()) {
-                        validateSchema(conf.getSchemaType(), typeArgs[0], 
name, clsLoader);
+                        validateSchema(conf.getSchemaType(), typeArgs[0], 
name, clsLoader, true);
                     }
                 });
             }
@@ -425,16 +425,17 @@ private static void doJavaChecks(FunctionConfig 
functionConfig, String name) {
             }
 
             if (functionConfig.getOutputSchemaType() != null && 
!functionConfig.getOutputSchemaType().isEmpty()) {
-                validateSchema(functionConfig.getOutputSchemaType(), 
typeArgs[1], name, clsLoader);
+                validateSchema(functionConfig.getOutputSchemaType(), 
typeArgs[1], name, clsLoader, false);
             }
 
             if (functionConfig.getOutputSerdeClassName() != null && 
!functionConfig.getOutputSerdeClassName().isEmpty()) {
-                validateSerde(functionConfig.getOutputSerdeClassName(), 
typeArgs[1], name, clsLoader);
+                validateSerde(functionConfig.getOutputSerdeClassName(), 
typeArgs[1], name, clsLoader, false);
             }
 
         }
 
-        private static void validateSchema(String schemaType, Class<?> 
typeArg, String name, ClassLoader clsLoader) {
+        private static void validateSchema(String schemaType, Class<?> 
typeArg, String name, ClassLoader clsLoader,
+                                           boolean input) {
             if (StringUtils.isEmpty(schemaType) || 
getBuiltinSchemaType(schemaType) != null) {
                 // If it's empty, we use the default schema and no need to 
validate
                 // If it's built-in, no need to validate
@@ -447,11 +448,12 @@ private static void validateSchema(String schemaType, 
Class<?> typeArg, String n
                                     schemaType, 
Schema.class.getCanonicalName()));
                 }
 
-                validateSchemaType(schemaType, typeArg, clsLoader);
+                validateSchemaType(schemaType, typeArg, clsLoader, input);
             }
         }
 
-        private static void validateSerde(String inputSerializer, Class<?> 
typeArg, String name, ClassLoader clsLoader) {
+        private static void validateSerde(String inputSerializer, Class<?> 
typeArg, String name, ClassLoader clsLoader,
+                                          boolean deser) {
             if (StringUtils.isEmpty(inputSerializer)) return;
             Class<?> serdeClass;
             try {
@@ -492,8 +494,14 @@ private static void validateSerde(String inputSerializer, 
Class<?> typeArg, Stri
                     throw new IllegalArgumentException("Failed to load type 
class", e);
                 }
 
-                if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
-                    throw new IllegalArgumentException("Serializer type 
mismatch " + typeArg + " vs " + serDeTypes[0]);
+                if (deser) {
+                    if (!fnInputClass.isAssignableFrom(serdeInputClass)) {
+                        throw new IllegalArgumentException("Serializer type 
mismatch " + typeArg + " vs " + serDeTypes[0]);
+                    }
+                } else {
+                    if (!serdeInputClass.isAssignableFrom(fnInputClass)) {
+                        throw new IllegalArgumentException("Serializer type 
mismatch " + typeArg + " vs " + serDeTypes[0]);
+                    }
                 }
             }
         }
@@ -743,10 +751,10 @@ public void validateField(String name, Object o) {
                 }
 
                 if (sourceConfig.getSerdeClassName() != null && 
!sourceConfig.getSerdeClassName().isEmpty()) {
-                    
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, 
name, clsLoader);
+                    
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, 
name, clsLoader, false);
                 }
                 if (sourceConfig.getSchemaType() != null && 
!sourceConfig.getSchemaType().isEmpty()) {
-                    
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, 
name, clsLoader);
+                    
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, 
name, clsLoader, false);
                 }
             } catch (IOException e) {
                 throw new IllegalArgumentException(e);
@@ -783,13 +791,13 @@ public void validateField(String name, Object o) {
 
                 if (sinkConfig.getTopicToSerdeClassName() != null) {
                     sinkConfig.getTopicToSerdeClassName().forEach((topicName, 
serdeClassName) -> {
-                        FunctionConfigValidator.validateSerde(serdeClassName, 
typeArg, name, clsLoader);
+                        FunctionConfigValidator.validateSerde(serdeClassName, 
typeArg, name, clsLoader, true);
                     });
                 }
 
                 if (sinkConfig.getTopicToSchemaType() != null) {
                     sinkConfig.getTopicToSchemaType().forEach((topicName, 
schemaType) -> {
-                        FunctionConfigValidator.validateSchema(schemaType, 
typeArg, name, clsLoader);
+                        FunctionConfigValidator.validateSchema(schemaType, 
typeArg, name, clsLoader, true);
                     });
                 }
 
@@ -803,10 +811,10 @@ public void validateField(String name, Object o) {
                             throw new IllegalArgumentException("Only one of 
serdeClassName or schemaType should be set");
                         }
                         if (consumerSpec.getSerdeClassName() != null && 
!consumerSpec.getSerdeClassName().isEmpty()) {
-                            
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), 
typeArg, name, clsLoader);
+                            
FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), 
typeArg, name, clsLoader, true);
                         }
                         if (consumerSpec.getSchemaType() != null && 
!consumerSpec.getSchemaType().isEmpty()) {
-                            
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, 
name, clsLoader);
+                            
FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, 
name, clsLoader, true);
                         }
                     });
                 }
@@ -909,8 +917,8 @@ private static SchemaType getBuiltinSchemaType(String 
schemaTypeOrClassName) {
         }
     }
 
-    private static void validateSchemaType(String scheamType, Class<?> 
typeArg, ClassLoader clsLoader) {
-        validateCustomSchemaType(scheamType, typeArg, clsLoader);
+    private static void validateSchemaType(String scheamType, Class<?> 
typeArg, ClassLoader clsLoader, boolean input) {
+        validateCustomSchemaType(scheamType, typeArg, clsLoader, input);
     }
 
     private static void validateSerDeType(String serdeClassName, Class<?> 
typeArg, ClassLoader clsLoader) {
@@ -938,7 +946,8 @@ private static void validateSerDeType(String 
serdeClassName, Class<?> typeArg, C
         }
     }
 
-    private static void validateCustomSchemaType(String schemaClassName, 
Class<?> typeArg, ClassLoader clsLoader) {
+    private static void validateCustomSchemaType(String schemaClassName, 
Class<?> typeArg, ClassLoader clsLoader,
+                                                 boolean input) {
         Schema<?> schema = (Schema<?>) 
Reflections.createInstance(schemaClassName, clsLoader);
         if (schema == null) {
             throw new IllegalArgumentException(String.format("The Schema class 
%s does not exist",
@@ -957,9 +966,16 @@ private static void validateCustomSchemaType(String 
schemaClassName, Class<?> ty
             throw new IllegalArgumentException("Failed to load type class", e);
         }
 
-        if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
-            throw new IllegalArgumentException(
-                    "Schema type mismatch " + typeArg + " vs " + 
schemaTypes[0]);
+        if (input) {
+            if (!fnInputClass.isAssignableFrom(schemaInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + 
schemaTypes[0]);
+            }
+        } else {
+            if (!schemaInputClass.isAssignableFrom(fnInputClass)) {
+                throw new IllegalArgumentException(
+                        "Schema type mismatch " + typeArg + " vs " + 
schemaTypes[0]);
+            }
         }
     }
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to