This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e4aea8c Fix validator logic to differentiate between serializer and deserializer (#2523) e4aea8c is described below commit e4aea8cd04f8052ff91ba50294b25d01d8943d99 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Sep 6 11:15:49 2018 -0700 Fix validator logic to differentiate between serializer and deserializer (#2523) * Fix validator logic to differentiate between serializer and deserializer * Expand to include backend and schema * Fix buil * Fix unittest * Fixed unittest --- .../pulsar/functions/instance/ContextImpl.java | 4 +- .../pulsar/functions/instance/InstanceUtils.java | 37 +++++++++---- .../apache/pulsar/functions/sink/PulsarSink.java | 4 +- .../pulsar/functions/source/PulsarSource.java | 4 +- .../pulsar/functions/source/TopicSchema.java | 18 +++---- .../pulsar/functions/sink/PulsarSinkTest.java | 2 +- .../pulsar/functions/source/PulsarSourceTest.java | 2 +- .../functions/utils/validation/ValidatorImpls.java | 62 ++++++++++++++-------- 8 files changed, 80 insertions(+), 53 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index aec52fc..c4099f4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -254,13 +254,13 @@ class ContextImpl implements Context, SinkContext, SourceContext { @SuppressWarnings("unchecked") @Override public <O> CompletableFuture<Void> publish(String topicName, O object) { - return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object)); + return publish(topicName, object, ""); } @SuppressWarnings("unchecked") @Override public <O> CompletableFuture<Void> publish(String topicName, O object, String 
schemaOrSerdeClassName) { - return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName)); + return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false)); } @SuppressWarnings("unchecked") diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java index b4a9bf9..86a9aa2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java @@ -31,27 +31,42 @@ import net.jodah.typetools.TypeResolver; @UtilityClass public class InstanceUtils { - public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg) { + public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg, + boolean deser) { SerDe<?> serDe = createInstance(serdeClassName, clsLoader, SerDe.class); Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, serDe.getClass()); - checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), - "Inconsistent types found between function input type and input serde type: " - + " function type = " + typeArg + " should be assignable from " - + inputSerdeTypeArgs[0]); + if (deser) { + checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), + "Inconsistent types found between function input type and serde type: " + + " function type = " + typeArg + " should be assignable from " + + inputSerdeTypeArgs[0]); + } else { + checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg), + "Inconsistent types found between function input type and serde type: " + + " serde type = " + inputSerdeTypeArgs[0] + " should be assignable from " + + typeArg); + } return serDe; } - public 
static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg) { + public static Schema<?> initializeCustomSchema(String schemaClassName, ClassLoader clsLoader, Class<?> typeArg, + boolean input) { Schema<?> schema = createInstance(schemaClassName, clsLoader, Schema.class); Class<?>[] inputSerdeTypeArgs = TypeResolver.resolveRawArguments(Schema.class, schema.getClass()); - checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), - "Inconsistent types found between function input type and input schema type: " - + " function type = " + typeArg + " should be assignable from " - + inputSerdeTypeArgs[0]); - + if (input) { + checkArgument(typeArg.isAssignableFrom(inputSerdeTypeArgs[0]), + "Inconsistent types found between function type and schema type: " + + " function type = " + typeArg + " should be assignable from " + + inputSerdeTypeArgs[0]); + } else { + checkArgument(inputSerdeTypeArgs[0].isAssignableFrom(typeArg), + "Inconsistent types found between function type and schema type: " + + " schema type = " + inputSerdeTypeArgs[0] + " should be assignable from " + + typeArg); + } return schema; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 5ec725c..835e288 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -277,10 +277,10 @@ public class PulsarSink<T> implements Sink<T> { if (!StringUtils.isEmpty(pulsarSinkConfig.getSchemaType())) { return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg, - pulsarSinkConfig.getSchemaType()); + pulsarSinkConfig.getSchemaType(), false); } else { return (Schema<T>) topicSchema.getSchema(pulsarSinkConfig.getTopic(), typeArg, - pulsarSinkConfig.getSerdeClassName()); + 
pulsarSinkConfig.getSerdeClassName(), false); } } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 244ab70..e1059f3 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -162,9 +162,9 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> pulsarSourceConfig.getTopicSchema().forEach((topic, conf) -> { Schema<T> schema; if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { - schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName()); + schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSerdeClassName(), true); } else { - schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType()); + schema = (Schema<T>) topicSchema.getSchema(topic, typeArg, conf.getSchemaType(), true); } configs.put(topic, ConsumerConfig.<T> builder().schema(schema).isRegexPattern(conf.isRegexPattern()).build()); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 1802ee5..2ac5b65 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -51,16 +51,12 @@ public class TopicSchema { public static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe"; - public Schema<?> getSchema(String topic, Object object) { - return getSchema(topic, object.getClass(), ""); + public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName, boolean 
input) { + return getSchema(topic, object.getClass(), schemaTypeOrClassName, input); } - public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName) { - return getSchema(topic, object.getClass(), schemaTypeOrClassName); - } - - public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName) { - return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName)); + public Schema<?> getSchema(String topic, Class<?> clazz, String schemaTypeOrClassName, boolean input) { + return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(topic, clazz, schemaTypeOrClassName, input)); } public Schema<?> getSchema(String topic, Class<?> clazz, Optional<SchemaType> schemaType) { @@ -134,7 +130,7 @@ public class TopicSchema { } @SuppressWarnings("unchecked") - private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName) { + private <T> Schema<T> newSchemaInstance(String topic, Class<T> clazz, String schemaTypeOrClassName, boolean input) { // The schemaTypeOrClassName can represent multiple thing, either a schema type, a schema class name or a ser-de // class name. 
@@ -161,11 +157,11 @@ public class TopicSchema { // First try with Schema try { return (Schema<T>) InstanceUtils.initializeCustomSchema(schemaTypeOrClassName, - Thread.currentThread().getContextClassLoader(), clazz); + Thread.currentThread().getContextClassLoader(), clazz, input); } catch (Throwable t) { // Now try with Serde or just fail SerDe<T> serDe = (SerDe<T>) InstanceUtils.initializeSerDe(schemaTypeOrClassName, - Thread.currentThread().getContextClassLoader(), clazz); + Thread.currentThread().getContextClassLoader(), clazz, input); return new SerDeSchema<>(serDe); } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 766dee3..72e3c56 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -140,7 +140,7 @@ public class PulsarSinkTest { fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); - assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:")); } catch (Exception ex) { log.error("Exception: {}", ex, ex); assertTrue(false); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index 60f684f..e4825f2 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ 
-159,7 +159,7 @@ public class PulsarSourceTest { fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); - assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and input serde type:")); + assertTrue(ex.getMessage().startsWith("Inconsistent types found between function input type and serde type:")); } catch (Exception ex) { log.error("Exception: {}", ex, ex); assertTrue(false); diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java index f264d2a..f60f3c0 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java @@ -381,7 +381,7 @@ public class ValidatorImpls { // implements SerDe class if (functionConfig.getCustomSerdeInputs() != null) { functionConfig.getCustomSerdeInputs().forEach((topicName, inputSerializer) -> { - validateSerde(inputSerializer, typeArgs[0], name, clsLoader); + validateSerde(inputSerializer, typeArgs[0], name, clsLoader, true); }); } @@ -389,7 +389,7 @@ public class ValidatorImpls { // implements SerDe class if (functionConfig.getCustomSchemaInputs() != null) { functionConfig.getCustomSchemaInputs().forEach((topicName, schemaType) -> { - validateSchema(schemaType, typeArgs[0], name, clsLoader); + validateSchema(schemaType, typeArgs[0], name, clsLoader, true); }); } @@ -405,10 +405,10 @@ public class ValidatorImpls { String.format("Only one of schemaType or serdeClassName should be set in inputSpec")); } if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) { - validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader); + 
validateSerde(conf.getSerdeClassName(), typeArgs[0], name, clsLoader, true); } if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) { - validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader); + validateSchema(conf.getSchemaType(), typeArgs[0], name, clsLoader, true); } }); } @@ -425,16 +425,17 @@ public class ValidatorImpls { } if (functionConfig.getOutputSchemaType() != null && !functionConfig.getOutputSchemaType().isEmpty()) { - validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader); + validateSchema(functionConfig.getOutputSchemaType(), typeArgs[1], name, clsLoader, false); } if (functionConfig.getOutputSerdeClassName() != null && !functionConfig.getOutputSerdeClassName().isEmpty()) { - validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader); + validateSerde(functionConfig.getOutputSerdeClassName(), typeArgs[1], name, clsLoader, false); } } - private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader) { + private static void validateSchema(String schemaType, Class<?> typeArg, String name, ClassLoader clsLoader, + boolean input) { if (StringUtils.isEmpty(schemaType) || getBuiltinSchemaType(schemaType) != null) { // If it's empty, we use the default schema and no need to validate // If it's built-in, no need to validate @@ -447,11 +448,12 @@ public class ValidatorImpls { schemaType, Schema.class.getCanonicalName())); } - validateSchemaType(schemaType, typeArg, clsLoader); + validateSchemaType(schemaType, typeArg, clsLoader, input); } } - private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader) { + private static void validateSerde(String inputSerializer, Class<?> typeArg, String name, ClassLoader clsLoader, + boolean deser) { if (StringUtils.isEmpty(inputSerializer)) return; Class<?> serdeClass; try { @@ -492,8 +494,14 @@ public class ValidatorImpls { throw new 
IllegalArgumentException("Failed to load type class", e); } - if (!fnInputClass.isAssignableFrom(serdeInputClass)) { - throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + if (deser) { + if (!fnInputClass.isAssignableFrom(serdeInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + } + } else { + if (!serdeInputClass.isAssignableFrom(fnInputClass)) { + throw new IllegalArgumentException("Serializer type mismatch " + typeArg + " vs " + serDeTypes[0]); + } } } } @@ -734,10 +742,10 @@ public class ValidatorImpls { } if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) { - FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader); + FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, clsLoader, false); } if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) { - FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader); + FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, clsLoader, false); } } catch (IOException e) { throw new IllegalArgumentException(e); @@ -774,13 +782,13 @@ public class ValidatorImpls { if (sinkConfig.getTopicToSerdeClassName() != null) { sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> { - FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader); + FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true); }); } if (sinkConfig.getTopicToSchemaType() != null) { sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> { - FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader); + FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true); }); } @@ -794,10 +802,10 @@ public class ValidatorImpls { 
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set"); } if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) { - FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader); + FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true); } if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) { - FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader); + FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true); } }); } @@ -900,8 +908,8 @@ public class ValidatorImpls { } } - private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader) { - validateCustomSchemaType(scheamType, typeArg, clsLoader); + private static void validateSchemaType(String scheamType, Class<?> typeArg, ClassLoader clsLoader, boolean input) { + validateCustomSchemaType(scheamType, typeArg, clsLoader, input); } private static void validateSerDeType(String serdeClassName, Class<?> typeArg, ClassLoader clsLoader) { @@ -929,7 +937,8 @@ public class ValidatorImpls { } } - private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader) { + private static void validateCustomSchemaType(String schemaClassName, Class<?> typeArg, ClassLoader clsLoader, + boolean input) { Schema<?> schema = (Schema<?>) Reflections.createInstance(schemaClassName, clsLoader); if (schema == null) { throw new IllegalArgumentException(String.format("The Schema class %s does not exist", @@ -948,9 +957,16 @@ public class ValidatorImpls { throw new IllegalArgumentException("Failed to load type class", e); } - if (!fnInputClass.isAssignableFrom(schemaInputClass)) { - throw new IllegalArgumentException( - "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]); + if (input) { + 
if (!fnInputClass.isAssignableFrom(schemaInputClass)) { + throw new IllegalArgumentException( + "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]); + } + } else { + if (!schemaInputClass.isAssignableFrom(fnInputClass)) { + throw new IllegalArgumentException( + "Schema type mismatch " + typeArg + " vs " + schemaTypes[0]); + } } } } \ No newline at end of file