srkukarni closed pull request #2523: "Fix validator logic to differentiate between serializer and deserializer" (URL: https://github.com/apache/incubator-pulsar/pull/2523)
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index aec52fc0c8..c4099f4ff7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -254,13 +254,13 @@ public ByteBuffer getState(String key) { @SuppressWarnings("unchecked") @Override public <O> CompletableFuture<Void> publish(String topicName, O object) { - return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object)); + return publish(topicName, object, ""); } @SuppressWarnings("unchecked") @Override public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) { - return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName)); + return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false)); } @SuppressWarnings("unchecked") diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index b4a9bf9cf3..86a9aa211b 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -31,27 +31,42 @@ @UtilityClass public class InstanceUtils { - public static SerDe<?> initializeSerDe(String 
serdeClassName, ClassLoader clsLoader, Class<?> typeArg) { + public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg, + boolean deser) { SerDe<?> serDe = createInstance(serdeClassName, clsLoader, SerDe.class); Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), - "Inconsistent types found between function input type and input serde type: " - + " function type = " + typeArg + " should be assignable from " - + inputSerdeTypeArgs[0]); + if (deser) { + checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), + "Inconsistent types found between function input type and serde type: " + + " function type = " + typeArg + " should be assignable from " + + inputSerdeTypeArgs[0]); + } else { + checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg), + "Inconsistent types found between function input type and serde type: " + + " serde type = " + inputSerdeTypeArgs[0] + " should be assignable from " + + typeArg); + } return serDe; } - public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg) { + public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg, + boolean input) { Schema<?> schema = createInstance(schemaClassName, clsLoader, Schema.class); Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(Schema.class, schema.getClass()); - checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), - "Inconsistent types found between function input type and input schema type: " - + " function type = " + typeArg + " should be assignable from " - + inputSerdeTypeArgs[0]); - + if (input) { + checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), + "Inconsistent types found between function type and schema type: " + + " function type = " + typeArg + " should be assignable from " + + 
inputSerdeTypeArgs[0]); + } else { + checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg), + "Inconsistent types found between function type and schema type: " + + " schema type = " + inputSerdeTypeArgs[0] + " should be assignable from " + + typeArg); + } return schema; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 5ec725cdc4..835e288075 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -277,10 +277,10 @@ public void close() throws Exception { if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) { return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg, - pulsarSinkConfig.getSchemaType()); + pulsarSinkConfig.getSchemaType(), false); } else { return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg, - pulsarSinkConfig.getSerdeClassName()); + pulsarSinkConfig.getSerdeClassName(), false); } } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 244ab70d4a..e1059f390e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -162,9 +162,9 @@ public void close() throws Exception { pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> { Schema<T> schema; if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { - schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName()); + schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), 
true); } else { - schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType()); + schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true); } configs.put(topic, ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 1802ee5e61..2ac5b65764 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -51,16 +51,12 @@ public TopicSchema(PulsarClient client) { public static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe"; - public Schema<?> getSchema(String topic, Object object) { - return getSchema(topic, object.getClass(), ""); + public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName, boolean input) { + return getSchema(topic, object.getClass(), schemaTypeOrClassName, input); } - public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName) { - return getSchema(topic, object.getClass(), schemaTypeOrClassName); - } - - public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName) { - return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName)); + public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName, boolean input) { + return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName, input)); } public Schema<?> getSchema(String topic, Class<?> clazz, Optional<SchemaType> schemaType) { @@ -134,7 +130,7 @@ private static boolean isProtobufClass(Class<?> pojoClazz) { } @SuppressWarnings("unchecked") - private <T> 
Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName) { + private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) { // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de // class name. @@ -161,11 +157,11 @@ private static boolean isProtobufClass(Class<?> pojoClazz) { // First try with Schema try { return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName, - Thread.currentThread().getContextClassLoader(), clazz); + Thread.currentThread().getContextClassLoader(), clazz, input); } catch (Throwable t) { // Now try with Serde or just fail SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName, - Thread.currentThread().getContextClassLoader(), clazz); + Thread.currentThread().getContextClassLoader(), clazz, input); return new SerDeSchema<>(serDe); } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 766dee389c..72e3c56a1c 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -140,7 +140,7 @@ public void testInconsistentOutputType() throws IOException { fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); - assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:")); } catch (Exception ex) { log.error("Exception: {}", ex, ex); assertTrue(false); diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 60f684fee2..e4825f200f 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -159,7 +159,7 @@ public void testInconsistentInputType() throws IOException { fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); - assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:")); } catch (Exception ex) { log.error("Exception: {}", ex, ex); assertTrue(false); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index cbeb970f77..40b59adeb8 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -381,7 +381,7 @@ private static void doJavaChecks(FunctionConfig functionConfig, String name) { // implements SerDe class if (functionConfig.getCustomSerdeInputs() != null) { functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> { - validateSerde(inputSerializer, typeArgs[0], name, clsLoader); + validateSerde(inputSerializer, typeArgs[0], name, clsLoader, true); }); } @@ -389,7 +389,7 @@ private static void doJavaChecks(FunctionConfig functionConfig, String name) { // implements SerDe 
class if (functionConfig.getCustomSchemaInputs() != null) { functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> { - validateSchema(schemaType, typeArgs[0], name, clsLoader); + validateSchema(schemaType, typeArgs[0], name, clsLoader, true); }); } @@ -405,10 +405,10 @@ private static void doJavaChecks(FunctionConfig functionConfig, String name) { String.format("Only one of schemaType or serdeClassName should be set in inputSpec")); } if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { - validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader); + validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader, true); } if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) { - validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader); + validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader, true); } }); } @@ -425,16 +425,17 @@ private static void doJavaChecks(FunctionConfig functionConfig, String name) { } if (functionConfig.getOutputSchemaType() != null && !functionConfig.getOutputSchemaType().isEmpty()) { - validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader); + validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader, false); } if (functionConfig.getOutputSerdeClassName() != null && !functionConfig.getOutputSerdeClassName().isEmpty()) { - validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader); + validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader, false); } } - private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader) { + private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader, + boolean input) { if (StringUtils.isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) { // If it's empty, we use the default schema and no need to validate // 
If it's built-in, no need to validate @@ -447,11 +448,12 @@ private static void validateSchema(String schemaType, Class<?> typeArg, String n schemaType, Schema.class.getCanonicalName())); } - validateSchemaType(schemaType, typeArg, clsLoader); + validateSchemaType(schemaType, typeArg, clsLoader, input); } } - private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader) { + private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader, + boolean deser) { if (StringUtils.isEmpty(inputSerializer)) return; Class<?> serdeClass; try { @@ -492,8 +494,14 @@ private static void validateSerde(String inputSerializer, Class<?> typeArg, Stri throw new IllegalArgumentException("Failed to load type class", e); } - if (!fnInputClass.isAssignableFrom(serdeInputClass)) { - throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + if (deser) { + if (!fnInputClass.isAssignableFrom(serdeInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + } + } else { + if (!serdeInputClass.isAssignableFrom(fnInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + } } } } @@ -743,10 +751,10 @@ public void validateField(String name, Object o) { } if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) { - FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader); + FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader, false); } if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { - FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader); + FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader, false); } } 
catch (IOException e) { throw new IllegalArgumentException(e); @@ -783,13 +791,13 @@ public void validateField(String name, Object o) { if (sinkConfig.getTopicToSerdeClassName() != null) { sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { - FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader); + FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true); }); } if (sinkConfig.getTopicToSchemaType() != null) { sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { - FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader); + FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true); }); } @@ -803,10 +811,10 @@ public void validateField(String name, Object o) { throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); } if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) { - FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader); + FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true); } if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) { - FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader); + FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true); } }); } @@ -909,8 +917,8 @@ private static SchemaType getBuiltinSchemaType(String schemaTypeOrClassName) { } } - private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader) { - validateCustomSchemaType(scheamType, typeArg, clsLoader); + private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader, boolean input) { + validateCustomSchemaType(scheamType, typeArg, clsLoader, input); } private static void 
validateSerDeType(String serdeClassName, Class<?> typeArg, ClassLoader clsLoader) { @@ -938,7 +946,8 @@ private static void validateSerDeType(String serdeClassName, Class<?> typeArg, C } } - private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader) { + private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader, + boolean input) { Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader); if (schema == null) { throw new IllegalArgumentException(String.format("The Schema class %s does not exist", @@ -957,9 +966,16 @@ private static void validateCustomSchemaType(String schemaClassName, Class<?> ty throw new IllegalArgumentException("Failed to load type class", e); } - if (!fnInputClass.isAssignableFrom(schemaInputClass)) { - throw new IllegalArgumentException( - "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]); + if (input) { + if (!fnInputClass.isAssignableFrom(schemaInputClass)) { + throw new IllegalArgumentException( + "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]); + } + } else { + if (!schemaInputClass.isAssignableFrom(fnInputClass)) { + throw new IllegalArgumentException( + "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]); + } } } } \ No newline at end of file ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services