[ https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=174674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174674 ]
ASF GitHub Bot logged work on BEAM-4454: ---------------------------------------- Author: ASF GitHub Bot Created on: 12/Dec/18 21:16 Start Date: 12/Dec/18 21:16 Worklog Time Spent: 10m Work Description: reuvenlax closed pull request #7233: [BEAM-4454] Add remaining functionality for AVRO schemas URL: https://github.com/apache/beam/pull/7233 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/build.gradle b/build.gradle index fc11ecc390a2..29ef4a6745b3 100644 --- a/build.gradle +++ b/build.gradle @@ -121,6 +121,7 @@ rat { "**/.github/**/*", "**/package-list", + "**/test.avsc", "**/user.avsc", "**/test/resources/**/*.txt", "**/test/**/.placeholder", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java new file mode 100644 index 000000000000..fcb85f4dd664 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import java.util.List; +import org.apache.avro.specific.SpecificRecord; +import org.apache.beam.sdk.schemas.utils.AvroUtils; + +/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */ +public class AvroSpecificRecordGetterFactory implements FieldValueGetterFactory { + @Override + public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { + return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, schema); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java new file mode 100644 index 000000000000..d8e4bda342f8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * A {@link SchemaProvider} for AVRO generated SpecificRecords. + * + * <p>This provider infers a schema from generated SpecificRecord objects, and creates schemas and + * rows that bind to the appropriate fields. + */ +public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider { + @Override + public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { + return AvroUtils.getSchema((Class<? extends SpecificRecord>) typeDescriptor.getRawType()); + } + + @Override + public FieldValueGetterFactory fieldValueGetterFactory() { + return new AvroSpecificRecordGetterFactory(); + } + + @Override + public UserTypeCreatorFactory schemaTypeCreatorFactory() { + return new AvroSpecificRecordUserTypeCreatorFactory(); + } + + @Override + public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { + return new AvroSpecificRecordTypeInformationFactory(); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java new file mode 100644 index 000000000000..68d0d6a7d068 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import org.apache.avro.specific.SpecificRecord; +import org.apache.beam.sdk.schemas.utils.AvroUtils; + +/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */ +public class AvroSpecificRecordUserTypeCreatorFactory implements UserTypeCreatorFactory { + @Override + public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { + return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java index af5207275f9b..f7757a8174e7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java @@ -17,8 +17,8 @@ */ package org.apache.beam.sdk.schemas; -import java.lang.reflect.InvocationTargetException; import java.util.List; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -34,45 +34,12 @@ /** Implementing class should override to return a getter factory. */ abstract FieldValueGetterFactory fieldValueGetterFactory(); - /** Implementing class should override to return a setter factory. 
*/ - abstract FieldValueSetterFactory fieldValueSetterFactory(); - /** Implementing class should override to return a type-information factory. */ abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory(); - /** - * Implementing class should override to return a constructor factory. - * - * <p>Tne default factory uses the default constructor and the setters to construct an object. - */ - UserTypeCreatorFactory schemaTypeCreatorFactory() { - Factory<List<FieldValueSetter>> setterFactory = new CachingFactory<>(fieldValueSetterFactory()); - return new UserTypeCreatorFactory() { - @Override - public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { - List<FieldValueSetter> setters = setterFactory.create(clazz, schema); - return new SchemaUserTypeCreator() { - @Override - public Object create(Object... params) { - Object object; - try { - object = clazz.getDeclaredConstructor().newInstance(); - } catch (NoSuchMethodException - | IllegalAccessException - | InvocationTargetException - | InstantiationException e) { - throw new RuntimeException("Failed to instantiate object ", e); - } - for (int i = 0; i < params.length; ++i) { - FieldValueSetter setter = setters.get(i); - setter.set(object, params[i]); - } - return object; - } - }; - } - }; - } + /** Implementing class should override to return a constructor factory. 
*/ + @Nullable + abstract UserTypeCreatorFactory schemaTypeCreatorFactory(); @Override public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 1e127b181210..2c22400b050c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory; import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory; import org.apache.beam.sdk.schemas.utils.JavaBeanUtils; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -42,7 +43,8 @@ public class JavaBeanSchema extends GetterBasedSchemaProvider { @Override public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) { - return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType()); + return JavaBeanUtils.schemaFromJavaBeanClass( + typeDescriptor.getRawType(), SerializableFunctions.identity()); } @Override @@ -51,8 +53,8 @@ public FieldValueGetterFactory fieldValueGetterFactory() { } @Override - public FieldValueSetterFactory fieldValueSetterFactory() { - return new JavaBeanSetterFactory(); + UserTypeCreatorFactory schemaTypeCreatorFactory() { + return new SetterBasedCreatorFactory(new JavaBeanSetterFactory()); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index c6080e6863e5..a7d6a3020ccd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -21,7 +21,6 @@ import 
org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.schemas.utils.POJOUtils; import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory; -import org.apache.beam.sdk.schemas.utils.PojoValueSetterFactory; import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory; import org.apache.beam.sdk.values.TypeDescriptor; @@ -50,11 +49,6 @@ public FieldValueGetterFactory fieldValueGetterFactory() { return new PojoValueGetterFactory(); } - @Override - public FieldValueSetterFactory fieldValueSetterFactory() { - return new PojoValueSetterFactory(); - } - @Override public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() { return new PojoValueTypeInformationFactory(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index d256b326e0f7..2eeecfd1fdaa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -540,6 +540,14 @@ public FieldType withMetadata(String metadata) { return toBuilder().setMetadata(metadata.getBytes(StandardCharsets.UTF_8)).build(); } + public String getMetadataString() { + if (getMetadata() != null) { + return new String(getMetadata(), StandardCharsets.UTF_8); + } else { + return ""; + } + } + public FieldType withNullable(boolean nullable) { return toBuilder().setNullable(nullable).build(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java new file mode 100644 index 000000000000..26884b78f634 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +/** + * A {@link UserTypeCreatorFactory} that uses a default constructor and a list of setters to + * construct a class. + */ +class SetterBasedCreatorFactory implements UserTypeCreatorFactory { + private final Factory<List<FieldValueSetter>> setterFactory; + + public SetterBasedCreatorFactory(Factory<List<FieldValueSetter>> setterFactory) { + this.setterFactory = new CachingFactory<>(setterFactory); + } + + @Override + public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) { + List<FieldValueSetter> setters = setterFactory.create(clazz, schema); + return new SchemaUserTypeCreator() { + @Override + public Object create(Object... 
params) { + Object object; + try { + object = clazz.getDeclaredConstructor().newInstance(); + } catch (NoSuchMethodException + | IllegalAccessException + | InvocationTargetException + | InstantiationException e) { + throw new RuntimeException("Failed to instantiate object ", e); + } + for (int i = 0; i < params.length; ++i) { + FieldValueSetter setter = setters.get(i); + setter.set(object, params[i]); + } + return object; + } + }; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java new file mode 100644 index 000000000000..a1adb25ae6e2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.utils; + +import com.google.common.collect.Maps; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription.ForLoadedType; +import net.bytebuddy.dynamic.DynamicType; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodCall; +import net.bytebuddy.implementation.bytecode.StackManipulation; +import net.bytebuddy.implementation.bytecode.assign.TypeCasting; +import net.bytebuddy.implementation.bytecode.collection.ArrayAccess; +import net.bytebuddy.implementation.bytecode.constant.IntegerConstant; +import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; +import net.bytebuddy.matcher.ElementMatchers; +import org.apache.avro.specific.SpecificRecord; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; +import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType; +import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; +import org.apache.beam.sdk.util.common.ReflectHelpers; +import org.apache.beam.sdk.values.TypeDescriptor; + +class AvroByteBuddyUtils { + private static final ByteBuddy BYTE_BUDDY = new ByteBuddy(); + + // Cache the generated constructors. 
+ private static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS = + Maps.newConcurrentMap(); + + static <T extends SpecificRecord> SchemaUserTypeCreator getCreator( + Class<T> clazz, Schema schema) { + return CACHED_CREATORS.computeIfAbsent( + new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema)); + } + + private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) { + Constructor baseConstructor = null; + Constructor[] constructors = clazz.getDeclaredConstructors(); + for (Constructor constructor : constructors) { + // TODO: This assumes that Avro only generates one constructor with this many fields. + if (constructor.getParameterCount() == schema.getFieldCount()) { + baseConstructor = constructor; + } + } + if (baseConstructor == null) { + throw new RuntimeException("No matching constructor found for class " + clazz); + } + + // Generate a method call to create and invoke the SpecificRecord's constructor. + MethodCall construct = MethodCall.construct(baseConstructor); + for (int i = 0; i < baseConstructor.getParameterTypes().length; ++i) { + Class<?> baseType = baseConstructor.getParameterTypes()[i]; + construct = construct.with(readAndConvertParameter(baseType, i), baseType); + } + + try { + DynamicType.Builder<SchemaUserTypeCreator> builder = + BYTE_BUDDY + .subclass(SchemaUserTypeCreator.class) + .method(ElementMatchers.named("create")) + .intercept(construct); + + return builder + .make() + .load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION) + .getLoaded() + .getDeclaredConstructor() + .newInstance(); + } catch (InstantiationException + | IllegalAccessException + | NoSuchMethodException + | InvocationTargetException e) { + throw new RuntimeException( + "Unable to generate a getter for class " + clazz + " with schema " + schema); + } + } + + private static StackManipulation readAndConvertParameter( + Class<?> constructorParameterType, int index) { + // The types in the 
AVRO-generated constructor might not be the types returned by Beam's Row class, + // so we have to convert the types used by Beam's Row class. + // We know that AVRO generates constructor parameters in the same order as fields + // in the schema, so we can just add the parameters sequentially. + ConvertType convertType = new ConvertType(true); + + // Map the AVRO-generated type to the one Beam will use. + ForLoadedType convertedType = + new ForLoadedType((Class) convertType.convert(TypeDescriptor.of(constructorParameterType))); + + // This will run inside the generated creator. Read the parameter and convert it to the + // type required by the SpecificRecord constructor. + StackManipulation readParameter = + new StackManipulation.Compound( + MethodVariableAccess.REFERENCE.loadFrom(1), + IntegerConstant.forValue(index), + ArrayAccess.REFERENCE.load(), + TypeCasting.to(convertedType)); + + // Convert to the parameter accepted by the SpecificRecord constructor. + return new ByteBuddyUtils.ConvertValueForSetter(readParameter) + .convert(TypeDescriptor.of(constructorParameterType)); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java new file mode 100644 index 000000000000..6b76fa6fa66a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.utils; + +import java.util.List; +import org.apache.avro.specific.SpecificRecord; +import org.apache.beam.sdk.schemas.FieldValueTypeInformation; +import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; +import org.apache.beam.sdk.schemas.Schema; + +/** A {@link FieldValueTypeInformationFactory} for AVRO-generated specific records. */ +public class AvroSpecificRecordTypeInformationFactory implements FieldValueTypeInformationFactory { + @Override + public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { + return AvroUtils.getFieldTypes((Class<? 
extends SpecificRecord>) targetClass, schema); + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 8b9f182d0848..df33d6430389 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -20,8 +20,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import com.google.common.collect.ImmutableMap; +import com.google.common.base.CaseFormat; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -35,17 +36,25 @@ import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema.Type; +import org.apache.avro.data.TimeConversions; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.reflect.ReflectData; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.specific.SpecificRecord; import org.apache.avro.util.Utf8; import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.schemas.FieldValueGetter; +import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; +import org.apache.beam.sdk.schemas.SchemaUserTypeCreator; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.Row; import org.joda.time.Instant; import 
org.joda.time.ReadableInstant; @@ -53,6 +62,13 @@ /** Utils to convert AVRO records to Beam rows. */ @Experimental(Experimental.Kind.SCHEMAS) public class AvroUtils { + static { + // This works around a bug in the Avro library (AVRO-1891) around SpecificRecord's handling + // of DateTime types. + SpecificData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()); + GenericData.get().addLogicalTypeConversion(new TimeConversions.TimestampConversion()); + } + // Unwrap an AVRO schema into the base type an whether it is nullable. static class TypeWithNullability { public final org.apache.avro.Schema type; @@ -91,6 +107,53 @@ } } + /** Wrapper for fixed byte fields. */ + public static class FixedBytesField { + private static final String PREFIX = "FIXED:"; + + private final int size; + + private FixedBytesField(int size) { + this.size = size; + } + + /** Create a {@link FixedBytesField} from a Beam {@link FieldType}. */ + @Nullable + public static FixedBytesField fromBeamFieldType(FieldType fieldType) { + String metadata = fieldType.getMetadataString(); + if (fieldType.getTypeName().equals(TypeName.BYTES) && metadata.startsWith(PREFIX)) { + return new FixedBytesField(Integer.parseInt(metadata.substring(6))); + } else { + return null; + } + } + + /** Create a {@link FixedBytesField} from an AVRO type. */ + @Nullable + public static FixedBytesField fromAvroType(org.apache.avro.Schema type) { + if (type.getType().equals(Type.FIXED)) { + return new FixedBytesField(type.getFixedSize()); + } else { + return null; + } + } + + /** Get the size. */ + public int getSize() { + return size; + } + + /** Convert to a Beam type. */ + public FieldType toBeamType() { + return Schema.FieldType.BYTES.withMetadata(PREFIX + Integer.toString(size)); + } + + /** Convert to an AVRO type. 
*/ + public org.apache.avro.Schema toAvroType() { + return org.apache.avro.Schema.createFixed(null, "", "", size); + } + } + private AvroUtils() {} /** @@ -142,12 +205,7 @@ public static Row toBeamRowStrict(GenericRecord record, @Nullable Schema schema) for (Schema.Field field : schema.getFields()) { Object value = record.get(field.getName()); org.apache.avro.Schema fieldAvroSchema = avroSchema.getField(field.getName()).schema(); - - if (value == null) { - builder.addValue(null); - } else { - builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, field.getType())); - } + builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, field.getType())); } return builder.build(); @@ -184,6 +242,79 @@ public static GenericRecord toGenericRecord( return builder.build(); } + /** + * Returns a function mapping AVRO {@link GenericRecord}s to Beam {@link Row}s for use in {@link + * org.apache.beam.sdk.values.PCollection#setSchema}. + */ + public static SerializableFunction<GenericRecord, Row> getGenericRecordToRowFunction( + @Nullable Schema schema) { + return g -> toBeamRowStrict(g, schema); + } + + /** + * Returns a function mapping Beam {@link Row}s to AVRO {@link GenericRecord}s for use in {@link + * org.apache.beam.sdk.values.PCollection#setSchema}. + */ + public static SerializableFunction<Row, GenericRecord> getRowToGenericRecordFunction( + @Nullable org.apache.avro.Schema avroSchema) { + return g -> toGenericRecord(g, avroSchema); + } + + /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */ + public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) { + try { + org.apache.avro.Schema avroSchema = + (org.apache.avro.Schema) (clazz.getDeclaredField("SCHEMA$").get(null)); + return toBeamSchema(avroSchema); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException( + "Class " + + clazz + + " is not an AVRO SpecificRecord. 
" + + "No public SCHEMA$ field was found."); + } + } + + private static final class AvroSpecificRecordFieldNamePolicy + implements SerializableFunction<String, String> { + Schema schema; + Map<String, String> nameMapping = Maps.newHashMap(); + + AvroSpecificRecordFieldNamePolicy(Schema schema) { + this.schema = schema; + for (Field field : schema.getFields()) { + String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName()); + nameMapping.put(getter, field.getName()); + // The Avro compiler might add a $ at the end of a getter to disambiguate. + nameMapping.put(getter + "$", field.getName()); + } + } + + @Override + public String apply(String input) { + return nameMapping.getOrDefault(input, input); + } + } + + /** Get field types for an AVRO-generated SpecificRecord. */ + public static <T extends SpecificRecord> List<FieldValueTypeInformation> getFieldTypes( + Class<T> clazz, Schema schema) { + return JavaBeanUtils.getFieldTypes( + clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema)); + } + + /** Get generated getters for an AVRO-generated SpecificRecord. */ + public static <T extends SpecificRecord> List<FieldValueGetter> getGetters( + Class<T> clazz, Schema schema) { + return JavaBeanUtils.getGetters(clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema)); + } + + /** Get an object creator for an AVRO-generated SpecificRecord. */ + public static <T extends SpecificRecord> SchemaUserTypeCreator getCreator( + Class<T> clazz, Schema schema) { + return AvroByteBuddyUtils.getCreator(clazz, schema); + } + /** Converts AVRO schema to Beam field. 
*/ private static Schema.FieldType toFieldType(TypeWithNullability type) { Schema.FieldType fieldType = null; @@ -224,7 +355,7 @@ public static GenericRecord toGenericRecord( break; case FIXED: - fieldType = Schema.FieldType.BYTES; + fieldType = FixedBytesField.fromAvroType(type.type).toBeamType(); break; case STRING: @@ -312,7 +443,12 @@ public static GenericRecord toGenericRecord( break; case BYTES: - baseType = org.apache.avro.Schema.create(Type.BYTES); + FixedBytesField fixedBytesField = FixedBytesField.fromBeamFieldType(fieldType); + if (fixedBytesField != null) { + baseType = fixedBytesField.toAvroType(); + } else { + baseType = org.apache.avro.Schema.create(Type.BYTES); + } break; case ARRAY: @@ -340,9 +476,24 @@ public static GenericRecord toGenericRecord( return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : baseType; } + @Nullable private static Object genericFromBeamField( - Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, Object value) { - org.apache.avro.Schema expectedSchema = getFieldSchema(fieldType); + Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable Object value) { + TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema); + + if (!fieldType.getNullable().equals(typeWithNullability.nullable)) { + throw new IllegalArgumentException( + "FieldType " + + fieldType + + " and AVRO schema " + + avroSchema + + " don't have matching nullability"); + } + + if (value == null) { + return value; + } + switch (fieldType.getTypeName()) { case BYTE: case INT16: @@ -351,81 +502,67 @@ private static Object genericFromBeamField( case FLOAT: case DOUBLE: case BOOLEAN: - return checkValueType(avroSchema, value, fieldType, expectedSchema); + return value; case STRING: return new Utf8((String) value); case DECIMAL: BigDecimal decimal = (BigDecimal) value; - LogicalType logicalType = avroSchema.getLogicalType(); - ByteBuffer byteBuffer = - new Conversions.DecimalConversion().toBytes(decimal, 
null, logicalType); - return checkValueType(avroSchema, byteBuffer, fieldType, expectedSchema); + LogicalType logicalType = typeWithNullability.type.getLogicalType(); + return new Conversions.DecimalConversion().toBytes(decimal, null, logicalType); case DATETIME: ReadableInstant instant = (ReadableInstant) value; - return checkValueType(avroSchema, instant.getMillis(), fieldType, expectedSchema); + return instant.getMillis(); case BYTES: - return checkValueType( - avroSchema, ByteBuffer.wrap((byte[]) value), fieldType, expectedSchema); + FixedBytesField fixedBytesField = FixedBytesField.fromBeamFieldType(fieldType); + if (fixedBytesField != null) { + byte[] byteArray = (byte[]) value; + if (byteArray.length != fixedBytesField.getSize()) { + throw new IllegalArgumentException("Incorrectly sized byte array."); + } + return GenericData.get().createFixed(null, (byte[]) value, typeWithNullability.type); + } else { + return ByteBuffer.wrap((byte[]) value); + } case ARRAY: - List array = (List) checkValueType(avroSchema, value, fieldType, expectedSchema); + List array = (List) value; List<Object> translatedArray = Lists.newArrayListWithExpectedSize(array.size()); - org.apache.avro.Schema avroArrayType = new TypeWithNullability(avroSchema).type; for (Object arrayElement : array) { translatedArray.add( genericFromBeamField( fieldType.getCollectionElementType(), - avroArrayType.getElementType(), + typeWithNullability.type.getElementType(), arrayElement)); } - return checkValueType(avroSchema, translatedArray, fieldType, expectedSchema); + return translatedArray; case MAP: - ImmutableMap.Builder builder = ImmutableMap.builder(); - Map<Object, Object> valueMap = - (Map<Object, Object>) checkValueType(avroSchema, value, fieldType, expectedSchema); - org.apache.avro.Schema avroMapType = new TypeWithNullability(avroSchema).type; - + Map map = Maps.newHashMap(); + Map<Object, Object> valueMap = (Map<Object, Object>) value; for (Map.Entry entry : valueMap.entrySet()) { Utf8 key = 
new Utf8((String) entry.getKey()); - builder.put( + map.put( key, genericFromBeamField( - fieldType.getMapValueType(), avroMapType.getValueType(), entry.getValue())); + fieldType.getMapValueType(), + typeWithNullability.type.getValueType(), + entry.getValue())); } - return checkValueType(avroSchema, builder.build(), fieldType, expectedSchema); + return map; case ROW: - return checkValueType( - avroSchema, toGenericRecord((Row) value, avroSchema), fieldType, expectedSchema); + return toGenericRecord((Row) value, typeWithNullability.type); default: throw new IllegalArgumentException("Unsupported type " + fieldType); } } - private static Object checkValueType( - org.apache.avro.Schema avroSchema, - Object o, - FieldType fieldType, - org.apache.avro.Schema expectedType) { - TypeWithNullability typeWithNullability = new TypeWithNullability(avroSchema); - if (!fieldType.getNullable().equals(typeWithNullability.nullable)) { - throw new IllegalArgumentException( - "FieldType " - + fieldType - + " and AVRO schema " - + avroSchema - + " don't have matching nullability"); - } - return o; - } - /** * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during * conversion. 
@@ -436,10 +573,14 @@ private static Object checkValueType( * @return value converted for {@link Row} */ @SuppressWarnings("unchecked") + @Nullable public static Object convertAvroFieldStrict( - @Nonnull Object value, + @Nullable Object value, @Nonnull org.apache.avro.Schema avroSchema, @Nonnull Schema.FieldType fieldType) { + if (value == null) { + return null; + } TypeWithNullability type = new TypeWithNullability(avroSchema); LogicalType logicalType = LogicalTypes.fromSchema(type.type); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java index accbb20dbf7b..df910bac97d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java @@ -38,6 +38,7 @@ import net.bytebuddy.implementation.bytecode.collection.ArrayFactory; import net.bytebuddy.implementation.bytecode.member.MethodInvocation; import net.bytebuddy.matcher.ElementMatchers; +import org.apache.avro.generic.GenericFixed; import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueSetter; import org.apache.beam.sdk.values.TypeDescriptor; @@ -96,6 +97,9 @@ public T convert(TypeDescriptor typeDescriptor) { return convertDateTime(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) { return convertByteBuffer(typeDescriptor); + } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) { + // TODO: Refactor AVRO-specific check into separate class. 
+ return convertGenericFixed(typeDescriptor); } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) { return convertCharSequence(typeDescriptor); } else if (typeDescriptor.getRawType().isPrimitive()) { @@ -115,6 +119,8 @@ public T convert(TypeDescriptor typeDescriptor) { protected abstract T convertByteBuffer(TypeDescriptor<?> type); + protected abstract T convertGenericFixed(TypeDescriptor<?> type); + protected abstract T convertCharSequence(TypeDescriptor<?> type); protected abstract T convertPrimitive(TypeDescriptor<?> type); @@ -173,6 +179,11 @@ protected Type convertByteBuffer(TypeDescriptor<?> type) { return byte[].class; } + @Override + protected Type convertGenericFixed(TypeDescriptor<?> type) { + return byte[].class; + } + @Override protected Type convertCharSequence(TypeDescriptor<?> type) { return String.class; @@ -304,6 +315,23 @@ protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) { .getOnly())); } + @Override + protected StackManipulation convertGenericFixed(TypeDescriptor<?> type) { + // TODO: Refactor AVRO-specific code into separate class. + + // Generate the following code: + // return value.bytes(); + + return new Compound( + readValue, + MethodInvocation.invoke( + new ForLoadedType(GenericFixed.class) + .getDeclaredMethods() + .filter( + ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTE_ARRAY_TYPE))) + .getOnly())); + } + @Override protected StackManipulation convertCharSequence(TypeDescriptor<?> type) { // If the member is a String, then return it. @@ -464,6 +492,28 @@ protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) { .getOnly())); } + @Override + protected StackManipulation convertGenericFixed(TypeDescriptor<?> type) { + // Generate the following code: + // return T((byte[]) value); + + // TODO: Refactor AVRO-specific code out of this class. 
+ ForLoadedType loadedType = new ForLoadedType(type.getRawType()); + return new Compound( + TypeCreation.of(loadedType), + Duplication.SINGLE, + readValue, + TypeCasting.to(BYTE_ARRAY_TYPE), + // Create a new instance that wraps this byte[]. + MethodInvocation.invoke( + loadedType + .getDeclaredMethods() + .filter( + ElementMatchers.isConstructor() + .and(ElementMatchers.takesArguments(BYTE_ARRAY_TYPE))) + .getOnly())); + } + @Override protected StackManipulation convertCharSequence(TypeDescriptor<?> type) { // If the type is a String, just return it. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java index 0bb9e995cd71..5c3d2ea17155 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java @@ -21,11 +21,12 @@ import org.apache.beam.sdk.schemas.FieldValueGetter; import org.apache.beam.sdk.schemas.FieldValueGetterFactory; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableFunctions; /** A factory for creating {@link FieldValueGetter} objects for a JavaBean object. 
*/ public class JavaBeanGetterFactory implements FieldValueGetterFactory { @Override public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getGetters(targetClass, schema); + return JavaBeanUtils.getGetters(targetClass, schema, SerializableFunctions.identity()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java index 1b3826227c8a..f445a87dc9ca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java @@ -21,11 +21,12 @@ import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.SerializableFunctions; /** A {@link FieldValueTypeInformationFactory} for Java Bean objects. 
*/ public class JavaBeanTypeInformationFactory implements FieldValueTypeInformationFactory { @Override public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema schema) { - return JavaBeanUtils.getFieldTypes(targetClass, schema); + return JavaBeanUtils.getFieldTypes(targetClass, schema, SerializableFunctions.identity()); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java index 8a213b436eba..0ec07c9d8d77 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java @@ -49,23 +49,27 @@ import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter; import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema; import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.common.ReflectHelpers; /** A set of utilities to generate getter and setter classes for JavaBean objects. */ @Experimental(Kind.SCHEMAS) public class JavaBeanUtils { /** Create a {@link Schema} for a Java Bean class. 
*/ - public static Schema schemaFromJavaBeanClass(Class<?> clazz) { - return StaticSchemaInference.schemaFromClass(clazz, JavaBeanUtils::typeInformationFromClass); + public static Schema schemaFromJavaBeanClass( + Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) { + return StaticSchemaInference.schemaFromClass( + clazz, c -> JavaBeanUtils.typeInformationFromClass(c, fieldNamePolicy)); } - private static List<TypeInformation> typeInformationFromClass(Class<?> clazz) { + private static List<TypeInformation> typeInformationFromClass( + Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) { try { List<TypeInformation> getterTypes = ReflectUtils.getMethods(clazz) .stream() .filter(ReflectUtils::isGetter) - .map(m -> TypeInformation.forGetter(m)) + .map(m -> TypeInformation.forGetter(m, fieldNamePolicy)) .collect(Collectors.toList()); Map<String, TypeInformation> setterTypes = @@ -113,7 +117,8 @@ private static void validateJavaBean( private static final Map<ClassWithSchema, List<FieldValueTypeInformation>> CACHED_FIELD_TYPES = Maps.newConcurrentMap(); - public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz, Schema schema) { + public static List<FieldValueTypeInformation> getFieldTypes( + Class<?> clazz, Schema schema, SerializableFunction<String, String> fieldNamePolicy) { return CACHED_FIELD_TYPES.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { @@ -122,7 +127,7 @@ private static void validateJavaBean( ReflectUtils.getMethods(clazz) .stream() .filter(ReflectUtils::isGetter) - .map(TypeInformation::forGetter) + .map(m -> TypeInformation.forGetter(m, fieldNamePolicy)) .map(FieldValueTypeInformation::of) .collect( Collectors.toMap(FieldValueTypeInformation::getName, Function.identity())); @@ -147,7 +152,8 @@ private static void validateJavaBean( * * <p>The returned list is ordered by the order of fields in the schema. 
*/ - public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) { + public static List<FieldValueGetter> getGetters( + Class<?> clazz, Schema schema, SerializableFunction<String, String> fieldNamePolicy) { return CACHED_GETTERS.computeIfAbsent( new ClassWithSchema(clazz, schema), c -> { @@ -156,7 +162,7 @@ private static void validateJavaBean( ReflectUtils.getMethods(clazz) .stream() .filter(ReflectUtils::isGetter) - .map(JavaBeanUtils::createGetter) + .map(m -> JavaBeanUtils.createGetter(m, fieldNamePolicy)) .collect(Collectors.toMap(FieldValueGetter::name, Function.identity())); return schema .getFields() @@ -169,14 +175,15 @@ private static void validateJavaBean( }); } - private static <T> FieldValueGetter createGetter(Method getterMethod) { - TypeInformation typeInformation = TypeInformation.forGetter(getterMethod); + private static <T> FieldValueGetter createGetter( + Method getterMethod, SerializableFunction<String, String> fieldNamePolicy) { + TypeInformation typeInformation = TypeInformation.forGetter(getterMethod, fieldNamePolicy); DynamicType.Builder<FieldValueGetter> builder = ByteBuddyUtils.subclassGetterInterface( BYTE_BUDDY, getterMethod.getDeclaringClass(), new ConvertType(false).convert(typeInformation.getType())); - builder = implementGetterMethods(builder, getterMethod); + builder = implementGetterMethods(builder, getterMethod, fieldNamePolicy); try { return builder .make() @@ -193,13 +200,15 @@ private static void validateJavaBean( } private static DynamicType.Builder<FieldValueGetter> implementGetterMethods( - DynamicType.Builder<FieldValueGetter> builder, Method method) { - TypeInformation typeInformation = TypeInformation.forGetter(method); + DynamicType.Builder<FieldValueGetter> builder, + Method method, + SerializableFunction<String, String> fieldNamePolicy) { + TypeInformation typeInformation = TypeInformation.forGetter(method, fieldNamePolicy); return builder .method(ElementMatchers.named("name")) 
.intercept(FixedValue.reference(typeInformation.getName())) .method(ElementMatchers.named("get")) - .intercept(new InvokeGetterInstruction(method)); + .intercept(new InvokeGetterInstruction(method, fieldNamePolicy)); } // The list of setters for a class is cached, so we only create the classes the first time @@ -271,9 +280,11 @@ private static void validateJavaBean( private static class InvokeGetterInstruction implements Implementation { // Getter method that wil be invoked private Method method; + private SerializableFunction<String, String> fieldNamePolicy; - InvokeGetterInstruction(Method method) { + InvokeGetterInstruction(Method method, SerializableFunction<String, String> fieldNamePolicy) { this.method = method; + this.fieldNamePolicy = fieldNamePolicy; } @Override @@ -284,7 +295,7 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) { @Override public ByteCodeAppender appender(final Target implementationTarget) { return (methodVisitor, implementationContext, instrumentedMethod) -> { - TypeInformation typeInformation = TypeInformation.forGetter(method); + TypeInformation typeInformation = TypeInformation.forGetter(method, fieldNamePolicy); // this + method parameters. 
int numLocals = 1 + instrumentedMethod.getParameters().size(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java index 38fa42aa96da..fdf978cfb46b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.ReadableInstant; @@ -85,7 +86,8 @@ public static TypeInformation forField(Field field) { } /** Construct a {@link TypeInformation} from a class getter. */ - public static TypeInformation forGetter(Method method) { + public static TypeInformation forGetter( + Method method, SerializableFunction<String, String> fieldNamePolicy) { String name; if (method.getName().startsWith("get")) { name = ReflectUtils.stripPrefix(method.getName(), "get"); @@ -94,6 +96,8 @@ public static TypeInformation forGetter(Method method) { } else { throw new RuntimeException("Getter has wrong prefix " + method.getName()); } + name = fieldNamePolicy.apply(name); + TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType()); boolean nullable = method.isAnnotationPresent(Nullable.class); return new TypeInformation(name, type, nullable); diff --git a/sdks/java/core/src/test/avro/org/apache/beam/sdk/schemas/test.avsc b/sdks/java/core/src/test/avro/org/apache/beam/sdk/schemas/test.avsc new file mode 100644 index 000000000000..9ed18bf439e3 --- /dev/null +++ b/sdks/java/core/src/test/avro/org/apache/beam/sdk/schemas/test.avsc @@ -0,0 +1,29 @@ +{ + "namespace": "org.apache.beam.sdk.schemas", + "type": "record", + "name": "TestAvro", + 
"fields": [ + { "name": "bool_non_nullable", "type": "boolean"}, + { "name": "int", "type": ["int", "null"]}, + { "name": "long", "type": ["long", "null"]}, + { "name": "float", "type": ["float", "null"]}, + { "name": "double", "type": ["double", "null"]}, + { "name": "string", "type": ["string", "null"]}, + { "name": "bytes", "type": ["bytes", "null"]}, + { "name": "fixed", "type": {"type": "fixed", "size": 4, "name": "fixed4"} }, + { "name": "timestampMillis", "type": + [ {"type": "long", "logicalType": "timestamp-millis"}, "null"]}, + { "name": "row", "type": ["null", { + "type": "record", + "name": "TestAvroNested", + "fields": [ + { "name": "bool_non_nullable", "type": "boolean"}, + { "name": "int", "type": ["int", "null"]} + ] + }] + }, + { "name": "array", "type":["null", {"type": "array", "items": ["null", "TestAvroNested"] }]}, + { "name": "map", "type": ["null", {"type": "map", "values": ["null", "TestAvroNested"]}]} + ] +} + diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java new file mode 100644 index 000000000000..f7cc64d7b486 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.nio.ByteBuffer; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.util.Utf8; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.utils.AvroUtils; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.DateTime; +import org.junit.Test; + +/** Tests for AVRO schema classes. */ +public class AvroSchemaTest { + private static final Schema SUBSCHEMA = + Schema.builder() + .addField("bool_non_nullable", FieldType.BOOLEAN) + .addNullableField("int", FieldType.INT32) + .build(); + private static final FieldType SUB_TYPE = FieldType.row(SUBSCHEMA).withNullable(true); + + private static final Schema SCHEMA = + Schema.builder() + .addField("bool_non_nullable", FieldType.BOOLEAN) + .addNullableField("int", FieldType.INT32) + .addNullableField("long", FieldType.INT64) + .addNullableField("float", FieldType.FLOAT) + .addNullableField("double", FieldType.DOUBLE) + .addNullableField("string", FieldType.STRING) + .addNullableField("bytes", FieldType.BYTES) + .addField("fixed", FieldType.BYTES.withMetadata("FIXED:4")) + .addNullableField("timestampMillis", FieldType.DATETIME) + .addNullableField("row", SUB_TYPE) + .addNullableField("array", FieldType.array(SUB_TYPE)) + .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE)) + .build(); + + @Test + public void testSpecificRecordSchema() { + assertEquals( + SCHEMA, new AvroSpecificRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class))); + } + 
+ private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4}; + private static final DateTime DATE_TIME = + new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4); + private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42); + private static final TestAvro AVRO_SPECIFIC_RECORD = + new TestAvro( + true, + 43, + 44L, + (float) 44.1, + (double) 44.2, + "mystring", + ByteBuffer.wrap(BYTE_ARRAY), + new fixed4(BYTE_ARRAY), + DATE_TIME, + AVRO_NESTED_SPECIFIC_RECORD, + ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD), + ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", AVRO_NESTED_SPECIFIC_RECORD)); + private static final GenericRecord AVRO_NESTED_GENERIC_RECORD = + new GenericRecordBuilder(TestAvroNested.SCHEMA$) + .set("bool_non_nullable", true) + .set("int", 42) + .build(); + private static final GenericRecord AVRO_GENERIC_RECORD = + new GenericRecordBuilder(TestAvro.SCHEMA$) + .set("bool_non_nullable", true) + .set("int", 43) + .set("long", 44L) + .set("float", (float) 44.1) + .set("double", (double) 44.2) + .set("string", new Utf8("mystring")) + .set("bytes", ByteBuffer.wrap(BYTE_ARRAY)) + .set( + "fixed", + GenericData.get() + .createFixed( + null, BYTE_ARRAY, org.apache.avro.Schema.createFixed("fixed4", "", "", 4))) + .set("timestampMillis", DATE_TIME.getMillis()) + .set("row", AVRO_NESTED_GENERIC_RECORD) + .set("array", ImmutableList.of(AVRO_NESTED_GENERIC_RECORD, AVRO_NESTED_GENERIC_RECORD)) + .set( + "map", + ImmutableMap.of( + new Utf8("k1"), AVRO_NESTED_GENERIC_RECORD, + new Utf8("k2"), AVRO_NESTED_GENERIC_RECORD)) + .build(); + + private static final Row NESTED_ROW = Row.withSchema(SUBSCHEMA).addValues(true, 42).build(); + private static final Row ROW = + Row.withSchema(SCHEMA) + .addValues( + true, + 43, + 44L, + (float) 44.1, + (double) 44.2, + "mystring", + ByteBuffer.wrap(BYTE_ARRAY), + ByteBuffer.wrap(BYTE_ARRAY), + DATE_TIME, + NESTED_ROW, + ImmutableList.of(NESTED_ROW, 
NESTED_ROW), + ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW)) + .build(); + + @Test + public void testSpecificRecordToRow() { + SerializableFunction<TestAvro, Row> toRow = + new AvroSpecificRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class)); + Row row = toRow.apply(AVRO_SPECIFIC_RECORD); + assertEquals(ROW, toRow.apply(AVRO_SPECIFIC_RECORD)); + } + + @Test + public void testRowToSpecificRecord() { + SerializableFunction<Row, TestAvro> fromRow = + new AvroSpecificRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class)); + assertEquals(AVRO_SPECIFIC_RECORD, fromRow.apply(ROW)); + } + + @Test + public void testGenericRecordToRow() { + SerializableFunction<GenericRecord, Row> toRow = + AvroUtils.getGenericRecordToRowFunction(SCHEMA); + assertEquals(ROW, toRow.apply(AVRO_GENERIC_RECORD)); + } + + @Test + public void testRowToGenericRecord() { + SerializableFunction<Row, GenericRecord> fromRow = + AvroUtils.getRowToGenericRecordFunction(TestAvro.SCHEMA$); + GenericRecord generic = fromRow.apply(ROW); + assertEquals(AVRO_GENERIC_RECORD, fromRow.apply(ROW)); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 13512a0c425b..f6484f1c3e4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.pholser.junit.quickcheck.From; import com.pholser.junit.quickcheck.Property; import com.pholser.junit.quickcheck.runner.JUnitQuickcheck; @@ -32,6 +33,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.function.Function; 
import org.apache.avro.Conversions; import org.apache.avro.LogicalType; @@ -292,7 +294,7 @@ public void testNullableFieldInAvroSchema() { new org.apache.avro.Schema.Field( "array", org.apache.avro.Schema.createArray( - ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))), + ReflectData.makeNullable(org.apache.avro.Schema.create(Type.BYTES))), "", null)); fields.add( @@ -307,10 +309,26 @@ public void testNullableFieldInAvroSchema() { Schema expectedSchema = Schema.builder() .addNullableField("int", FieldType.INT32) - .addArrayField("array", FieldType.INT32.withNullable(true)) + .addArrayField("array", FieldType.BYTES.withNullable(true)) .addMapField("map", FieldType.STRING, FieldType.INT32.withNullable(true)) .build(); assertEquals(expectedSchema, AvroUtils.toBeamSchema(avroSchema)); + + Map<String, Object> nullMap = Maps.newHashMap(); + nullMap.put("k1", null); + GenericRecord genericRecord = + new GenericRecordBuilder(avroSchema) + .set("int", null) + .set("array", Lists.newArrayList((Object) null)) + .set("map", nullMap) + .build(); + Row expectedRow = + Row.withSchema(expectedSchema) + .addValue(null) + .addValue(Lists.newArrayList((Object) null)) + .addValue(nullMap) + .build(); + assertEquals(expectedRow, AvroUtils.toBeamRowStrict(genericRecord, expectedSchema)); } @Test @@ -342,6 +360,25 @@ public void testNullableFieldsInBeamSchema() { null)); org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord(fields); assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema)); + + Map<Utf8, Object> nullMapUtf8 = Maps.newHashMap(); + nullMapUtf8.put(new Utf8("k1"), null); + Map<String, Object> nullMapString = Maps.newHashMap(); + nullMapString.put("k1", null); + + GenericRecord expectedGenericRecord = + new GenericRecordBuilder(avroSchema) + .set("int", null) + .set("array", Lists.newArrayList((Object) null)) + .set("map", nullMapUtf8) + .build(); + Row row = + Row.withSchema(beamSchema) + .addValue(null) + 
.addValue(Lists.newArrayList((Object) null)) + .addValue(nullMapString) + .build(); + assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema)); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java index 125be49ce4cc..938089911c7b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean; import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; @@ -60,7 +61,8 @@ @Test public void testNullable() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, SerializableFunctions.identity()); assertTrue(schema.getField("str").getType().getNullable()); assertFalse(schema.getField("anInt").getType().getNullable()); } @@ -68,48 +70,62 @@ public void testNullable() { @Test public void testMismatchingNullable() { thrown.expect(RuntimeException.class); - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + MismatchingNullableBean.class, SerializableFunctions.identity()); } @Test public void testSimpleBean() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema); } @Test public void 
testNestedBean() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema); } @Test public void testPrimitiveArray() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + PrimitiveArrayBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedArray() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + NestedArrayBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema); } @Test public void testNestedCollection() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + NestedCollectionBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema); } @Test public void testPrimitiveMap() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + PrimitiveMapBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema); } @Test public void testNestedMap() { - Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class); + Schema schema = + JavaBeanUtils.schemaFromJavaBeanClass( + NestedMapBean.class, SerializableFunctions.identity()); SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema); } @@ -129,7 +145,9 @@ public void testGeneratedSimpleGetters() { simpleBean.setBigDecimal(new BigDecimal(42)); 
simpleBean.setStringBuilder(new StringBuilder("stringBuilder")); - List<FieldValueGetter> getters = JavaBeanUtils.getGetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA); + List<FieldValueGetter> getters = + JavaBeanUtils.getGetters( + SimpleBean.class, SIMPLE_BEAN_SCHEMA, SerializableFunctions.identity()); assertEquals(12, getters.size()); assertEquals("str", getters.get(0).name()); @@ -198,7 +216,10 @@ public void testGeneratedSimpleBoxedGetters() { bean.setaBoolean(true); List<FieldValueGetter> getters = - JavaBeanUtils.getGetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA); + JavaBeanUtils.getGetters( + BeanWithBoxedFields.class, + BEAN_WITH_BOXED_FIELDS_SCHEMA, + SerializableFunctions.identity()); assertEquals((byte) 41, getters.get(0).get(bean)); assertEquals((short) 42, getters.get(1).get(bean)); assertEquals((int) 43, getters.get(2).get(bean)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 174674) Time Spent: 9h 50m (was: 9h 40m) > Provide automatic schema registration for AVROs > ----------------------------------------------- > > Key: BEAM-4454 > URL: https://issues.apache.org/jira/browse/BEAM-4454 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core > Reporter: Reuven Lax > Assignee: Reuven Lax > Priority: Major > Time Spent: 9h 50m > Remaining Estimate: 0h > > Need to make sure this is a compatible change -- This message was sent by Atlassian JIRA (v7.6.3#76005)