[ https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=175470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175470 ]
ASF GitHub Bot logged work on BEAM-4454: ---------------------------------------- Author: ASF GitHub Bot Created on: 14/Dec/18 17:16 Start Date: 14/Dec/18 17:16 Worklog Time Spent: 10m Work Description: reuvenlax closed pull request #7267: [BEAM-4454] Support Avro POJO objects URL: https://github.com/apache/beam/pull/7267 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java similarity index 67% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java index d8e4bda342f8..29bf51a06a77 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java @@ -17,35 +17,34 @@ */ package org.apache.beam.sdk.schemas; -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory; import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** - * A {@link SchemaProvider} for AVRO generated SpecificRecords. + * A {@link SchemaProvider} for AVRO generated SpecificRecords and POJOs. * - * <p>This provider infers a schema from generates SpecificRecord objects, and creates schemas and - * rows that bind to the appropriate fields. + * <p>This provider infers a schema from generated SpecificRecord objects, and creates schemas and + * rows that bind to the appropriate fields. This provider also infers schemas from Java POJO + * objects, creating a schema that matches that inferred by the AVRO libraries. */ -public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider { +public class AvroRecordSchema extends GetterBasedSchemaProvider { @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { - return AvroUtils.getSchema((Class<? extends SpecificRecord>) typeDescriptor.getRawType()); + return AvroUtils.getSchema(typeDescriptor.getRawType()); } @Override public FieldValueGetterFactory fieldValueGetterFactory() { - return new AvroSpecificRecordGetterFactory(); + return AvroUtils::getGetters; } @Override public UserTypeCreatorFactory schemaTypeCreatorFactory() { - return new AvroSpecificRecordUserTypeCreatorFactory(); + return AvroUtils::getCreator; } @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { - return new AvroSpecificRecordTypeInformationFactory(); + return AvroUtils::getFieldTypes; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java deleted file mode 100644 index fcb85f4dd664..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas; - -import java.util.List; -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.utils.AvroUtils; - -/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */ -public class AvroSpecificRecordGetterFactory implements FieldValueGetterFactory { - @Override - public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java deleted file mode 100644 index 68d0d6a7d068..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas; - -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.utils.AvroUtils; - -/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */ -public class AvroSpecificRecordUserTypeCreatorFactory implements UserTypeCreatorFactory { - @Override - public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { - return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java index 5cccdf67ef3d..593853ffef29 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java @@ -22,22 +22,33 @@ import com.google.auto.value.AutoValue; import java.io.Serializable; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; import java.util.Collection; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; +import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; -/** Represents type information for a schema field. */ +/** Represents type information for a Java type that will be used to infer a Schema type. */ @AutoValue public abstract class FieldValueTypeInformation implements Serializable { /** Returns the field name. */ public abstract String getName(); + /** Returns whether the field is nullable. */ + public abstract boolean isNullable(); + /** Returns the field type. */ - public abstract Class getType(); + public abstract TypeDescriptor getType(); + + @Nullable + public abstract Field getField(); + + @Nullable + public abstract Method getMethod(); /** If the field is a container type, returns the element type. */ @Nullable @@ -51,26 +62,90 @@ @Nullable public abstract Type getMapValueType(); - public static FieldValueTypeInformation of(Field field) { - return new AutoValue_FieldValueTypeInformation( - field.getName(), - field.getType(), - getArrayComponentType(field), - getMapKeyType(field), - getMapValueType(field)); + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + public abstract Builder setName(String name); + + public abstract Builder setNullable(boolean nullable); + + public abstract Builder setType(TypeDescriptor type); + + public abstract Builder setField(@Nullable Field field); + + public abstract Builder setMethod(@Nullable Method method); + + public abstract Builder setElementType(@Nullable Type elementType); + + public abstract Builder setMapKeyType(@Nullable Type mapKeyType); + + public abstract Builder setMapValueType(@Nullable Type mapValueType); + + abstract FieldValueTypeInformation build(); + } + + public static FieldValueTypeInformation forField(Field field) { + return new AutoValue_FieldValueTypeInformation.Builder() + .setName(field.getName()) + .setNullable(field.isAnnotationPresent(Nullable.class)) + .setType(TypeDescriptor.of(field.getGenericType())) + .setField(field) + .setElementType(getArrayComponentType(field)) + .setMapKeyType(getMapKeyType(field)) + .setMapValueType(getMapValueType(field)) + .build(); + } + + public static FieldValueTypeInformation forGetter(Method method) { + String name; + if (method.getName().startsWith("get")) { + name = ReflectUtils.stripPrefix(method.getName(), "get"); + } else if (method.getName().startsWith("is")) { + name = ReflectUtils.stripPrefix(method.getName(), "is"); + } else { + throw new RuntimeException("Getter has wrong prefix " + method.getName()); + } + + TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType()); + boolean nullable = method.isAnnotationPresent(Nullable.class); + return new AutoValue_FieldValueTypeInformation.Builder() + .setName(name) + .setNullable(nullable) + .setType(type) + .setMethod(method) + .setElementType(getArrayComponentType(type)) + .setMapKeyType(getMapKeyType(type)) + .setMapValueType(getMapValueType(type)) + .build(); } - public static FieldValueTypeInformation of(TypeInformation typeInformation) { - return new AutoValue_FieldValueTypeInformation( - typeInformation.getName(), - typeInformation.getType().getRawType(), - getArrayComponentType(typeInformation), - getMapKeyType(typeInformation), - getMapValueType(typeInformation)); + public static FieldValueTypeInformation forSetter(Method method) { + String name; + if (method.getName().startsWith("set")) { + name = ReflectUtils.stripPrefix(method.getName(), "set"); + } else { + throw new RuntimeException("Setter has wrong prefix " + method.getName()); + } + if (method.getParameterCount() != 1) { + throw new RuntimeException("Setter methods should take a single argument."); + } + TypeDescriptor type = TypeDescriptor.of(method.getGenericParameterTypes()[0]); + boolean nullable = + Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance); + return new AutoValue_FieldValueTypeInformation.Builder() + .setName(name) + .setNullable(nullable) + .setType(type) + .setMethod(method) + .setElementType(getArrayComponentType(type)) + .setMapKeyType(getMapKeyType(type)) + .setMapValueType(getMapValueType(type)) + .build(); } - private static Type getArrayComponentType(TypeInformation typeInformation) { - return getArrayComponentType(typeInformation.getType()); + public FieldValueTypeInformation withName(String name) { + return toBuilder().setName(name).build(); } private static Type getArrayComponentType(Field field) { @@ -98,15 +173,19 @@ private static Type getArrayComponentType(TypeDescriptor valueType) { return null; } + public Class getRawType() { + return getType().getRawType(); + } + // If the Field is a map type, returns the key type, otherwise returns a null reference. @Nullable private static Type getMapKeyType(Field field) { - return getMapType(TypeDescriptor.of(field.getGenericType()), 0); + return getMapKeyType(TypeDescriptor.of(field.getGenericType())); } @Nullable - private static Type getMapKeyType(TypeInformation typeInformation) { - return getMapType(typeInformation.getType(), 0); + private static Type getMapKeyType(TypeDescriptor<?> typeDescriptor) { + return getMapType(typeDescriptor, 0); } // If the Field is a map type, returns the value type, otherwise returns a null reference. @@ -116,8 +195,8 @@ private static Type getMapValueType(Field field) { } @Nullable - private static Type getMapValueType(TypeInformation typeInformation) { - return getMapType(typeInformation.getType(), 1); + private static Type getMapValueType(TypeDescriptor typeDescriptor) { + return getMapType(typeDescriptor, 1); } // If the Field is a map type, returns the key or value type (0 is key type, 1 is value). diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java index 2ec1a42e7d07..139281f7f779 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java @@ -73,7 +73,7 @@ public T apply(Row row) { fromValue( type, row.getValue(i), - typeInformation.getType(), + typeInformation.getRawType(), typeInformation.getElementType(), typeInformation.getMapKeyType(), typeInformation.getMapValueType(), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java index f7757a8174e7..677823f9c0d1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.schemas; import java.util.List; -import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -38,7 +37,6 @@ abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory(); /** Implementing class should override to return a constructor factory. */ - @Nullable abstract UserTypeCreatorFactory schemaTypeCreatorFactory(); @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 2c22400b050c..8eb8022841e3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -17,13 +17,16 @@ */ package org.apache.beam.sdk.schemas; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.schemas.utils.JavaBeanGetterFactory; -import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory; -import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory; +import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier; import org.apache.beam.sdk.schemas.utils.JavaBeanUtils; -import org.apache.beam.sdk.transforms.SerializableFunctions; +import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -41,15 +44,55 @@ */ @Experimental(Kind.SCHEMAS) public class JavaBeanSchema extends GetterBasedSchemaProvider { + /** {@link FieldValueTypeSupplier} that's based on getter methods. */ + @VisibleForTesting + public static class GetterTypeSupplier implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isGetter) + .map(FieldValueTypeInformation::forGetter) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } + } + + /** {@link FieldValueTypeSupplier} that's based on setter methods. */ + @VisibleForTesting + public static class SetterTypeSupplier implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isSetter) + .map(FieldValueTypeInformation::forSetter) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } + } + @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { - return JavaBeanUtils.schemaFromJavaBeanClass( - typeDescriptor.getRawType(), SerializableFunctions.identity()); + return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType()); } @Override public FieldValueGetterFactory fieldValueGetterFactory() { - return new JavaBeanGetterFactory(); + return (Class<?> targetClass, Schema schema) -> + JavaBeanUtils.getGetters(targetClass, schema, new GetterTypeSupplier()); } @Override @@ -59,6 +102,15 @@ UserTypeCreatorFactory schemaTypeCreatorFactory() { @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { - return new JavaBeanTypeInformationFactory(); + return (Class<?> targetClass, Schema schema) -> + JavaBeanUtils.getFieldTypes(targetClass, schema, new GetterTypeSupplier()); + } + + /** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */ + public static class JavaBeanSetterFactory implements FieldValueSetterFactory { + @Override + public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) { + return JavaBeanUtils.getSetters(targetClass, schema, new SetterTypeSupplier()); + } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index a7d6a3020ccd..1504717f78c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -17,11 +17,16 @@ */ package org.apache.beam.sdk.schemas; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier; import org.apache.beam.sdk.schemas.utils.POJOUtils; -import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory; -import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory; +import org.apache.beam.sdk.schemas.utils.ReflectUtils; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -39,6 +44,25 @@ */ @Experimental(Kind.SCHEMAS) public class JavaFieldSchema extends GetterBasedSchemaProvider { + /** {@link FieldValueTypeSupplier} that's based on public fields. */ + @VisibleForTesting + public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = + ReflectUtils.getFields(clazz) + .stream() + .map(FieldValueTypeInformation::forField) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } + } + @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType()); @@ -46,16 +70,19 @@ @Override public FieldValueGetterFactory fieldValueGetterFactory() { - return new PojoValueGetterFactory(); + return (Class<?> targetClass, Schema schema) -> + POJOUtils.getGetters(targetClass, schema, new JavaFieldTypeSupplier()); } @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { - return new PojoValueTypeInformationFactory(); + return (Class<?> targetClass, Schema schema) -> + POJOUtils.getFieldTypes(targetClass, schema, new JavaFieldTypeSupplier()); } @Override UserTypeCreatorFactory schemaTypeCreatorFactory() { - return new PojoTypeUserTypeCreatorFactory(); + return (Class<?> targetClass, Schema schema) -> + POJOUtils.getCreator(targetClass, schema, new JavaFieldTypeSupplier()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java deleted file mode 100644 index b22798861b12..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas; - -import org.apache.beam.sdk.schemas.utils.POJOUtils; - -/** Vends constructors for POJOs. */ -class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory { - @Override - public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { - return POJOUtils.getCreator(clazz, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java index c5fae19948e5..6b9b18f3740c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java @@ -27,7 +27,7 @@ private final Class<?> clazz; private final transient Constructor<?> constructor; - SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor) { + public SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor) { this.clazz = clazz; this.constructor = checkNotNull(constructor); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java deleted file mode 100644 index 6b76fa6fa66a..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.avro.specific.SpecificRecord; -import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A {@link FieldValueTypeInformation} for AVRO-generated specific records. */ -public class AvroSpecificRecordTypeInformationFactory implements FieldValueTypeInformationFactory { - @Override - public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return AvroUtils.getFieldTypes((Class<? extends SpecificRecord>) targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index df33d6430389..c2e737f69418 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -23,6 +23,7 @@ import com.google.common.base.CaseFormat; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.lang.reflect.Method; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -42,6 +43,8 @@ import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.reflect.AvroIgnore; +import org.apache.avro.reflect.AvroName; import org.apache.avro.reflect.ReflectData; import org.apache.avro.specific.SpecificData; import org.apache.avro.specific.SpecificRecord; @@ -56,6 +59,7 @@ import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Instant; import org.joda.time.ReadableInstant; @@ -117,6 +121,11 @@ private FixedBytesField(int size) { this.size = size; } + /** Create a {@link FixedBytesField} with the specified size. */ + public static FixedBytesField withSize(int size) { + return new FixedBytesField(size); + } + /** Create a {@link FixedBytesField} from a Beam {@link FieldType}. */ @Nullable public static FixedBytesField fromBeamFieldType(FieldType fieldType) { @@ -260,59 +269,100 @@ public static GenericRecord toGenericRecord( return g -> toGenericRecord(g, avroSchema); } - /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) { - try { - org.apache.avro.Schema avroSchema = - (org.apache.avro.Schema) (clazz.getDeclaredField("SCHEMA$").get(null)); - return toBeamSchema(avroSchema); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new IllegalArgumentException( - "Class " - + clazz - + " is not an AVRO SpecificRecord. " - + "No public SCHEMA$ field was found."); - } + /** Infer a {@link Schema} from either an AVRO-generated SpecificRecord or a POJO. */ + public static <T> Schema getSchema(Class<T> clazz) { + return toBeamSchema(ReflectData.get().getSchema(clazz)); } - private static final class AvroSpecificRecordFieldNamePolicy - implements SerializableFunction<String, String> { - Schema schema; - Map<String, String> nameMapping = Maps.newHashMap(); + private static final class AvroSpecificRecordFieldValueTypeSupplier + implements FieldValueTypeSupplier { + @Override + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, String> mapping = getMapping(schema); + Map<String, FieldValueTypeInformation> types = Maps.newHashMap(); + for (Method method : ReflectUtils.getMethods(clazz)) { + if (ReflectUtils.isGetter(method)) { + FieldValueTypeInformation fieldValueTypeInformation = + FieldValueTypeInformation.forGetter(method); + String name = mapping.get(fieldValueTypeInformation.getName()); + if (name != null) { + types.put(name, fieldValueTypeInformation.withName(name)); + } + } + } + + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); + } - AvroSpecificRecordFieldNamePolicy(Schema schema) { - this.schema = schema; + private Map<String, String> getMapping(Schema schema) { + Map<String, String> mapping = Maps.newHashMap(); for (Field field : schema.getFields()) { - String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName()); - nameMapping.put(getter, field.getName()); + String underscore = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName()); + mapping.put(underscore, field.getName()); // The Avro compiler might add a $ at the end of a getter to disambiguate. - nameMapping.put(getter + "$", field.getName()); + mapping.put(underscore + "$", field.getName()); + // If the field is in camel case already, then it's the identity mapping. + mapping.put(field.getName(), field.getName()); } + return mapping; } + } + private static final class AvroPojoFieldValueTypeSupplier implements FieldValueTypeSupplier { @Override - public String apply(String input) { - return nameMapping.getOrDefault(input, input); + public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) { + Map<String, FieldValueTypeInformation> types = Maps.newHashMap(); + for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) { + if (!f.isAnnotationPresent(AvroIgnore.class)) { + FieldValueTypeInformation typeInformation = FieldValueTypeInformation.forField(f); + AvroName avroname = f.getAnnotation(AvroName.class); + if (avroname != null) { + typeInformation = typeInformation.withName(avroname.value()); + } + types.put(typeInformation.getName(), typeInformation); + } + } + // Return the list ordered by the schema fields. + return schema + .getFields() + .stream() + .map(f -> types.get(f.getName())) + .collect(Collectors.toList()); } } - /** Get field types for an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> List<FieldValueTypeInformation> getFieldTypes( - Class<T> clazz, Schema schema) { - return JavaBeanUtils.getFieldTypes( - clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema)); + /** Get field types for an AVRO-generated SpecificRecord or a POJO. */ + public static <T> List<FieldValueTypeInformation> getFieldTypes(Class<T> clazz, Schema schema) { + if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return JavaBeanUtils.getFieldTypes( + clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier()); + } else { + return POJOUtils.getFieldTypes(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + } } - /** Get generated getters for an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> List<FieldValueGetter> getGetters( - Class<T> clazz, Schema schema) { - return JavaBeanUtils.getGetters(clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema)); + /** Get generated getters for an AVRO-generated SpecificRecord or a POJO. */ + public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema schema) { + if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return JavaBeanUtils.getGetters( + clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier()); + } else { + return POJOUtils.getGetters(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + } } /** Get an object creator for an AVRO-generated SpecificRecord. */ - public static <T extends SpecificRecord> SchemaUserTypeCreator getCreator( - Class<T> clazz, Schema schema) { - return AvroByteBuddyUtils.getCreator(clazz, schema); + public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) { + if (TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) { + return AvroByteBuddyUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema); + } else { + return POJOUtils.getCreator(clazz, schema, new AvroPojoFieldValueTypeSupplier()); + } } /** Converts AVRO schema to Beam field. */ @@ -480,7 +530,6 @@ public String apply(String input) { private static Object genericFromBeamField( Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object value) { TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema); - if (!fieldType.getNullable().equals(typeWithNullability.nullable)) { throw new IllegalArgumentException( "FieldType " diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java similarity index 68% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java index 84f9a5e050d6..12330ecb20b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java @@ -17,15 +17,19 @@ */ package org.apache.beam.sdk.schemas.utils; +import java.io.Serializable; import java.util.List; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; import org.apache.beam.sdk.schemas.Schema; -/** A {@link FieldValueTypeInformationFactory} for POJO objects objects. */ -public class PojoValueTypeInformationFactory implements FieldValueTypeInformationFactory { - @Override - public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return POJOUtils.getFieldTypes(targetClass, schema); - } +/** + * A naming policy for schema fields. This maps a name from the class (field name or getter name) to + * the matching field name in the schema. + */ +public interface FieldValueTypeSupplier extends Serializable { + /** + * Return all the FieldValueTypeInformations. The returned list must be in the same order as + * fields in the schema. + */ + List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java deleted file mode 100644 index 5c3d2ea17155..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueGetter; -import org.apache.beam.sdk.schemas.FieldValueGetterFactory; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.SerializableFunctions; - -/** A factory for creating {@link FieldValueGetter} objects for a JavaBean object. */ -public class JavaBeanGetterFactory implements FieldValueGetterFactory { - @Override - public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getGetters(targetClass, schema, SerializableFunctions.identity()); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java deleted file mode 100644 index e67a58dc6837..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueSetter; -import org.apache.beam.sdk.schemas.FieldValueSetterFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A factory for creating {@link FieldValueSetter} objects for a JavaBean object. */ -public class JavaBeanSetterFactory implements FieldValueSetterFactory { - @Override - public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getSetters(targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java deleted file mode 100644 index f445a87dc9ca..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueTypeInformation; -import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.transforms.SerializableFunctions; - -/** A {@link FieldValueTypeInformationFactory} for Java Bean objects. */ -public class JavaBeanTypeInformationFactory implements FieldValueTypeInformationFactory { - @Override - public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getFieldTypes(targetClass, schema, SerializableFunctions.identity()); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java index 0ec07c9d8d77..58cc0ed0254a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.schemas.utils; import com.google.common.collect.Maps; -import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.List; @@ -48,55 +47,46 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter; import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; -import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.common.ReflectHelpers; /** A set of utilities to generate getter and setter classes for JavaBean objects. */ @Experimental(Kind.SCHEMAS) public class JavaBeanUtils { /** Create a {@link Schema} for a Java Bean class. */ - public static Schema schemaFromJavaBeanClass( - Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) { - return StaticSchemaInference.schemaFromClass( - clazz, c -> JavaBeanUtils.typeInformationFromClass(c, fieldNamePolicy)); + public static Schema schemaFromJavaBeanClass(Class<?> clazz) { + return StaticSchemaInference.schemaFromClass(clazz, JavaBeanUtils::typeInformationFromClass); } - private static List<TypeInformation> typeInformationFromClass( - Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) { - try { - List<TypeInformation> getterTypes = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isGetter) - .map(m -> TypeInformation.forGetter(m, fieldNamePolicy)) - .collect(Collectors.toList()); + private static List<FieldValueTypeInformation> typeInformationFromClass(Class<?> clazz) { + List<FieldValueTypeInformation> getterTypes = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isGetter) + .map(FieldValueTypeInformation::forGetter) + .collect(Collectors.toList()); - Map<String, TypeInformation> setterTypes = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isSetter) - .map(m -> TypeInformation.forSetter(m)) - .collect(Collectors.toMap(TypeInformation::getName, Function.identity())); - validateJavaBean(getterTypes, setterTypes); - return getterTypes; - } catch (IOException e) { - throw new RuntimeException(e); - } + Map<String, FieldValueTypeInformation> setterTypes = + ReflectUtils.getMethods(clazz) + .stream() + .filter(ReflectUtils::isSetter) + .map(FieldValueTypeInformation::forSetter) + .collect(Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); + validateJavaBean(getterTypes, setterTypes); + return getterTypes; } // Make sure that there are matching setters and getters. private static void validateJavaBean( - List<TypeInformation> getters, Map<String, TypeInformation> setters) { - for (TypeInformation type : getters) { - TypeInformation setterType = setters.get(type.getName()); + List<FieldValueTypeInformation> getters, Map<String, FieldValueTypeInformation> setters) { + for (FieldValueTypeInformation type : getters) { + FieldValueTypeInformation setterType = setters.get(type.getName()); if (setterType == null) { throw new RuntimeException( "JavaBean contained a getter for field " + type.getName() + "but did not contain a matching setter."); } - if (!type.equals(setterType)) { + if (!type.getType().equals(setterType.getType())) { throw new RuntimeException( "JavaBean contained setter for field " + type.getName() @@ -118,28 +108,9 @@ private static void validateJavaBean( Maps.newConcurrentMap(); public static List<FieldValueTypeInformation> getFieldTypes( - Class<?> clazz, Schema schema, SerializableFunction<String, String> fieldNamePolicy) { + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_FIELD_TYPES.computeIfAbsent( - new ClassWithSchema(clazz, schema), - c -> { - try { - Map<String, FieldValueTypeInformation> getterMap = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isGetter) - .map(m -> TypeInformation.forGetter(m, fieldNamePolicy)) - .map(FieldValueTypeInformation::of) - .collect( - Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> getterMap.get(f.getName())) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema)); } // The list of getters for a class is cached, so we only create the classes the first time @@ -153,37 +124,22 @@ private static void validateJavaBean( * <p>The returned list is ordered by the order of fields in the schema. */ public static List<FieldValueGetter> getGetters( - Class<?> clazz, Schema schema, SerializableFunction<String, String> fieldNamePolicy) { + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_GETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - try { - Map<String, FieldValueGetter> getterMap = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isGetter) - .map(m -> JavaBeanUtils.createGetter(m, fieldNamePolicy)) - .collect(Collectors.toMap(FieldValueGetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> getterMap.get(f.getName())) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return types.stream().map(JavaBeanUtils::createGetter).collect(Collectors.toList()); }); } - private static <T> FieldValueGetter createGetter( - Method getterMethod, SerializableFunction<String, String> fieldNamePolicy) { - TypeInformation typeInformation = TypeInformation.forGetter(getterMethod, fieldNamePolicy); + private static <T> FieldValueGetter createGetter(FieldValueTypeInformation typeInformation) { DynamicType.Builder<FieldValueGetter> builder = ByteBuddyUtils.subclassGetterInterface( BYTE_BUDDY, - getterMethod.getDeclaringClass(), + typeInformation.getMethod().getDeclaringClass(), new ConvertType(false).convert(typeInformation.getType())); - builder = implementGetterMethods(builder, getterMethod, fieldNamePolicy); + builder = implementGetterMethods(builder, typeInformation); try { return builder .make() @@ -195,20 +151,18 @@ private static void validateJavaBean( | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - throw new RuntimeException("Unable to generate a getter for getter '" + getterMethod + "'"); + throw new RuntimeException( + "Unable to generate a getter for getter '" + typeInformation.getMethod() + "'"); } } private static DynamicType.Builder<FieldValueGetter> implementGetterMethods( - DynamicType.Builder<FieldValueGetter> builder, - Method method, - SerializableFunction<String, String> fieldNamePolicy) { - TypeInformation typeInformation = TypeInformation.forGetter(method, fieldNamePolicy); + DynamicType.Builder<FieldValueGetter> builder, FieldValueTypeInformation typeInformation) { return builder .method(ElementMatchers.named("name")) .intercept(FixedValue.reference(typeInformation.getName())) .method(ElementMatchers.named("get")) - .intercept(new InvokeGetterInstruction(method, fieldNamePolicy)); + .intercept(new InvokeGetterInstruction(typeInformation)); } // The list of setters for a class is cached, so we only create the classes the first time @@ -221,36 +175,23 @@ private static void validateJavaBean( * * <p>The returned list is ordered by the order of fields in the schema. */ - public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) { + public static List<FieldValueSetter> getSetters( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_SETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - try { - Map<String, FieldValueSetter> setterMap = - ReflectUtils.getMethods(clazz) - .stream() - .filter(ReflectUtils::isSetter) - .map(JavaBeanUtils::createSetter) - .collect(Collectors.toMap(FieldValueSetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> setterMap.get(f.getName())) - .collect(Collectors.toList()); - } catch (IOException e) { - throw new RuntimeException(e); - } + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return types.stream().map(JavaBeanUtils::createSetter).collect(Collectors.toList()); }); } - private static <T> FieldValueSetter createSetter(Method setterMethod) { - TypeInformation typeInformation = TypeInformation.forSetter(setterMethod); + private static FieldValueSetter createSetter(FieldValueTypeInformation typeInformation) { DynamicType.Builder<FieldValueSetter> builder = ByteBuddyUtils.subclassSetterInterface( BYTE_BUDDY, - setterMethod.getDeclaringClass(), + typeInformation.getMethod().getDeclaringClass(), new ConvertType(false).convert(typeInformation.getType())); - builder = implementSetterMethods(builder, setterMethod); + builder = implementSetterMethods(builder, typeInformation.getMethod()); try { return builder .make() @@ -262,29 +203,27 @@ private static void validateJavaBean( | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) { - throw new RuntimeException("Unable to generate a setter for setter '" + setterMethod + "'"); + throw new RuntimeException( + "Unable to generate a setter for setter '" + typeInformation.getMethod() + "'"); } } private static DynamicType.Builder<FieldValueSetter> implementSetterMethods( DynamicType.Builder<FieldValueSetter> builder, Method method) { - TypeInformation typeInformation = TypeInformation.forSetter(method); + FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method); return builder .method(ElementMatchers.named("name")) - .intercept(FixedValue.reference(typeInformation.getName())) + .intercept(FixedValue.reference(javaTypeInformation.getName())) .method(ElementMatchers.named("set")) .intercept(new InvokeSetterInstruction(method)); } // Implements a method to read a public getter out of an object. private static class InvokeGetterInstruction implements Implementation { - // Getter method that wil be invoked - private Method method; - private SerializableFunction<String, String> fieldNamePolicy; + private final FieldValueTypeInformation typeInformation; - InvokeGetterInstruction(Method method, SerializableFunction<String, String> fieldNamePolicy) { - this.method = method; - this.fieldNamePolicy = fieldNamePolicy; + InvokeGetterInstruction(FieldValueTypeInformation typeInformation) { + this.typeInformation = typeInformation; } @Override @@ -295,7 +234,6 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) { @Override public ByteCodeAppender appender(final Target implementationTarget) { return (methodVisitor, implementationContext, instrumentedMethod) -> { - TypeInformation typeInformation = TypeInformation.forGetter(method, fieldNamePolicy); // this + method parameters. int numLocals = 1 + instrumentedMethod.getParameters().size(); @@ -305,7 +243,7 @@ public ByteCodeAppender appender(final Target implementationTarget) { // Method param is offset 1 (offset 0 is the this parameter). MethodVariableAccess.REFERENCE.loadFrom(1), // Invoke the getter - MethodInvocation.invoke(new ForLoadedMethod(method))); + MethodInvocation.invoke(new ForLoadedMethod(typeInformation.getMethod()))); StackManipulation stackManipulation = new StackManipulation.Compound( @@ -335,7 +273,7 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) { @Override public ByteCodeAppender appender(final Target implementationTarget) { return (methodVisitor, implementationContext, instrumentedMethod) -> { - TypeInformation typeInformation = TypeInformation.forSetter(method); + FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method); // this + method parameters. int numLocals = 1 + instrumentedMethod.getParameters().size(); @@ -349,7 +287,7 @@ public ByteCodeAppender appender(final Target implementationTarget) { MethodVariableAccess.REFERENCE.loadFrom(1), // Do any conversions necessary. new ByteBuddyUtils.ConvertValueForSetter(readField) - .convert(typeInformation.getType()), + .convert(javaTypeInformation.getType()), // Now update the field and return void. MethodInvocation.invoke(new ForLoadedMethod(method)), MethodReturn.VOID); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java index 38f5307e1259..370b3ccc0548 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import net.bytebuddy.ByteBuddy; import net.bytebuddy.description.field.FieldDescription.ForLoadedField; import net.bytebuddy.description.type.TypeDescription.ForLoadedType; @@ -55,7 +56,6 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter; import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; -import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.sdk.values.TypeDescriptor; @@ -63,12 +63,11 @@ @Experimental(Kind.SCHEMAS) public class POJOUtils { public static Schema schemaFromPojoClass(Class<?> clazz) { - // We should cache the field order. - Function<Class, List<TypeInformation>> getTypesForClass = + Function<Class, List<FieldValueTypeInformation>> getTypesForClass = c -> ReflectUtils.getFields(c) .stream() - .map(TypeInformation::forField) + .map(FieldValueTypeInformation::forField) .collect(Collectors.toList()); return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass); } @@ -79,22 +78,10 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { private static final Map<ClassWithSchema, List<FieldValueTypeInformation>> CACHED_FIELD_TYPES = Maps.newConcurrentMap(); - public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz, Schema schema) { + public static List<FieldValueTypeInformation> getFieldTypes( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_FIELD_TYPES.computeIfAbsent( - new ClassWithSchema(clazz, schema), - c -> { - Map<String, FieldValueTypeInformation> typeInformationMap = - ReflectUtils.getFields(clazz) - .stream() - .map(FieldValueTypeInformation::of) - .collect( - Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> typeInformationMap.get(f.getName())) - .collect(Collectors.toList()); - }); + new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema)); } // The list of getters for a class is cached, so we only create the classes the first time @@ -102,21 +89,20 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { private static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS = Maps.newConcurrentMap(); - public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) { + public static List<FieldValueGetter> getGetters( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { // Return the getters ordered by their position in the schema. return CACHED_GETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - Map<String, FieldValueGetter> getterMap = - ReflectUtils.getFields(clazz) - .stream() - .map(POJOUtils::createGetter) - .collect(Collectors.toMap(FieldValueGetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> getterMap.get(f.getName())) - .collect(Collectors.toList()); + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + List<FieldValueGetter> getters = + types.stream().map(POJOUtils::createGetter).collect(Collectors.toList()); + if (getters.size() != schema.getFieldCount()) { + throw new RuntimeException( + "Was not able to generate getters for schema: " + schema + " class: " + clazz); + } + return getters; }); } @@ -125,24 +111,21 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS = Maps.newConcurrentMap(); - public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) { + public static <T> SchemaUserTypeCreator getCreator( + Class<T> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { return CACHED_CREATORS.computeIfAbsent( - new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema)); + new ClassWithSchema(clazz, schema), + c -> { + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return createCreator(clazz, schema, types); + }); } - private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) { + private static <T> SchemaUserTypeCreator createCreator( + Class<T> clazz, Schema schema, List<FieldValueTypeInformation> types) { // Get the list of class fields ordered by schema. - Map<String, Field> fieldMap = - ReflectUtils.getFields(clazz) - .stream() - .collect(Collectors.toMap(Field::getName, Function.identity())); List<Field> fields = - schema - .getFields() - .stream() - .map(f -> fieldMap.get(f.getName())) - .collect(Collectors.toList()); - + types.stream().map(FieldValueTypeInformation::getField).collect(Collectors.toList()); try { DynamicType.Builder<SchemaUserTypeCreator> builder = BYTE_BUDDY @@ -179,13 +162,16 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { * </code></pre> */ @SuppressWarnings("unchecked") - static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter(Field field) { + @Nullable + static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter( + FieldValueTypeInformation typeInformation) { + Field field = typeInformation.getField(); DynamicType.Builder<FieldValueGetter> builder = ByteBuddyUtils.subclassGetterInterface( BYTE_BUDDY, field.getDeclaringClass(), new ConvertType(false).convert(TypeDescriptor.of(field.getType()))); - builder = implementGetterMethods(builder, field); + builder = implementGetterMethods(builder, field, typeInformation.getName()); try { return builder .make() @@ -202,10 +188,10 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { } private static DynamicType.Builder<FieldValueGetter> implementGetterMethods( - DynamicType.Builder<FieldValueGetter> builder, Field field) { + DynamicType.Builder<FieldValueGetter> builder, Field field, String name) { return builder .method(ElementMatchers.named("name")) - .intercept(FixedValue.reference(field.getName())) + .intercept(FixedValue.reference(name)) .method(ElementMatchers.named("get")) .intercept(new ReadFieldInstruction(field)); } @@ -215,21 +201,14 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { private static final Map<ClassWithSchema, List<FieldValueSetter>> CACHED_SETTERS = Maps.newConcurrentMap(); - public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) { + public static List<FieldValueSetter> getSetters( + Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) { // Return the setters, ordered by their position in the schema. return CACHED_SETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { - Map<String, FieldValueSetter> setterMap = - ReflectUtils.getFields(clazz) - .stream() - .map(POJOUtils::createSetter) - .collect(Collectors.toMap(FieldValueSetter::name, Function.identity())); - return schema - .getFields() - .stream() - .map(f -> setterMap.get(f.getName())) - .collect(Collectors.toList()); + List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema); + return types.stream().map(POJOUtils::createSetter).collect(Collectors.toList()); }); } @@ -250,7 +229,9 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { * </code></pre> */ @SuppressWarnings("unchecked") - private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter(Field field) { + private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter( + FieldValueTypeInformation typeInformation) { + Field field = typeInformation.getField(); DynamicType.Builder<FieldValueSetter> builder = ByteBuddyUtils.subclassSetterInterface( BYTE_BUDDY, @@ -284,7 +265,7 @@ public static Schema schemaFromPojoClass(Class<?> clazz) { // Implements a method to read a public field out of an object. static class ReadFieldInstruction implements Implementation { // Field that will be read. - private Field field; + private final Field field; ReadFieldInstruction(Field field) { this.field = field; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java deleted file mode 100644 index 275b791318f4..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueGetter; -import org.apache.beam.sdk.schemas.FieldValueGetterFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A factory for creating {@link FieldValueGetter} objects for a POJO. */ -public class PojoValueGetterFactory implements FieldValueGetterFactory { - @Override - public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return POJOUtils.getGetters(targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java deleted file mode 100644 index 5e9447a2e04a..000000000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.schemas.utils; - -import java.util.List; -import org.apache.beam.sdk.schemas.FieldValueSetter; -import org.apache.beam.sdk.schemas.FieldValueSetterFactory; -import org.apache.beam.sdk.schemas.Schema; - -/** A factory for creating {@link FieldValueSetter} objects for a POJO. */ -public class PojoValueSetterFactory implements FieldValueSetterFactory { - @Override - public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) { - return POJOUtils.getSetters(targetClass, schema); - } -} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java index 8619ea5e1780..c9d943b6ee6b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.lang.reflect.Modifier; @@ -40,7 +39,7 @@ private final Class clazz; private final Schema schema; - public ClassWithSchema(Class clazz, Schema schema) { + ClassWithSchema(Class clazz, Schema schema) { this.clazz = clazz; this.schema = schema; } @@ -67,7 +66,7 @@ public int hashCode() { private static final Map<Class, List<Field>> DECLARED_FIELDS = Maps.newHashMap(); /** Returns the list of public, non-static methods in the class, caching the results. */ - static List<Method> getMethods(Class clazz) throws IOException { + public static List<Method> getMethods(Class clazz) { return DECLARED_METHODS.computeIfAbsent( clazz, c -> { @@ -79,7 +78,7 @@ public int hashCode() { } // Get all public, non-static, non-transient fields. - static List<Field> getFields(Class<?> clazz) { + public static List<Field> getFields(Class<?> clazz) { return DECLARED_FIELDS.computeIfAbsent( clazz, c -> { @@ -104,7 +103,7 @@ public int hashCode() { }); } - static boolean isGetter(Method method) { + public static boolean isGetter(Method method) { if (Void.TYPE.equals(method.getReturnType())) { return false; } @@ -118,13 +117,13 @@ static boolean isGetter(Method method) { || Boolean.class.equals(method.getReturnType()))); } - static boolean isSetter(Method method) { + public static boolean isSetter(Method method) { return Void.TYPE.equals(method.getReturnType()) && method.getParameterCount() == 1 && method.getName().startsWith("set"); } - static String stripPrefix(String methodName, String prefix) { + public static String stripPrefix(String methodName, String prefix) { String firstLetter = methodName.substring(prefix.length(), prefix.length() + 1).toLowerCase(); return (methodName.length() == prefix.length() + 1) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index fdf978cfb46b..d3f81e8f7ec1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -20,21 +20,16 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.Function; -import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.ReadableInstant; @@ -64,94 +59,6 @@ .put(BigDecimal.class, FieldType.DECIMAL) .build(); - /** Relevant information about a Java type. */ - public static class TypeInformation { - private final String name; - private final TypeDescriptor type; - private final boolean nullable; - - /** Construct a {@link TypeInformation}. */ - private TypeInformation(String name, TypeDescriptor type, boolean nullable) { - this.name = name; - this.type = type; - this.nullable = nullable; - } - - /** Construct a {@link TypeInformation} from a class member variable. */ - public static TypeInformation forField(Field field) { - return new TypeInformation( - field.getName(), - TypeDescriptor.of(field.getGenericType()), - field.isAnnotationPresent(Nullable.class)); - } - - /** Construct a {@link TypeInformation} from a class getter. */ - public static TypeInformation forGetter( - Method method, SerializableFunction<String, String> fieldNamePolicy) { - String name; - if (method.getName().startsWith("get")) { - name = ReflectUtils.stripPrefix(method.getName(), "get"); - } else if (method.getName().startsWith("is")) { - name = ReflectUtils.stripPrefix(method.getName(), "is"); - } else { - throw new RuntimeException("Getter has wrong prefix " + method.getName()); - } - name = fieldNamePolicy.apply(name); - - TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType()); - boolean nullable = method.isAnnotationPresent(Nullable.class); - return new TypeInformation(name, type, nullable); - } - - /** Construct a {@link TypeInformation} from a class setter. */ - public static TypeInformation forSetter(Method method) { - String name; - if (method.getName().startsWith("set")) { - name = ReflectUtils.stripPrefix(method.getName(), "set"); - } else { - throw new RuntimeException("Setter has wrong prefix " + method.getName()); - } - if (method.getParameterCount() != 1) { - throw new RuntimeException("Setter methods should take a single argument."); - } - TypeDescriptor type = TypeDescriptor.of(method.getGenericParameterTypes()[0]); - boolean nullable = - Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance); - return new TypeInformation(name, type, nullable); - } - - public String getName() { - return name; - } - - public TypeDescriptor getType() { - return type; - } - - public boolean isNullable() { - return nullable; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TypeInformation that = (TypeInformation) o; - return nullable == that.nullable - && Objects.equals(name, that.name) - && Objects.equals(type, that.type); - } - - @Override - public int hashCode() { - return Objects.hash(name, type, nullable); - } - } - /** * Infer a schema from a Java class. * @@ -160,9 +67,9 @@ public int hashCode() { * public getter methods, or special annotations on the class. */ public static Schema schemaFromClass( - Class<?> clazz, Function<Class, List<TypeInformation>> getTypesForClass) { + Class<?> clazz, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) { Schema.Builder builder = Schema.builder(); - for (TypeInformation type : getTypesForClass.apply(clazz)) { + for (FieldValueTypeInformation type : getTypesForClass.apply(clazz)) { Schema.FieldType fieldType = fieldFromType(type.getType(), getTypesForClass); if (type.isNullable()) { builder.addNullableField(type.getName(), fieldType); @@ -175,7 +82,7 @@ public static Schema schemaFromClass( // Map a Java field type to a Beam Schema FieldType. private static Schema.FieldType fieldFromType( - TypeDescriptor type, Function<Class, List<TypeInformation>> getTypesForClass) { + TypeDescriptor type, Function<Class, List<FieldValueTypeInformation>> getTypesForClass) { FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType()); if (primitiveType != null) { return primitiveType; @@ -209,7 +116,8 @@ public static Schema schemaFromClass( FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), getTypesForClass); FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), getTypesForClass); checkArgument( - keyType.getTypeName().isPrimitiveType(), "Only primitive types can be map keys"); + keyType.getTypeName().isPrimitiveType(), + "Only primitive types can be map keys. type: " + keyType.getTypeName()); return FieldType.map(keyType, valueType); } else { throw new RuntimeException("Cannot infer schema from unparameterized map."); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java index f7cc64d7b486..3c22f4b62cf1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java @@ -22,12 +22,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.reflect.AvroIgnore; +import org.apache.avro.reflect.AvroName; +import org.apache.avro.reflect.AvroSchema; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.schemas.utils.AvroUtils.FixedBytesField; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; @@ -36,6 +44,142 @@ /** Tests for AVRO schema classes. */ public class AvroSchemaTest { + /** A test POJO that corresponds to our AVRO schema. */ + public static class AvroSubPojo { + @AvroName("bool_non_nullable") + public boolean boolNonNullable; + + @AvroName("int") + @org.apache.avro.reflect.Nullable + public Integer anInt; + + public AvroSubPojo(boolean boolNonNullable, Integer anInt) { + this.boolNonNullable = boolNonNullable; + this.anInt = anInt; + } + + public AvroSubPojo() {} + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AvroSubPojo)) { + return false; + } + AvroSubPojo that = (AvroSubPojo) o; + return boolNonNullable == that.boolNonNullable && Objects.equals(anInt, that.anInt); + } + + @Override + public int hashCode() { + return Objects.hash(boolNonNullable, anInt); + } + + @Override + public String toString() { + return "AvroSubPojo{" + "boolNonNullable=" + boolNonNullable + ", anInt=" + anInt + '}'; + } + } + + /** A test POJO that corresponds to our AVRO schema. */ + public static class AvroPojo { + public @AvroName("bool_non_nullable") boolean boolNonNullable; + + @org.apache.avro.reflect.Nullable + public @AvroName("int") Integer anInt; + + @org.apache.avro.reflect.Nullable + public @AvroName("long") Long aLong; + + @AvroName("float") + @org.apache.avro.reflect.Nullable + public Float aFloat; + + @AvroName("double") + @org.apache.avro.reflect.Nullable + public Double aDouble; + + @org.apache.avro.reflect.Nullable public String string; + @org.apache.avro.reflect.Nullable public ByteBuffer bytes; + + @AvroSchema("{\"type\": \"fixed\", \"size\": 4, \"name\": \"fixed4\"}") + @org.apache.avro.reflect.Nullable + public byte[] fixed; + + @org.apache.avro.reflect.Nullable public AvroSubPojo row; + @org.apache.avro.reflect.Nullable public List<AvroSubPojo> array; + @org.apache.avro.reflect.Nullable public Map<String, AvroSubPojo> map; + @AvroIgnore String extraField; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof AvroPojo)) { + return false; + } + AvroPojo avroPojo = (AvroPojo) o; + return boolNonNullable == avroPojo.boolNonNullable + && Objects.equals(anInt, avroPojo.anInt) + && Objects.equals(aLong, avroPojo.aLong) + && Objects.equals(aFloat, avroPojo.aFloat) + && Objects.equals(aDouble, avroPojo.aDouble) + && Objects.equals(string, avroPojo.string) + && Objects.equals(bytes, avroPojo.bytes) + && Arrays.equals(fixed, avroPojo.fixed) + && Objects.equals(row, avroPojo.row) + && Objects.equals(array, avroPojo.array) + && Objects.equals(map, avroPojo.map); + } + + @Override + public int hashCode() { + return Objects.hash( + boolNonNullable, + anInt, + aLong, + aFloat, + aDouble, + string, + bytes, + Arrays.hashCode(fixed), + row, + array, + map); + } + + public AvroPojo( + boolean boolNonNullable, + int anInt, + long aLong, + float aFloat, + double aDouble, + String string, + ByteBuffer bytes, + byte[] fixed, + AvroSubPojo row, + List<AvroSubPojo> array, + Map<String, AvroSubPojo> map) { + this.boolNonNullable = boolNonNullable; + this.anInt = anInt; + this.aLong = aLong; + this.aFloat = aFloat; + this.aDouble = aDouble; + this.string = string; + this.bytes = bytes; + this.fixed = fixed; + this.row = row; + this.array = array; + this.map = map; + this.extraField = ""; + } + + public AvroPojo() {} + } + private static final Schema SUBSCHEMA = Schema.builder() .addField("bool_non_nullable", FieldType.BOOLEAN) @@ -52,18 +196,27 @@ .addNullableField("double", FieldType.DOUBLE) .addNullableField("string", FieldType.STRING) .addNullableField("bytes", FieldType.BYTES) - .addField("fixed", FieldType.BYTES.withMetadata("FIXED:4")) + .addField("fixed", FixedBytesField.withSize(4).toBeamType()) .addNullableField("timestampMillis", FieldType.DATETIME) .addNullableField("row", SUB_TYPE) .addNullableField("array", FieldType.array(SUB_TYPE)) .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE)) .build(); - @Test - public void testSpecificRecordSchema() { - assertEquals( - SCHEMA, new AvroSpecificRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class))); - } + private static final Schema POJO_SCHEMA = + Schema.builder() + .addField("bool_non_nullable", FieldType.BOOLEAN) + .addNullableField("int", FieldType.INT32) + .addNullableField("long", FieldType.INT64) + .addNullableField("float", FieldType.FLOAT) + .addNullableField("double", FieldType.DOUBLE) + .addNullableField("string", FieldType.STRING) + .addNullableField("bytes", FieldType.BYTES) + .addField("fixed", FixedBytesField.withSize(4).toBeamType()) + .addNullableField("row", SUB_TYPE) + .addNullableField("array", FieldType.array(SUB_TYPE.withNullable(false))) + .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE.withNullable(false))) + .build(); private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4}; private static final DateTime DATE_TIME = @@ -130,18 +283,27 @@ public void testSpecificRecordSchema() { ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW)) .build(); + @Test + public void testSpecificRecordSchema() { + assertEquals(SCHEMA, new AvroRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class))); + } + + @Test + public void testPojoSchema() { + assertEquals(POJO_SCHEMA, AvroUtils.getSchema(AvroPojo.class)); + } + @Test public void testSpecificRecordToRow() { SerializableFunction<TestAvro, Row> toRow = - new AvroSpecificRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class)); - Row row = toRow.apply(AVRO_SPECIFIC_RECORD); + new AvroRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class)); assertEquals(ROW, toRow.apply(AVRO_SPECIFIC_RECORD)); } @Test public void testRowToSpecificRecord() { SerializableFunction<Row, TestAvro> fromRow = - new AvroSpecificRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class)); + new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class)); assertEquals(AVRO_SPECIFIC_RECORD, fromRow.apply(ROW)); } @@ -156,7 +318,51 @@ public void testGenericRecordToRow() { public void testRowToGenericRecord() { SerializableFunction<Row, GenericRecord> fromRow = AvroUtils.getRowToGenericRecordFunction(TestAvro.SCHEMA$); - GenericRecord generic = fromRow.apply(ROW); assertEquals(AVRO_GENERIC_RECORD, fromRow.apply(ROW)); } + + private static final AvroSubPojo SUB_POJO = new AvroSubPojo(true, 42); + private static final AvroPojo AVRO_POJO = + new AvroPojo( + true, + 43, + 44L, + (float) 44.1, + (double) 44.2, + "mystring", + ByteBuffer.wrap(BYTE_ARRAY), + BYTE_ARRAY, + SUB_POJO, + ImmutableList.of(SUB_POJO, SUB_POJO), + ImmutableMap.of("k1", SUB_POJO, "k2", SUB_POJO)); + + private static final Row ROW_FOR_POJO = + Row.withSchema(POJO_SCHEMA) + .addValues( + true, + 43, + 44L, + (float) 44.1, + (double) 44.2, + "mystring", + ByteBuffer.wrap(BYTE_ARRAY), + ByteBuffer.wrap(BYTE_ARRAY), + NESTED_ROW, + ImmutableList.of(NESTED_ROW, NESTED_ROW), + ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW)) + .build(); + + @Test + public void testPojoRecordToRow() { + SerializableFunction<AvroPojo, Row> toRow = + new AvroRecordSchema().toRowFunction(TypeDescriptor.of(AvroPojo.class)); + assertEquals(ROW_FOR_POJO, toRow.apply(AVRO_POJO)); + } + + @Test + public void testRowToPojo() { + SerializableFunction<Row, AvroPojo> fromRow = + new AvroRecordSchema().fromRowFunction(TypeDescriptor.of(AvroPojo.class)); + assertEquals(AVRO_POJO, fromRow.apply(ROW_FOR_POJO)); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java index 938089911c7b..c835ea655764 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java @@ -37,6 +37,8 @@ import java.util.List; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueSetter; +import org.apache.beam.sdk.schemas.JavaBeanSchema; +import org.apache.beam.sdk.schemas.JavaBeanSchema.SetterTypeSupplier; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray; @@ -49,7 +51,6 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean; -import org.apache.beam.sdk.transforms.SerializableFunctions; import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; @@ -61,8 +62,7 @@ @Test public void testNullable() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class); assertTrue(schema.getField("str").getType().getNullable()); assertFalse(schema.getField("anInt").getType().getNullable()); } @@ -70,62 +70,48 @@ public void testNullable() { @Test public void testMismatchingNullable() { thrown.expect(RuntimeException.class); - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - MismatchingNullableBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class); } @Test public void testSimpleBean() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class); SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); } @Test public void testNestedBean() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema); } @Test public void testPrimitiveArray() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - PrimitiveArrayBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedArray() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - NestedArrayBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedCollection() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - NestedCollectionBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema); } @Test public void testPrimitiveMap() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - PrimitiveMapBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema); } @Test public void testNestedMap() { - Schema schema = - JavaBeanUtils.schemaFromJavaBeanClass( - NestedMapBean.class, SerializableFunctions.identity()); + Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class); SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema); } @@ -147,7 +133,7 @@ public void testGeneratedSimpleGetters() { List<FieldValueGetter> getters = JavaBeanUtils.getGetters( - SimpleBean.class, SIMPLE_BEAN_SCHEMA, SerializableFunctions.identity()); + SimpleBean.class, SIMPLE_BEAN_SCHEMA, new JavaBeanSchema.GetterTypeSupplier()); assertEquals(12, getters.size()); assertEquals("str", getters.get(0).name()); @@ -174,7 +160,8 @@ public void testGeneratedSimpleGetters() { @Test public void testGeneratedSimpleSetters() { SimpleBean simpleBean = new SimpleBean(); - List<FieldValueSetter> setters = JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA); + List<FieldValueSetter> setters = + JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA, new SetterTypeSupplier()); assertEquals(12, setters.size()); setters.get(0).set(simpleBean, "field1"); @@ -219,7 +206,7 @@ public void testGeneratedSimpleBoxedGetters() { JavaBeanUtils.getGetters( BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA, - SerializableFunctions.identity()); + new JavaBeanSchema.GetterTypeSupplier()); assertEquals((byte) 41, getters.get(0).get(bean)); assertEquals((short) 42, getters.get(1).get(bean)); assertEquals((int) 43, getters.get(2).get(bean)); @@ -231,7 +218,8 @@ public void testGeneratedSimpleBoxedGetters() { public void testGeneratedSimpleBoxedSetters() { BeanWithBoxedFields bean = new BeanWithBoxedFields(); List<FieldValueSetter> setters = - JavaBeanUtils.getSetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA); + JavaBeanUtils.getSetters( + BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA, new SetterTypeSupplier()); setters.get(0).set(bean, (byte) 41); setters.get(1).set(bean, (short) 42); @@ -250,7 +238,8 @@ public void testGeneratedSimpleBoxedSetters() { public void testGeneratedByteBufferSetters() { BeanWithByteArray bean = new BeanWithByteArray(); List<FieldValueSetter> setters = - JavaBeanUtils.getSetters(BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA); + JavaBeanUtils.getSetters( + BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA, new SetterTypeSupplier()); setters.get(0).set(bean, "field1".getBytes(Charset.defaultCharset())); setters.get(1).set(bean, "field2".getBytes(Charset.defaultCharset())); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java index 1a0946d6f609..e140a8bc1218 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java @@ -37,6 +37,7 @@ import java.util.List; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueSetter; +import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO; import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO; @@ -126,7 +127,8 @@ public void testGeneratedSimpleGetters() { new BigDecimal(42), new StringBuilder("stringBuilder")); - List<FieldValueGetter> getters = POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA); + List<FieldValueGetter> getters = + POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier()); assertEquals(12, getters.size()); assertEquals("str", getters.get(0).name()); assertEquals("field1", getters.get(0).get(simplePojo)); @@ -147,7 +149,8 @@ public void testGeneratedSimpleGetters() { @Test public void testGeneratedSimpleSetters() { SimplePOJO simplePojo = new SimplePOJO(); - List<FieldValueSetter> setters = POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA); + List<FieldValueSetter> setters = + POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier()); assertEquals(12, setters.size()); setters.get(0).set(simplePojo, "field1"); @@ -182,7 +185,8 @@ public void testGeneratedSimpleBoxedGetters() { POJOWithBoxedFields pojo = new POJOWithBoxedFields((byte) 41, (short) 42, 43, 44L, true); List<FieldValueGetter> getters = - POJOUtils.getGetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA); + POJOUtils.getGetters( + POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier()); assertEquals((byte) 41, getters.get(0).get(pojo)); assertEquals((short) 42, getters.get(1).get(pojo)); assertEquals((int) 43, getters.get(2).get(pojo)); @@ -194,7 +198,8 @@ public void testGeneratedSimpleBoxedGetters() { public void testGeneratedSimpleBoxedSetters() { POJOWithBoxedFields pojo = new POJOWithBoxedFields(); List<FieldValueSetter> setters = - POJOUtils.getSetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA); + POJOUtils.getSetters( + POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier()); setters.get(0).set(pojo, (byte) 41); setters.get(1).set(pojo, (short) 42); @@ -213,7 +218,8 @@ public void testGeneratedSimpleBoxedSetters() { public void testGeneratedByteBufferSetters() { POJOWithByteArray pojo = new POJOWithByteArray(); List<FieldValueSetter> setters = - POJOUtils.getSetters(POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA); + POJOUtils.getSetters( + POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, new JavaFieldTypeSupplier()); setters.get(0).set(pojo, BYTE_ARRAY); setters.get(1).set(pojo, BYTE_BUFFER.array()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 175470) Time Spent: 11h 20m (was: 11h 10m) > Provide automatic schema registration for AVROs > ----------------------------------------------- > > Key: BEAM-4454 > URL: https://issues.apache.org/jira/browse/BEAM-4454 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core > Reporter: Reuven Lax > Assignee: Reuven Lax > Priority: Major > Time Spent: 11h 20m > Remaining Estimate: 0h > > Need to make sure this is a compatible change -- This message was sent by Atlassian JIRA (v7.6.3#76005)