This is an automated email from the ASF dual-hosted git repository. reuvenlax pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 788ce61 Merge pull request #7267: [BEAM-4454] Support Avro POJO objects 788ce61 is described below commit 788ce61bd7c48bc16a8eaa93a46ac403155f4422 Author: reuvenlax <re...@google.com> AuthorDate: Fri Dec 14 09:15:56 2018 -0800 Merge pull request #7267: [BEAM-4454] Support Avro POJO objects * Add remaining Schema support for AVRO records: * Add support for SpecificRecord using ByteBuddy codegen. * Add helper methods for GenericRecord. * Fix uncovered bugs involving nullable support. --- ...ificRecordSchema.java => AvroRecordSchema.java} | 19 +- .../schemas/AvroSpecificRecordGetterFactory.java | 30 --- .../AvroSpecificRecordUserTypeCreatorFactory.java | 29 --- .../sdk/schemas/FieldValueTypeInformation.java | 127 +++++++++--- .../beam/sdk/schemas/FromRowUsingCreator.java | 2 +- .../sdk/schemas/GetterBasedSchemaProvider.java | 2 - .../apache/beam/sdk/schemas/JavaBeanSchema.java | 68 ++++++- .../apache/beam/sdk/schemas/JavaFieldSchema.java | 37 +++- .../schemas/PojoTypeUserTypeCreatorFactory.java | 28 --- .../schemas/SchemaUserTypeConstructorCreator.java | 2 +- .../AvroSpecificRecordTypeInformationFactory.java | 32 --- .../apache/beam/sdk/schemas/utils/AvroUtils.java | 123 +++++++---- ...ionFactory.java => FieldValueTypeSupplier.java} | 18 +- .../sdk/schemas/utils/JavaBeanGetterFactory.java | 32 --- .../sdk/schemas/utils/JavaBeanSetterFactory.java | 31 --- .../utils/JavaBeanTypeInformationFactory.java | 32 --- .../beam/sdk/schemas/utils/JavaBeanUtils.java | 162 +++++---------- .../apache/beam/sdk/schemas/utils/POJOUtils.java | 101 ++++----- .../sdk/schemas/utils/PojoValueGetterFactory.java | 31 --- .../sdk/schemas/utils/PojoValueSetterFactory.java | 31 --- .../beam/sdk/schemas/utils/ReflectUtils.java | 13 +- .../sdk/schemas/utils/StaticSchemaInference.java | 104 +--------- .../apache/beam/sdk/schemas/AvroSchemaTest.java | 226 ++++++++++++++++++++- .../beam/sdk/schemas/utils/JavaBeanUtilsTest.java | 49 ++--- .../beam/sdk/schemas/utils/POJOUtilsTest.java | 16 +- 25 files changed, 652 insertions(+), 693 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java similarity index 67% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java index d8e4bda..29bf51a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java @@ -17,35 +17,34 @@ */ package org.apache.beam.sdk.schemas; -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** - * A {@link SchemaProvider} for AVRO generated SpecificRecords. + * A {@link SchemaProvider} for AVRO generated SpecificRecords and POJOs. * - * <p>This provider infers a schema from generates SpecificRecord objects, and creates schemas and - * rows that bind to the appropriate fields. + * <p>This provider infers a schema from generated SpecificRecord objects, and creates schemas and + * rows that bind to the appropriate fields. This provider also infers schemas from Java POJO + * objects, creating a schema that matches that inferred by the AVRO libraries. */ -public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider { +public class AvroRecordSchema extends GetterBasedSchemaProvider { @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { - return AvroUtils.getSchema((Class<? extends SpecificRecord>) typeDescriptor.getRawType()); + return AvroUtils.getSchema(typeDescriptor.getRawType()); } @Override public FieldValueGetterFactory fieldValueGetterFactory() { - return new AvroSpecificRecordGetterFactory(); + return AvroUtils::getGetters; } @Override public UserTypeCreatorFactory schemaTypeCreatorFactory() { - return new AvroSpecificRecordUserTypeCreatorFactory(); + return AvroUtils::getCreator; } @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { - return new AvroSpecificRecordTypeInformationFactory(); + return AvroUtils::getFieldTypes; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java deleted file mode 100644 index fcb85f4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas; - -import java.util.List; -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.utils.AvroUtils; - -/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */ -public class AvroSpecificRecordGetterFactory implements FieldValueGetterFactory { - @Override - public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java deleted file mode 100644 index 68d0d6a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas; - -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.utils.AvroUtils; - -/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */ -public class AvroSpecificRecordUserTypeCreatorFactory implements UserTypeCreatorFactory { - @Override - public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { - return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java index 5cccdf6..593853f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java @@ -22,22 +22,33 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; import java.io.Serializable; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.Collection; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; +import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; -/** Represents type information for a schema field. */ +/** Represents type information for a Java type that will be used to infer a Schema type. */ @AutoValue public abstract class FieldValueTypeInformation implements Serializable { /** Returns the field name. */ public abstract String getName(); + /** Returns whether the field is nullable. */ + public abstract boolean isNullable(); + /** Returns the field type. */ - public abstract Class getType(); + public abstract TypeDescriptor getType(); + + @Nullable + public abstract Field getField(); + + @Nullable + public abstract Method getMethod(); /** If the field is a container type, returns the element type. */ @Nullable @@ -51,26 +62,90 @@ public abstract class FieldValueTypeInformation implements Serializable { @Nullable public abstract Type getMapValueType(); - public static FieldValueTypeInformation of(Field field) { - return new AutoValue_FieldValueTypeInformation( - field.getName(), - field.getType(), - getArrayComponentType(field), - getMapKeyType(field), - getMapValueType(field)); + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + public abstract Builder setName(String name); + + public abstract Builder setNullable(boolean nullable); + + public abstract Builder setType(TypeDescriptor type); + + public abstract Builder setField(@Nullable Field field); + + public abstract Builder setMethod(@Nullable Method method); + + public abstract Builder setElementType(@Nullable Type elementType); + + public abstract Builder setMapKeyType(@Nullable Type mapKeyType); + + public abstract Builder setMapValueType(@Nullable Type mapValueType); + + abstract FieldValueTypeInformation build(); + } + + public static FieldValueTypeInformation forField(Field field) { + return new AutoValue_FieldValueTypeInformation.Builder() + .setName(field.getName()) + .setNullable(field.isAnnotationPresent(Nullable.class)) + .setType(TypeDescriptor.of(field.getGenericType())) + .setField(field) + .setElementType(getArrayComponentType(field)) + .setMapKeyType(getMapKeyType(field)) + .setMapValueType(getMapValueType(field)) + .build(); + } + + public static FieldValueTypeInformation forGetter(Method method) { + String name; + if (method.getName().startsWith("get")) { + name = ReflectUtils.stripPrefix(method.getName(), "get"); + } else if (method.getName().startsWith("is")) { + name = ReflectUtils.stripPrefix(method.getName(), "is"); + } else { + throw new RuntimeException("Getter has wrong prefix " + method.getName()); + } + + TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType()); + boolean nullable = method.isAnnotationPresent(Nullable.class); + return new AutoValue_FieldValueTypeInformation.Builder() + .setName(name) + .setNullable(nullable) + .setType(type) + .setMethod(method) + .setElementType(getArrayComponentType(type)) + .setMapKeyType(getMapKeyType(type)) + .setMapValueType(getMapValueType(type)) + .build(); } - public static FieldValueTypeInformation of(TypeInformation typeInformation) { - return new AutoValue_FieldValueTypeInformation( - typeInformation.getName(), - typeInformation.getType().getRawType(), - getArrayComponentType(typeInformation), - getMapKeyType(typeInformation), - getMapValueType(typeInformation)); + public static FieldValueTypeInformation forSetter(Method method) { + String name; + if (method.getName().startsWith("set")) { + name = ReflectUtils.stripPrefix(method.getName(), "set"); + } else { + throw new RuntimeException("Setter has wrong prefix " + method.getName()); + } + if (method.getParameterCount() != 1) { + throw new RuntimeException("Setter methods should take a single argument."); + } + TypeDescriptor type = TypeDescriptor.of(method.getGenericParameterTypes()[0]); + boolean nullable = + Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance); + return new AutoValue_FieldValueTypeInformation.Builder() + .setName(name) + .setNullable(nullable) + .setType(type) + .setMethod(method) + .setElementType(getArrayComponentType(type)) + .setMapKeyType(getMapKeyType(type)) + .setMapValueType(getMapValueType(type)) + .build(); } - private static Type getArrayComponentType(TypeInformation typeInformation) { - return getArrayComponentType(typeInformation.getType()); + public FieldValueTypeInformation withName(String name) { + return toBuilder().setName(name).build(); } private static Type getArrayComponentType(Field field) { @@ -98,15 +173,19 @@ public abstract class FieldValueTypeInformation implements Serializable { return null; } + public Class getRawType() { + return getType().getRawType(); + } + // If the Field is a map type, returns the key type, otherwise returns a null reference. @Nullable private static Type getMapKeyType(Field field) { - return getMapType(TypeDescriptor.of(field.getGenericType()), 0); + return getMapKeyType(TypeDescriptor.of(field.getGenericType())); } @Nullable - private static Type getMapKeyType(TypeInformation typeInformation) { - return getMapType(typeInformation.getType(), 0); + private static Type getMapKeyType(TypeDescriptor<?> typeDescriptor) { + return getMapType(typeDescriptor, 0); } // If the Field is a map type, returns the value type, otherwise returns a null reference. @@ -116,8 +195,8 @@ public abstract class FieldValueTypeInformation implements Serializable { } @Nullable - private static Type getMapValueType(TypeInformation typeInformation) { - return getMapType(typeInformation.getType(), 1); + private static Type getMapValueType(TypeDescriptor typeDescriptor) { + return getMapType(typeDescriptor, 1); } // If the Field is a map type, returns the key or value type (0 is key type, 1 is value). diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java index 2ec1a42..139281f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java @@ -73,7 +73,7 @@ class FromRowUsingCreator<T> implements SerializableFunction<Row, T> { fromValue( type, row.getValue(i), - typeInformation.getType(), + typeInformation.getRawType(), typeInformation.getElementType(), typeInformation.getMapKeyType(), typeInformation.getMapValueType(), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java index f7757a8..677823f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.schemas; import java.util.List; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -38,7 +37,6 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider { abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory(); /** Implementing class should override to return a constructor factory. */ - @Nullable abstract UserTypeCreatorFactory schemaTypeCreatorFactory(); @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 2c22400..8eb8022 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -17,13 +17,16 @@ */ package org.apache.beam.sdk.schemas; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.schemas.utils.JavaBeanGetterFactory; -import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory; -import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory; +import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier; import org.apache.beam.sdk.schemas.utils.JavaBeanUtils; -import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -41,15 +44,55 @@ import org.apache.beam.sdk.values.TypeDescriptor; */ @Experimental(Kind.SCHEMAS) public class JavaBeanSchema extends GetterBasedSchemaProvider { + /** {@link FieldValueTypeSupplier} that's based on getter methods. */ + @VisibleForTesting + public static class GetterTypeSupplier implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isGetter) + .map(FieldValueTypeInformation::forGetter) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } + } + + /** {@link FieldValueTypeSupplier} that's based on setter methods. */ + @VisibleForTesting + public static class SetterTypeSupplier implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isSetter) + .map(FieldValueTypeInformation::forSetter) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } + } + @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { - return JavaBeanUtils.schemaFromJavaBeanClass( - typeDescriptor.getRawType(), SerializableFunctions.identity()); + return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType()); } @Override public FieldValueGetterFactory fieldValueGetterFactory() { - return new JavaBeanGetterFactory(); + return (Class<?> targetClass, Schema schema) -> + JavaBeanUtils.getGetters(targetClass, schema, new GetterTypeSupplier()); } @Override @@ -59,6 +102,15 @@ public class JavaBeanSchema extends GetterBasedSchemaProvider { @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { - return new JavaBeanTypeInformationFactory(); + return (Class<?> targetClass, Schema schema) -> + JavaBeanUtils.getFieldTypes(targetClass, schema, new GetterTypeSupplier()); + } + + /** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */ + public static class JavaBeanSetterFactory implements FieldValueSetterFactory { + @Override + public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) { + return JavaBeanUtils.getSetters(targetClass, schema, new SetterTypeSupplier()); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index a7d6a30..1504717 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -17,11 +17,16 @@ */ package org.apache.beam.sdk.schemas; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier; import org.apache.beam.sdk.schemas.utils.POJOUtils; -import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory; -import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory; +import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -39,6 +44,25 @@ import org.apache.beam.sdk.values.TypeDescriptor; */ @Experimental(Kind.SCHEMAS) public class JavaFieldSchema extends GetterBasedSchemaProvider { + /** {@link FieldValueTypeSupplier} that's based on public fields. */ + @VisibleForTesting + public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = + ReflectUtils.getFields(clazz) + .stream() + .map(FieldValueTypeInformation::forField) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } + } + @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType()); @@ -46,16 +70,19 @@ public class JavaFieldSchema extends GetterBasedSchemaProvider { @Override public FieldValueGetterFactory fieldValueGetterFactory() { - return new PojoValueGetterFactory(); + return (Class<?> targetClass, Schema schema) -> + POJOUtils.getGetters(targetClass, schema, new JavaFieldTypeSupplier()); } @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { - return new PojoValueTypeInformationFactory(); + return (Class<?> targetClass, Schema schema) -> + POJOUtils.getFieldTypes(targetClass, schema, new JavaFieldTypeSupplier()); } @Override UserTypeCreatorFactory schemaTypeCreatorFactory() { - return new PojoTypeUserTypeCreatorFactory(); + return (Class<?> targetClass, Schema schema) -> + POJOUtils.getCreator(targetClass, schema, new JavaFieldTypeSupplier()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java deleted file mode 100644 index b227988..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas; - -import org.apache.beam.sdk.schemas.utils.POJOUtils; - -/** Vends constructors for POJOs. */ -class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory { - @Override - public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { - return POJOUtils.getCreator(clazz, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java index c5fae19..6b9b18f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java @@ -27,7 +27,7 @@ public class SchemaUserTypeConstructorCreator implements SchemaUserTypeCreator { private final Class<?> clazz; private final transient Constructor<?> constructor; - SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor) { + public SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor) { this.clazz = clazz; this.constructor = checkNotNull(constructor); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java deleted file mode 100644 index 6b76fa6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A {@link FieldValueTypeInformation} for AVRO-generated specific records. */ -public class AvroSpecificRecordTypeInformationFactory implements FieldValueTypeInformationFactory { - @Override - public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return AvroUtils.getFieldTypes((Class<? extends SpecificRecord>) targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index df33d64..c2e737f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -23,6 +23,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.base.CaseFormat; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.lang.reflect.Method; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -42,6 +43,8 @@ import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.reflect.AvroIgnore; +import org.apache.avro.reflect.AvroName; import org.apache.avro.reflect.ReflectData; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificRecord; @@ -56,6 +59,7 @@ import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Instant; import org.joda.time.ReadableInstant; @@ -117,6 +121,11 @@ public class AvroUtils { this.size = size; } + /** Create a {@link FixedBytesField} with the specified size. */ + public static FixedBytesField withSize(int size) { + return new FixedBytesField(size); + } + /** Create a {@link FixedBytesField} from a Beam {@link FieldType}. */ @Nullable public static FixedBytesField fromBeamFieldType(FieldType fieldType) { @@ -260,59 +269,100 @@ public class AvroUtils { return g -> toGenericRecord(g, avroSchema); } - /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) { - try { - org.apache.avro.Schema avroSchema = - (org.apache.avro.Schema) (clazz.getDeclaredField("SCHEMA$").get(null)); - return toBeamSchema(avroSchema); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new IllegalArgumentException( - "Class " - + clazz - + " is not an AVRO SpecificRecord. " - + "No public SCHEMA$ field was found."); - } + /** Infer a {@link Schema} from either an AVRO-generated SpecificRecord or a POJO. */ + public static <T> Schema getSchema(Class<T> clazz) { + return toBeamSchema(ReflectData.get().getSchema(clazz)); } - private static final class AvroSpecificRecordFieldNamePolicy - implements SerializableFunction<String, String> { - Schema schema; - Map<String, String> nameMapping = Maps.newHashMap(); + private static final class AvroSpecificRecordFieldValueTypeSupplier + implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, String> mapping = getMapping(schema); + Map<String, FieldValueTypeInformation> types = Maps.newHashMap(); + for (Method method : ReflectUtils.getMethods(clazz)) { + if (ReflectUtils.isGetter(method)) { + FieldValueTypeInformation fieldValueTypeInformation = + FieldValueTypeInformation.forGetter(method); + String name = mapping.get(fieldValueTypeInformation.getName()); + if (name != null) { + types.put(name, fieldValueTypeInformation.withName(name)); + } + } + } + + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } - AvroSpecificRecordFieldNamePolicy(Schema schema) { - this.schema = schema; + private Map<String, String> getMapping(Schema schema) { + Map<String, String> mapping = Maps.newHashMap(); for (Field field : schema.getFields()) { - String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName()); - nameMapping.put(getter, field.getName()); + String underscore = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName()); + mapping.put(underscore, field.getName()); // The Avro compiler might add a $ at the end of a getter to disambiguate. - nameMapping.put(getter + "$", field.getName()); + mapping.put(underscore + "$", field.getName()); + // If the field is in camel case already, then it's the identity mapping. + mapping.put(field.getName(), field.getName()); } + return mapping; } + } + private static final class AvroPojoFieldValueTypeSupplier implements FieldValueTypeSupplier { @Override - public String apply(String input) { - return nameMapping.getOrDefault(input, input); + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = Maps.newHashMap(); + for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) { + if (!f.isAnnotationPresent(AvroIgnore.class)) { + FieldValueTypeInformation typeInformation = FieldValueTypeInformation.forField(f); + AvroName avroname = f.getAnnotation(AvroName.class); + if (avroname != null) { + typeInformation = typeInformation.withName(avroname.value()); + } + types.put(typeInformation.getName(), typeInformation); + } + } + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); } } - /** Get field types for an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> List<FieldValueTypeInformation> getFieldTypes( - Class<T> clazz, Schema schema) { - return JavaBeanUtils.getFieldTypes( - clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema)); + /** Get field types for an AVRO-generated SpecificRecord or a POJO. */ + public static <T> List<FieldValueTypeInformation> getFieldTypes(Class<T> clazz, Schema schema) { + if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return JavaBeanUtils.getFieldTypes( + clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier()); + } else { + return POJOUtils.getFieldTypes(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + } } - /** Get generated getters for an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> List<FieldValueGetter> getGetters( - Class<T> clazz, Schema schema) { - return JavaBeanUtils.getGetters(clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema)); + /** Get generated getters for an AVRO-generated SpecificRecord or a POJO. */ + public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema schema) { + if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return JavaBeanUtils.getGetters( + clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier()); + } else { + return POJOUtils.getGetters(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + } } /** Get an object creator for an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> SchemaUserTypeCreator getCreator( - Class<T> clazz, Schema schema) { - return AvroByteBuddyUtils.getCreator(clazz, schema); + public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) { + if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return AvroByteBuddyUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema); + } else { + return POJOUtils.getCreator(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + } } /** Converts AVRO schema to Beam field. */ @@ -480,7 +530,6 @@ public class AvroUtils { private static Object genericFromBeamField( Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object value) { TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema); - if (!fieldType.getNullable().equals(typeWithNullability.nullable)) { throw new IllegalArgumentException( "FieldType " diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java similarity index 68% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java index 84f9a5e..12330ec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java @@ -17,15 +17,19 @@ */ package org.apache.beam.sdk.schemas.utils; +import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; import org.apache.beam.sdk.schemas.Schema; -/** A {@link FieldValueTypeInformationFactory} for POJO objects objects. */ -public class PojoValueTypeInformationFactory implements FieldValueTypeInformationFactory { - @Override - public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return POJOUtils.getFieldTypes(targetClass, schema); - } +/** + * A naming policy for schema fields. This maps a name from the class (field name or getter name) to + * the matching field name in the schema. + */ +public interface FieldValueTypeSupplier extends Serializable { + /** + * Return all the FieldValueTypeInformations. The returned list must be in the same order as + * fields in the schema. + */ + List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java deleted file mode 100644 index 5c3d2ea..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueGetter; -import org.apache.beam.sdk.schemas.FieldValueGetterFactory; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.SerializableFunctions; - -/** A factory for creating {@link FieldValueGetter} objects for a JavaBean object. */ -public class JavaBeanGetterFactory implements FieldValueGetterFactory { - @Override - public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getGetters(targetClass, schema, SerializableFunctions.identity()); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java deleted file mode 100644 index e67a58d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueSetter; -import org.apache.beam.sdk.schemas.FieldValueSetterFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */ -public class JavaBeanSetterFactory implements FieldValueSetterFactory { - @Override - public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getSetters(targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java deleted file mode 100644 index f445a87..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.SerializableFunctions; - -/** A {@link FieldValueTypeInformationFactory} for Java Bean objects. */ -public class JavaBeanTypeInformationFactory implements FieldValueTypeInformationFactory { - @Override - public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getFieldTypes(targetClass, schema, SerializableFunctions.identity()); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java index 0ec07c9..58cc0ed 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.schemas.utils; import com.google.common.collect.Maps; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; @@ -48,55 +47,46 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter; import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; -import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.common.ReflectHelpers; /** A set of utilities to generate getter and setter classes for JavaBean objects. */ @Experimental(Kind.SCHEMAS) public class JavaBeanUtils { /** Create a {@link Schema} for a Java Bean class. */ - public static Schema schemaFromJavaBeanClass( - Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) { - return StaticSchemaInference.schemaFromClass( - clazz, c -> JavaBeanUtils.typeInformationFromClass(c, fieldNamePolicy)); + public static Schema schemaFromJavaBeanClass(Class<?> clazz) { + return StaticSchemaInference.schemaFromClass(clazz, JavaBeanUtils::typeInformationFromClass); } - private static List<TypeInformation> typeInformationFromClass( - Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) { - try { - List<TypeInformation> getterTypes = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isGetter) - .map(m -> TypeInformation.forGetter(m, fieldNamePolicy)) - .collect(Collectors.toList()); + private static List<FieldValueTypeInformation> typeInformationFromClass(Class<?> clazz) { + List<FieldValueTypeInformation> getterTypes = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isGetter) + .map(FieldValueTypeInformation::forGetter) + .collect(Collectors.toList()); - Map<String, TypeInformation> setterTypes = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isSetter) - .map(m -> TypeInformation.forSetter(m)) - .collect(Collectors.toMap(TypeInformation::getName, Function.identity())); - validateJavaBean(getterTypes, setterTypes); - return getterTypes; - } catch (IOException e) { - throw new RuntimeException(e); - } + Map<String, FieldValueTypeInformation> setterTypes = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isSetter) + .map(FieldValueTypeInformation::forSetter) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + validateJavaBean(getterTypes, setterTypes); + return getterTypes; } // Make sure that there are matching setters and getters. private static void validateJavaBean( - List<TypeInformation> getters, Map<String, TypeInformation> setters) { - for (TypeInformation type : getters) { - TypeInformation setterType = setters.get(type.getName()); + List<FieldValueTypeInformation> getters, Map<String, FieldValueTypeInformation> setters) { + for (FieldValueTypeInformation type : getters) { + FieldValueTypeInformation setterType = setters.get(type.getName()); if (setterType == null) { throw new RuntimeException( "JavaBean contained a getter for field " + type.getName() + "but did not contain a matching setter."); } - if (!type.equals(setterType)) { + if (!type.getType().equals(setterType.getType())) { throw new RuntimeException( "JavaBean contained setter for field " + type.getName() @@ -118,28 +108,9 @@ public class JavaBeanUtils { Maps.newConcurrentMap(); public static List<FieldValueTypeInformation> getFieldTypes( - Class<?> clazz, Schema schema, SerializableFunction<String, String> fieldNamePolicy) { + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_FIELD_TYPES.computeIfAbsent( - new ClassWithSchema(clazz, schema), - c -> { - try { - Map<String, FieldValueTypeInformation> getterMap = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isGetter) - .map(m -> TypeInformation.forGetter(m, fieldNamePolicy)) - .map(FieldValueTypeInformation::of) - .collect( - Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> getterMap.get(f.getName())) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema)); } // The list of getters for a class is cached, so we only create the classes the first time @@ -153,37 +124,22 @@ public class JavaBeanUtils { * <p>The returned list is ordered by the order of fields in the schema. */ public static List<FieldValueGetter> getGetters( - Class<?> clazz, Schema schema, SerializableFunction<String, String> fieldNamePolicy) { + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_GETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - try { - Map<String, FieldValueGetter> getterMap = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isGetter) - .map(m -> JavaBeanUtils.createGetter(m, fieldNamePolicy)) - .collect(Collectors.toMap(FieldValueGetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> getterMap.get(f.getName())) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return types.stream().map(JavaBeanUtils::createGetter).collect(Collectors.toList()); }); } - private static <T> FieldValueGetter createGetter( - Method getterMethod, SerializableFunction<String, String> fieldNamePolicy) { - TypeInformation typeInformation = TypeInformation.forGetter(getterMethod, fieldNamePolicy); + private static <T> FieldValueGetter createGetter(FieldValueTypeInformation typeInformation) { DynamicType.Builder<FieldValueGetter> builder = ByteBuddyUtils.subclassGetterInterface( BYTE_BUDDY, - getterMethod.getDeclaringClass(), + typeInformation.getMethod().getDeclaringClass(), new ConvertType(false).convert(typeInformation.getType())); - builder = implementGetterMethods(builder, getterMethod, fieldNamePolicy); + builder = implementGetterMethods(builder, typeInformation); try { return builder .make() @@ -195,20 +151,18 @@ public class JavaBeanUtils { | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - throw new RuntimeException("Unable to generate a getter for getter '" + getterMethod + "'"); + throw new RuntimeException( + "Unable to generate a getter for getter '" + typeInformation.getMethod() + "'"); } } private static DynamicType.Builder<FieldValueGetter> implementGetterMethods( - DynamicType.Builder<FieldValueGetter> builder, - Method method, - SerializableFunction<String, String> fieldNamePolicy) { - TypeInformation typeInformation = TypeInformation.forGetter(method, fieldNamePolicy); + DynamicType.Builder<FieldValueGetter> builder, FieldValueTypeInformation typeInformation) { return builder .method(ElementMatchers.named("name")) .intercept(FixedValue.reference(typeInformation.getName())) .method(ElementMatchers.named("get")) - .intercept(new InvokeGetterInstruction(method, fieldNamePolicy)); + .intercept(new InvokeGetterInstruction(typeInformation)); } // The list of setters for a class is cached, so we only create the classes the first time @@ -221,36 +175,23 @@ public class JavaBeanUtils { * * <p>The returned list is ordered by the order of fields in the schema. */ - public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) { + public static List<FieldValueSetter> getSetters( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_SETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - try { - Map<String, FieldValueSetter> setterMap = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isSetter) - .map(JavaBeanUtils::createSetter) - .collect(Collectors.toMap(FieldValueSetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> setterMap.get(f.getName())) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return types.stream().map(JavaBeanUtils::createSetter).collect(Collectors.toList()); }); } - private static <T> FieldValueSetter createSetter(Method setterMethod) { - TypeInformation typeInformation = TypeInformation.forSetter(setterMethod); + private static FieldValueSetter createSetter(FieldValueTypeInformation typeInformation) { DynamicType.Builder<FieldValueSetter> builder = ByteBuddyUtils.subclassSetterInterface( BYTE_BUDDY, - setterMethod.getDeclaringClass(), + typeInformation.getMethod().getDeclaringClass(), new ConvertType(false).convert(typeInformation.getType())); - builder = implementSetterMethods(builder, setterMethod); + builder = implementSetterMethods(builder, typeInformation.getMethod()); try { return builder .make() @@ -262,29 +203,27 @@ public class JavaBeanUtils { | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - throw new RuntimeException("Unable to generate a setter for setter '" + setterMethod + "'"); + throw new RuntimeException( + "Unable to generate a setter for setter '" + typeInformation.getMethod() + "'"); } } private static DynamicType.Builder<FieldValueSetter> implementSetterMethods( DynamicType.Builder<FieldValueSetter> builder, Method method) { - TypeInformation typeInformation = TypeInformation.forSetter(method); + FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method); return builder .method(ElementMatchers.named("name")) - .intercept(FixedValue.reference(typeInformation.getName())) + .intercept(FixedValue.reference(javaTypeInformation.getName())) .method(ElementMatchers.named("set")) .intercept(new InvokeSetterInstruction(method)); } // Implements a method to read a public getter out of an object. private static class InvokeGetterInstruction implements Implementation { - // Getter method that wil be invoked - private Method method; - private SerializableFunction<String, String> fieldNamePolicy; + private final FieldValueTypeInformation typeInformation; - InvokeGetterInstruction(Method method, SerializableFunction<String, String> fieldNamePolicy) { - this.method = method; - this.fieldNamePolicy = fieldNamePolicy; + InvokeGetterInstruction(FieldValueTypeInformation typeInformation) { + this.typeInformation = typeInformation; } @Override @@ -295,7 +234,6 @@ public class JavaBeanUtils { @Override public ByteCodeAppender appender(final Target implementationTarget) { return (methodVisitor, implementationContext, instrumentedMethod) -> { - TypeInformation typeInformation = TypeInformation.forGetter(method, fieldNamePolicy); // this + method parameters. int numLocals = 1 + instrumentedMethod.getParameters().size(); @@ -305,7 +243,7 @@ public class JavaBeanUtils { // Method param is offset 1 (offset 0 is the this parameter). MethodVariableAccess.REFERENCE.loadFrom(1), // Invoke the getter - MethodInvocation.invoke(new ForLoadedMethod(method))); + MethodInvocation.invoke(new ForLoadedMethod(typeInformation.getMethod()))); StackManipulation stackManipulation = new StackManipulation.Compound( @@ -335,7 +273,7 @@ public class JavaBeanUtils { @Override public ByteCodeAppender appender(final Target implementationTarget) { return (methodVisitor, implementationContext, instrumentedMethod) -> { - TypeInformation typeInformation = TypeInformation.forSetter(method); + FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method); // this + method parameters. int numLocals = 1 + instrumentedMethod.getParameters().size(); @@ -349,7 +287,7 @@ public class JavaBeanUtils { MethodVariableAccess.REFERENCE.loadFrom(1), // Do any conversions necessary. new ByteBuddyUtils.ConvertValueForSetter(readField) - .convert(typeInformation.getType()), + .convert(javaTypeInformation.getType()), // Now update the field and return void. MethodInvocation.invoke(new ForLoadedMethod(method)), MethodReturn.VOID); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java index 38f5307..370b3cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import net.bytebuddy.ByteBuddy; import net.bytebuddy.description.field.FieldDescription.ForLoadedField; import net.bytebuddy.description.type.TypeDescription.ForLoadedType; @@ -55,7 +56,6 @@ import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter; import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; -import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.TypeDescriptor; @@ -63,12 +63,11 @@ import org.apache.beam.sdk.values.TypeDescriptor; @Experimental(Kind.SCHEMAS) public class POJOUtils { public static Schema schemaFromPojoClass(Class<?> clazz) { - // We should cache the field order. - Function<Class, List<TypeInformation>> getTypesForClass = + Function<Class, List<FieldValueTypeInformation>> getTypesForClass = c -> ReflectUtils.getFields(c) .stream() - .map(TypeInformation::forField) + .map(FieldValueTypeInformation::forField) .collect(Collectors.toList()); return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass); } @@ -79,22 +78,10 @@ public class POJOUtils { private static final Map<ClassWithSchema, List<FieldValueTypeInformation>> CACHED_FIELD_TYPES = Maps.newConcurrentMap(); - public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz, Schema schema) { + public static List<FieldValueTypeInformation> getFieldTypes( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_FIELD_TYPES.computeIfAbsent( - new ClassWithSchema(clazz, schema), - c -> { - Map<String, FieldValueTypeInformation> typeInformationMap = - ReflectUtils.getFields(clazz) - .stream() - .map(FieldValueTypeInformation::of) - .collect( - Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> typeInformationMap.get(f.getName())) - .collect(Collectors.toList()); - }); + new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema)); } // The list of getters for a class is cached, so we only create the classes the first time @@ -102,21 +89,20 @@ public class POJOUtils { private static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS = Maps.newConcurrentMap(); - public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) { + public static List<FieldValueGetter> getGetters( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { // Return the getters ordered by their position in the schema. return CACHED_GETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - Map<String, FieldValueGetter> getterMap = - ReflectUtils.getFields(clazz) - .stream() - .map(POJOUtils::createGetter) - .collect(Collectors.toMap(FieldValueGetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> getterMap.get(f.getName())) - .collect(Collectors.toList()); + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + List<FieldValueGetter> getters = + types.stream().map(POJOUtils::createGetter).collect(Collectors.toList()); + if (getters.size() != schema.getFieldCount()) { + throw new RuntimeException( + "Was not able to generate getters for schema: " + schema + " class: " + clazz); + } + return getters; }); } @@ -125,24 +111,21 @@ public class POJOUtils { public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS = Maps.newConcurrentMap(); - public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) { + public static <T> SchemaUserTypeCreator getCreator( + Class<T> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_CREATORS.computeIfAbsent( - new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema)); + new ClassWithSchema(clazz, schema), + c -> { + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return createCreator(clazz, schema, types); + }); } - private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) { + private static <T> SchemaUserTypeCreator createCreator( + Class<T> clazz, Schema schema, List<FieldValueTypeInformation> types) { // Get the list of class fields ordered by schema. - Map<String, Field> fieldMap = - ReflectUtils.getFields(clazz) - .stream() - .collect(Collectors.toMap(Field::getName, Function.identity())); List<Field> fields = - schema - .getFields() - .stream() - .map(f -> fieldMap.get(f.getName())) - .collect(Collectors.toList()); - + types.stream().map(FieldValueTypeInformation::getField).collect(Collectors.toList()); try { DynamicType.Builder<SchemaUserTypeCreator> builder = BYTE_BUDDY @@ -179,13 +162,16 @@ public class POJOUtils { * </code></pre> */ @SuppressWarnings("unchecked") - static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter(Field field) { + @Nullable + static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter( + FieldValueTypeInformation typeInformation) { + Field field = typeInformation.getField(); DynamicType.Builder<FieldValueGetter> builder = ByteBuddyUtils.subclassGetterInterface( BYTE_BUDDY, field.getDeclaringClass(), new ConvertType(false).convert(TypeDescriptor.of(field.getType()))); - builder = implementGetterMethods(builder, field); + builder = implementGetterMethods(builder, field, typeInformation.getName()); try { return builder .make() @@ -202,10 +188,10 @@ public class POJOUtils { } private static DynamicType.Builder<FieldValueGetter> implementGetterMethods( - DynamicType.Builder<FieldValueGetter> builder, Field field) { + DynamicType.Builder<FieldValueGetter> builder, Field field, String name) { return builder .method(ElementMatchers.named("name")) - .intercept(FixedValue.reference(field.getName())) + .intercept(FixedValue.reference(name)) .method(ElementMatchers.named("get")) .intercept(new ReadFieldInstruction(field)); } @@ -215,21 +201,14 @@ public class POJOUtils { private static final Map<ClassWithSchema, List<FieldValueSetter>> CACHED_SETTERS = Maps.newConcurrentMap(); - public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) { + public static List<FieldValueSetter> getSetters( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { // Return the setters, ordered by their position in the schema. return CACHED_SETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - Map<String, FieldValueSetter> setterMap = - ReflectUtils.getFields(clazz) - .stream() - .map(POJOUtils::createSetter) - .collect(Collectors.toMap(FieldValueSetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> setterMap.get(f.getName())) - .collect(Collectors.toList()); + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return types.stream().map(POJOUtils::createSetter).collect(Collectors.toList()); }); } @@ -250,7 +229,9 @@ public class POJOUtils { * </code></pre> */ @SuppressWarnings("unchecked") - private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter(Field field) { + private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter( + FieldValueTypeInformation typeInformation) { + Field field = typeInformation.getField(); DynamicType.Builder<FieldValueSetter> builder = ByteBuddyUtils.subclassSetterInterface( BYTE_BUDDY, @@ -284,7 +265,7 @@ public class POJOUtils { // Implements a method to read a public field out of an object. static class ReadFieldInstruction implements Implementation { // Field that will be read. - private Field field; + private final Field field; ReadFieldInstruction(Field field) { this.field = field; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java deleted file mode 100644 index 275b791..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueGetter; -import org.apache.beam.sdk.schemas.FieldValueGetterFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A factory for creating {@link FieldValueGetter} objects for a POJO. */ -public class PojoValueGetterFactory implements FieldValueGetterFactory { - @Override - public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return POJOUtils.getGetters(targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java deleted file mode 100644 index 5e9447a..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueSetter; -import org.apache.beam.sdk.schemas.FieldValueSetterFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A factory for creating {@link FieldValueSetter} objects for a POJO. */ -public class PojoValueSetterFactory implements FieldValueSetterFactory { - @Override - public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) { - return POJOUtils.getSetters(targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java index 8619ea5..c9d943b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -40,7 +39,7 @@ public class ReflectUtils { private final Class clazz; private final Schema schema; - public ClassWithSchema(Class clazz, Schema schema) { + ClassWithSchema(Class clazz, Schema schema) { this.clazz = clazz; this.schema = schema; } @@ -67,7 +66,7 @@ public class ReflectUtils { private static final Map<Class, List<Field>> DECLARED_FIELDS = Maps.newHashMap(); /** Returns the list of public, non-static methods in the class, caching the results. */ - static List<Method> getMethods(Class clazz) throws IOException { + public static List<Method> getMethods(Class clazz) { return DECLARED_METHODS.computeIfAbsent( clazz, c -> { @@ -79,7 +78,7 @@ public class ReflectUtils { } // Get all public, non-static, non-transient fields. - static List<Field> getFields(Class<?> clazz) { + public static List<Field> getFields(Class<?> clazz) { return DECLARED_FIELDS.computeIfAbsent( clazz, c -> { @@ -104,7 +103,7 @@ public class ReflectUtils { }); } - static boolean isGetter(Method method) { + public static boolean isGetter(Method method) { if (Void.TYPE.equals(method.getReturnType())) { return false; } @@ -118,13 +117,13 @@ public class ReflectUtils { || Boolean.class.equals(method.getReturnType()))); } - static boolean isSetter(Method method) { + public static boolean isSetter(Method method) { return Void.TYPE.equals(method.getReturnType()) && method.getParameterCount() == 1 && method.getName().startsWith("set"); } - static String stripPrefix(String methodName, String prefix) { + public static String stripPrefix(String methodName, String prefix) { String firstLetter = methodName.substring(prefix.length(), prefix.length() + 1).toLowerCase(); return (methodName.length() == prefix.length() + 1) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index fdf978c..d3f81e8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -20,21 +20,16 @@ package org.apache.beam.sdk.schemas.utils; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.Function; -import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.ReadableInstant; @@ -64,94 +59,6 @@ public class StaticSchemaInference { .put(BigDecimal.class, FieldType.DECIMAL) .build(); - /** Relevant information about a Java type. */ - public static class TypeInformation { - private final String name; - private final TypeDescriptor type; - private final boolean nullable; - - /** Construct a {@link TypeInformation}. */ - private TypeInformation(String name, TypeDescriptor type, boolean nullable) { - this.name = name; - this.type = type; - this.nullable = nullable; - } - - /** Construct a {@link TypeInformation} from a class member variable. */ - public static TypeInformation forField(Field field) { - return new TypeInformation( - field.getName(), - TypeDescriptor.of(field.getGenericType()), - field.isAnnotationPresent(Nullable.class)); - } - - /** Construct a {@link TypeInformation} from a class getter. */ - public static TypeInformation forGetter( - Method method, SerializableFunction<String, String> fieldNamePolicy) { - String name; - if (method.getName().startsWith("get")) { - name = ReflectUtils.stripPrefix(method.getName(), "get"); - } else if (method.getName().startsWith("is")) { - name = ReflectUtils.stripPrefix(method.getName(), "is"); - } else { - throw new RuntimeException("Getter has wrong prefix " + method.getName()); - } - name = fieldNamePolicy.apply(name); - - TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType()); - boolean nullable = method.isAnnotationPresent(Nullable.class); - return new TypeInformation(name, type, nullable); - } - - /** Construct a {@link TypeInformation} from a class setter. */ - public static TypeInformation forSetter(Method method) { - String name; - if (method.getName().startsWith("set")) { - name = ReflectUtils.stripPrefix(method.getName(), "set"); - } else { - throw new RuntimeException("Setter has wrong prefix " + method.getName()); - } - if (method.getParameterCount() != 1) { - throw new RuntimeException("Setter methods should take a single argument."); - } - TypeDescriptor type = TypeDescriptor.of(method.getGenericParameterTypes()[0]); - boolean nullable = - Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance); - return new TypeInformation(name, type, nullable); - } - - public String getName() { - return name; - } - - public TypeDescriptor getType() { - return type; - } - - public boolean isNullable() { - return nullable; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TypeInformation that = (TypeInformation) o; - return nullable == that.nullable - && Objects.equals(name, that.name) - && Objects.equals(type, that.type); - } - - @Override - public int hashCode() { - return Objects.hash(name, type, nullable); - } - } - /** * Infer a schema from a Java class. * @@ -160,9 +67,9 @@ public class StaticSchemaInference { * public getter methods, or special annotations on the class. */ public static Schema schemaFromClass( - Class<?> clazz, Function<Class, List<TypeInformation>> getTypesForClass) { + Class<?> clazz, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) { Schema.Builder builder = Schema.builder(); - for (TypeInformation type : getTypesForClass.apply(clazz)) { + for (FieldValueTypeInformation type : getTypesForClass.apply(clazz)) { Schema.FieldType fieldType = fieldFromType(type.getType(), getTypesForClass); if (type.isNullable()) { builder.addNullableField(type.getName(), fieldType); @@ -175,7 +82,7 @@ public class StaticSchemaInference { // Map a Java field type to a Beam Schema FieldType. private static Schema.FieldType fieldFromType( - TypeDescriptor type, Function<Class, List<TypeInformation>> getTypesForClass) { + TypeDescriptor type, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) { FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType()); if (primitiveType != null) { return primitiveType; @@ -209,7 +116,8 @@ public class StaticSchemaInference { FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass); FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), getTypesForClass); checkArgument( - keyType.getTypeName().isPrimitiveType(), "Only primitive types can be map keys"); + keyType.getTypeName().isPrimitiveType(), + "Only primitive types can be map keys. type: " + keyType.getTypeName()); return FieldType.map(keyType, valueType); } else { throw new RuntimeException("Cannot infer schema from unparameterized map."); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java index f7cc64d..3c22f4b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java @@ -22,12 +22,20 @@ import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.reflect.AvroIgnore; +import org.apache.avro.reflect.AvroName; +import org.apache.avro.reflect.AvroSchema; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.utils.AvroUtils.FixedBytesField; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; @@ -36,6 +44,142 @@ import org.junit.Test; /** Tests for AVRO schema classes. */ public class AvroSchemaTest { + /** A test POJO that corresponds to our AVRO schema. */ + public static class AvroSubPojo { + @AvroName("bool_non_nullable") + public boolean boolNonNullable; + + @AvroName("int") + @org.apache.avro.reflect.Nullable + public Integer anInt; + + public AvroSubPojo(boolean boolNonNullable, Integer anInt) { + this.boolNonNullable = boolNonNullable; + this.anInt = anInt; + } + + public AvroSubPojo() {} + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AvroSubPojo)) { + return false; + } + AvroSubPojo that = (AvroSubPojo) o; + return boolNonNullable == that.boolNonNullable && Objects.equals(anInt, that.anInt); + } + + @Override + public int hashCode() { + return Objects.hash(boolNonNullable, anInt); + } + + @Override + public String toString() { + return "AvroSubPojo{" + "boolNonNullable=" + boolNonNullable + ", anInt=" + anInt + '}'; + } + } + + /** A test POJO that corresponds to our AVRO schema. */ + public static class AvroPojo { + public @AvroName("bool_non_nullable") boolean boolNonNullable; + + @org.apache.avro.reflect.Nullable + public @AvroName("int") Integer anInt; + + @org.apache.avro.reflect.Nullable + public @AvroName("long") Long aLong; + + @AvroName("float") + @org.apache.avro.reflect.Nullable + public Float aFloat; + + @AvroName("double") + @org.apache.avro.reflect.Nullable + public Double aDouble; + + @org.apache.avro.reflect.Nullable public String string; + @org.apache.avro.reflect.Nullable public ByteBuffer bytes; + + @AvroSchema("{\"type\": \"fixed\", \"size\": 4, \"name\": \"fixed4\"}") + @org.apache.avro.reflect.Nullable + public byte[] fixed; + + @org.apache.avro.reflect.Nullable public AvroSubPojo row; + @org.apache.avro.reflect.Nullable public List<AvroSubPojo> array; + @org.apache.avro.reflect.Nullable public Map<String, AvroSubPojo> map; + @AvroIgnore String extraField; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AvroPojo)) { + return false; + } + AvroPojo avroPojo = (AvroPojo) o; + return boolNonNullable == avroPojo.boolNonNullable + && Objects.equals(anInt, avroPojo.anInt) + && Objects.equals(aLong, avroPojo.aLong) + && Objects.equals(aFloat, avroPojo.aFloat) + && Objects.equals(aDouble, avroPojo.aDouble) + && Objects.equals(string, avroPojo.string) + && Objects.equals(bytes, avroPojo.bytes) + && Arrays.equals(fixed, avroPojo.fixed) + && Objects.equals(row, avroPojo.row) + && Objects.equals(array, avroPojo.array) + && Objects.equals(map, avroPojo.map); + } + + @Override + public int hashCode() { + return Objects.hash( + boolNonNullable, + anInt, + aLong, + aFloat, + aDouble, + string, + bytes, + Arrays.hashCode(fixed), + row, + array, + map); + } + + public AvroPojo( + boolean boolNonNullable, + int anInt, + long aLong, + float aFloat, + double aDouble, + String string, + ByteBuffer bytes, + byte[] fixed, + AvroSubPojo row, + List<AvroSubPojo> array, + Map<String, AvroSubPojo> map) { + this.boolNonNullable = boolNonNullable; + this.anInt = anInt; + this.aLong = aLong; + this.aFloat = aFloat; + this.aDouble = aDouble; + this.string = string; + this.bytes = bytes; + this.fixed = fixed; + this.row = row; + this.array = array; + this.map = map; + this.extraField = ""; + } + + public AvroPojo() {} + } + private static final Schema SUBSCHEMA = Schema.builder() .addField("bool_non_nullable", FieldType.BOOLEAN) @@ -52,18 +196,27 @@ public class AvroSchemaTest { .addNullableField("double", FieldType.DOUBLE) .addNullableField("string", FieldType.STRING) .addNullableField("bytes", FieldType.BYTES) - .addField("fixed", FieldType.BYTES.withMetadata("FIXED:4")) + .addField("fixed", FixedBytesField.withSize(4).toBeamType()) .addNullableField("timestampMillis", FieldType.DATETIME) .addNullableField("row", SUB_TYPE) .addNullableField("array", FieldType.array(SUB_TYPE)) .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE)) .build(); - @Test - public void testSpecificRecordSchema() { - assertEquals( - SCHEMA, new AvroSpecificRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class))); - } + private static final Schema POJO_SCHEMA = + Schema.builder() + .addField("bool_non_nullable", FieldType.BOOLEAN) + .addNullableField("int", FieldType.INT32) + .addNullableField("long", FieldType.INT64) + .addNullableField("float", FieldType.FLOAT) + .addNullableField("double", FieldType.DOUBLE) + .addNullableField("string", FieldType.STRING) + .addNullableField("bytes", FieldType.BYTES) + .addField("fixed", FixedBytesField.withSize(4).toBeamType()) + .addNullableField("row", SUB_TYPE) + .addNullableField("array", FieldType.array(SUB_TYPE.withNullable(false))) + .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE.withNullable(false))) + .build(); private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4}; private static final DateTime DATE_TIME = @@ -131,17 +284,26 @@ public class AvroSchemaTest { .build(); @Test + public void testSpecificRecordSchema() { + assertEquals(SCHEMA, new AvroRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class))); + } + + @Test + public void testPojoSchema() { + assertEquals(POJO_SCHEMA, AvroUtils.getSchema(AvroPojo.class)); + } + + @Test public void testSpecificRecordToRow() { SerializableFunction<TestAvro, Row> toRow = - new AvroSpecificRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class)); - Row row = toRow.apply(AVRO_SPECIFIC_RECORD); + new AvroRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class)); assertEquals(ROW, toRow.apply(AVRO_SPECIFIC_RECORD)); } @Test public void testRowToSpecificRecord() { SerializableFunction<Row, TestAvro> fromRow = - new AvroSpecificRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class)); + new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class)); assertEquals(AVRO_SPECIFIC_RECORD, fromRow.apply(ROW)); } @@ -156,7 +318,51 @@ public class AvroSchemaTest { public void testRowToGenericRecord() { SerializableFunction<Row, GenericRecord> fromRow = AvroUtils.getRowToGenericRecordFunction(TestAvro.SCHEMA$); - GenericRecord generic = fromRow.apply(ROW); assertEquals(AVRO_GENERIC_RECORD, fromRow.apply(ROW)); } + + private static final AvroSubPojo SUB_POJO = new AvroSubPojo(true, 42); + private static final AvroPojo AVRO_POJO = + new AvroPojo( + true, + 43, + 44L, + (float) 44.1, + (double) 44.2, + "mystring", + ByteBuffer.wrap(BYTE_ARRAY), + BYTE_ARRAY, + SUB_POJO, + ImmutableList.of(SUB_POJO, SUB_POJO), + ImmutableMap.of("k1", SUB_POJO, "k2", SUB_POJO)); + + private static final Row ROW_FOR_POJO = + Row.withSchema(POJO_SCHEMA) + .addValues( + true, + 43, + 44L, + (float) 44.1, + (double) 44.2, + "mystring", + ByteBuffer.wrap(BYTE_ARRAY), + ByteBuffer.wrap(BYTE_ARRAY), + NESTED_ROW, + ImmutableList.of(NESTED_ROW, NESTED_ROW), + ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW)) + .build(); + + @Test + public void testPojoRecordToRow() { + SerializableFunction<AvroPojo, Row> toRow = + new AvroRecordSchema().toRowFunction(TypeDescriptor.of(AvroPojo.class)); + assertEquals(ROW_FOR_POJO, toRow.apply(AVRO_POJO)); + } + + @Test + public void testRowToPojo() { + SerializableFunction<Row, AvroPojo> fromRow = + new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(AvroPojo.class)); + assertEquals(AVRO_POJO, fromRow.apply(ROW_FOR_POJO)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java index 9380899..c835ea6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java @@ -37,6 +37,8 @@ import java.nio.charset.Charset; import java.util.List; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueSetter; +import org.apache.beam.sdk.schemas.JavaBeanSchema; +import org.apache.beam.sdk.schemas.JavaBeanSchema.SetterTypeSupplier; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray; @@ -49,7 +51,6 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.NullableBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; @@ -61,8 +62,7 @@ public class JavaBeanUtilsTest { @Test public void testNullable() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class); assertTrue(schema.getField("str").getType().getNullable()); assertFalse(schema.getField("anInt").getType().getNullable()); } @@ -70,62 +70,48 @@ public class JavaBeanUtilsTest { @Test public void testMismatchingNullable() { thrown.expect(RuntimeException.class); - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - MismatchingNullableBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class); } @Test public void testSimpleBean() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class); SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); } @Test public void testNestedBean() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema); } @Test public void testPrimitiveArray() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - PrimitiveArrayBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedArray() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - NestedArrayBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedCollection() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - NestedCollectionBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema); } @Test public void testPrimitiveMap() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - PrimitiveMapBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema); } @Test public void testNestedMap() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - NestedMapBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema); } @@ -147,7 +133,7 @@ public class JavaBeanUtilsTest { List<FieldValueGetter> getters = JavaBeanUtils.getGetters( - SimpleBean.class, SIMPLE_BEAN_SCHEMA, SerializableFunctions.identity()); + SimpleBean.class, SIMPLE_BEAN_SCHEMA, new JavaBeanSchema.GetterTypeSupplier()); assertEquals(12, getters.size()); assertEquals("str", getters.get(0).name()); @@ -174,7 +160,8 @@ public class JavaBeanUtilsTest { @Test public void testGeneratedSimpleSetters() { SimpleBean simpleBean = new SimpleBean(); - List<FieldValueSetter> setters = JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA); + List<FieldValueSetter> setters = + JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA, new SetterTypeSupplier()); assertEquals(12, setters.size()); setters.get(0).set(simpleBean, "field1"); @@ -219,7 +206,7 @@ public class JavaBeanUtilsTest { JavaBeanUtils.getGetters( BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA, - SerializableFunctions.identity()); + new JavaBeanSchema.GetterTypeSupplier()); assertEquals((byte) 41, getters.get(0).get(bean)); assertEquals((short) 42, getters.get(1).get(bean)); assertEquals((int) 43, getters.get(2).get(bean)); @@ -231,7 +218,8 @@ public class JavaBeanUtilsTest { public void testGeneratedSimpleBoxedSetters() { BeanWithBoxedFields bean = new BeanWithBoxedFields(); List<FieldValueSetter> setters = - JavaBeanUtils.getSetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA); + JavaBeanUtils.getSetters( + BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA, new SetterTypeSupplier()); setters.get(0).set(bean, (byte) 41); setters.get(1).set(bean, (short) 42); @@ -250,7 +238,8 @@ public class JavaBeanUtilsTest { public void testGeneratedByteBufferSetters() { BeanWithByteArray bean = new BeanWithByteArray(); List<FieldValueSetter> setters = - JavaBeanUtils.getSetters(BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA); + JavaBeanUtils.getSetters( + BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA, new SetterTypeSupplier()); setters.get(0).set(bean, "field1".getBytes(Charset.defaultCharset())); setters.get(1).set(bean, "field2".getBytes(Charset.defaultCharset())); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java index 1a0946d..e140a8b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java @@ -37,6 +37,7 @@ import java.nio.charset.Charset; import java.util.List; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueSetter; +import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO; @@ -126,7 +127,8 @@ public class POJOUtilsTest { new BigDecimal(42), new StringBuilder("stringBuilder")); - List<FieldValueGetter> getters = POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA); + List<FieldValueGetter> getters = + POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier()); assertEquals(12, getters.size()); assertEquals("str", getters.get(0).name()); assertEquals("field1", getters.get(0).get(simplePojo)); @@ -147,7 +149,8 @@ public class POJOUtilsTest { @Test public void testGeneratedSimpleSetters() { SimplePOJO simplePojo = new SimplePOJO(); - List<FieldValueSetter> setters = POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA); + List<FieldValueSetter> setters = + POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier()); assertEquals(12, setters.size()); setters.get(0).set(simplePojo, "field1"); @@ -182,7 +185,8 @@ public class POJOUtilsTest { POJOWithBoxedFields pojo = new POJOWithBoxedFields((byte) 41, (short) 42, 43, 44L, true); List<FieldValueGetter> getters = - POJOUtils.getGetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA); + POJOUtils.getGetters( + POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier()); assertEquals((byte) 41, getters.get(0).get(pojo)); assertEquals((short) 42, getters.get(1).get(pojo)); assertEquals((int) 43, getters.get(2).get(pojo)); @@ -194,7 +198,8 @@ public class POJOUtilsTest { public void testGeneratedSimpleBoxedSetters() { POJOWithBoxedFields pojo = new POJOWithBoxedFields(); List<FieldValueSetter> setters = - POJOUtils.getSetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA); + POJOUtils.getSetters( + POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier()); setters.get(0).set(pojo, (byte) 41); setters.get(1).set(pojo, (short) 42); @@ -213,7 +218,8 @@ public class POJOUtilsTest { public void testGeneratedByteBufferSetters() { POJOWithByteArray pojo = new POJOWithByteArray(); List<FieldValueSetter> setters = - POJOUtils.getSetters(POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA); + POJOUtils.getSetters( + POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, new JavaFieldTypeSupplier()); setters.get(0).set(pojo, BYTE_ARRAY); setters.get(1).set(pojo, BYTE_BUFFER.array());