[ 
https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=174674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174674
 ]

ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Dec/18 21:16
            Start Date: 12/Dec/18 21:16
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #7233:  [BEAM-4454] Add 
remaining functionality for AVRO schemas
URL: https://github.com/apache/beam/pull/7233
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index fc11ecc390a2..29ef4a6745b3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -121,6 +121,7 @@ rat {
     "**/.github/**/*",
 
     "**/package-list",
+    "**/test.avsc",
     "**/user.avsc",
     "**/test/resources/**/*.txt",
     "**/test/**/.placeholder",
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
new file mode 100644
index 000000000000..fcb85f4dd664
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.util.List;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+
+/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */
+public class AvroSpecificRecordGetterFactory implements 
FieldValueGetterFactory {
+  @Override
+  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
+    return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, 
schema);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
new file mode 100644
index 000000000000..d8e4bda342f8
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.avro.specific.SpecificRecord;
+import 
org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * A {@link SchemaProvider} for AVRO-generated SpecificRecords.
+ *
+ * <p>This provider infers a schema from generated SpecificRecord objects, and 
creates schemas and
+ * rows that bind to the appropriate fields.
+ */
+public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider {
+  @Override
+  public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
+    return AvroUtils.getSchema((Class<? extends SpecificRecord>) 
typeDescriptor.getRawType());
+  }
+
+  @Override
+  public FieldValueGetterFactory fieldValueGetterFactory() {
+    return new AvroSpecificRecordGetterFactory();
+  }
+
+  @Override
+  public UserTypeCreatorFactory schemaTypeCreatorFactory() {
+    return new AvroSpecificRecordUserTypeCreatorFactory();
+  }
+
+  @Override
+  public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
+    return new AvroSpecificRecordTypeInformationFactory();
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
new file mode 100644
index 000000000000..68d0d6a7d068
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+
+/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */
+public class AvroSpecificRecordUserTypeCreatorFactory implements 
UserTypeCreatorFactory {
+  @Override
+  public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
+    return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, 
schema);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index af5207275f9b..f7757a8174e7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -17,8 +17,8 @@
  */
 package org.apache.beam.sdk.schemas;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.List;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -34,45 +34,12 @@
   /** Implementing class should override to return a getter factory. */
   abstract FieldValueGetterFactory fieldValueGetterFactory();
 
-  /** Implementing class should override to return a setter factory. */
-  abstract FieldValueSetterFactory fieldValueSetterFactory();
-
   /** Implementing class should override to return a type-information factory. 
*/
   abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory();
 
-  /**
-   * Implementing class should override to return a constructor factory.
-   *
-   * <p>Tne default factory uses the default constructor and the setters to 
construct an object.
-   */
-  UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    Factory<List<FieldValueSetter>> setterFactory = new 
CachingFactory<>(fieldValueSetterFactory());
-    return new UserTypeCreatorFactory() {
-      @Override
-      public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
-        List<FieldValueSetter> setters = setterFactory.create(clazz, schema);
-        return new SchemaUserTypeCreator() {
-          @Override
-          public Object create(Object... params) {
-            Object object;
-            try {
-              object = clazz.getDeclaredConstructor().newInstance();
-            } catch (NoSuchMethodException
-                | IllegalAccessException
-                | InvocationTargetException
-                | InstantiationException e) {
-              throw new RuntimeException("Failed to instantiate object ", e);
-            }
-            for (int i = 0; i < params.length; ++i) {
-              FieldValueSetter setter = setters.get(i);
-              setter.set(object, params[i]);
-            }
-            return object;
-          }
-        };
-      }
-    };
-  }
+  /** Implementing class should override to return a constructor factory. */
+  @Nullable
+  abstract UserTypeCreatorFactory schemaTypeCreatorFactory();
 
   @Override
   public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> 
typeDescriptor) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 1e127b181210..2c22400b050c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -23,6 +23,7 @@
 import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory;
 import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory;
 import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -42,7 +43,8 @@
 public class JavaBeanSchema extends GetterBasedSchemaProvider {
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
+    return JavaBeanUtils.schemaFromJavaBeanClass(
+        typeDescriptor.getRawType(), SerializableFunctions.identity());
   }
 
   @Override
@@ -51,8 +53,8 @@ public FieldValueGetterFactory fieldValueGetterFactory() {
   }
 
   @Override
-  public FieldValueSetterFactory fieldValueSetterFactory() {
-    return new JavaBeanSetterFactory();
+  UserTypeCreatorFactory schemaTypeCreatorFactory() {
+    return new SetterBasedCreatorFactory(new JavaBeanSetterFactory());
   }
 
   @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index c6080e6863e5..a7d6a3020ccd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -21,7 +21,6 @@
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.utils.POJOUtils;
 import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory;
-import org.apache.beam.sdk.schemas.utils.PojoValueSetterFactory;
 import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -50,11 +49,6 @@ public FieldValueGetterFactory fieldValueGetterFactory() {
     return new PojoValueGetterFactory();
   }
 
-  @Override
-  public FieldValueSetterFactory fieldValueSetterFactory() {
-    return new PojoValueSetterFactory();
-  }
-
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
     return new PojoValueTypeInformationFactory();
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index d256b326e0f7..2eeecfd1fdaa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -540,6 +540,14 @@ public FieldType withMetadata(String metadata) {
       return 
toBuilder().setMetadata(metadata.getBytes(StandardCharsets.UTF_8)).build();
     }
 
+    public String getMetadataString() {
+      if (getMetadata() != null) {
+        return new String(getMetadata(), StandardCharsets.UTF_8);
+      } else {
+        return "";
+      }
+    }
+
     public FieldType withNullable(boolean nullable) {
       return toBuilder().setNullable(nullable).build();
     }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java
new file mode 100644
index 000000000000..26884b78f634
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SetterBasedCreatorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+/**
+ * A {@link UserTypeCreatorFactory} that uses a default constructor and a list 
of setters to
+ * construct a class.
+ */
+class SetterBasedCreatorFactory implements UserTypeCreatorFactory {
+  private final Factory<List<FieldValueSetter>> setterFactory;
+
+  public SetterBasedCreatorFactory(Factory<List<FieldValueSetter>> 
setterFactory) {
+    this.setterFactory = new CachingFactory<>(setterFactory);
+  }
+
+  @Override
+  public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
+    List<FieldValueSetter> setters = setterFactory.create(clazz, schema);
+    return new SchemaUserTypeCreator() {
+      @Override
+      public Object create(Object... params) {
+        Object object;
+        try {
+          object = clazz.getDeclaredConstructor().newInstance();
+        } catch (NoSuchMethodException
+            | IllegalAccessException
+            | InvocationTargetException
+            | InstantiationException e) {
+          throw new RuntimeException("Failed to instantiate object ", e);
+        }
+        for (int i = 0; i < params.length; ++i) {
+          FieldValueSetter setter = setters.get(i);
+          setter.set(object, params[i]);
+        }
+        return object;
+      }
+    };
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
new file mode 100644
index 000000000000..a1adb25ae6e2
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroByteBuddyUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
+import net.bytebuddy.dynamic.DynamicType;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodCall;
+import net.bytebuddy.implementation.bytecode.StackManipulation;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
+import net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
+import net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
+import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
+import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+class AvroByteBuddyUtils {
+  private static final ByteBuddy BYTE_BUDDY = new ByteBuddy();
+
+  // Cache the generated constructors.
+  private static final Map<ClassWithSchema, SchemaUserTypeCreator> 
CACHED_CREATORS =
+      Maps.newConcurrentMap();
+
+  static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
+      Class<T> clazz, Schema schema) {
+    return CACHED_CREATORS.computeIfAbsent(
+        new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
+  }
+
+  private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, 
Schema schema) {
+    Constructor baseConstructor = null;
+    Constructor[] constructors = clazz.getDeclaredConstructors();
+    for (Constructor constructor : constructors) {
+      // TODO: This assumes that Avro only generates one constructor with this 
many fields.
+      if (constructor.getParameterCount() == schema.getFieldCount()) {
+        baseConstructor = constructor;
+      }
+    }
+    if (baseConstructor == null) {
+      throw new RuntimeException("No matching constructor found for class " + 
clazz);
+    }
+
+    // Generate a method call to create and invoke the SpecificRecord's 
constructor.
+    MethodCall construct = MethodCall.construct(baseConstructor);
+    for (int i = 0; i < baseConstructor.getParameterTypes().length; ++i) {
+      Class<?> baseType = baseConstructor.getParameterTypes()[i];
+      construct = construct.with(readAndConvertParameter(baseType, i), 
baseType);
+    }
+
+    try {
+      DynamicType.Builder<SchemaUserTypeCreator> builder =
+          BYTE_BUDDY
+              .subclass(SchemaUserTypeCreator.class)
+              .method(ElementMatchers.named("create"))
+              .intercept(construct);
+
+      return builder
+          .make()
+          .load(ReflectHelpers.findClassLoader(), 
ClassLoadingStrategy.Default.INJECTION)
+          .getLoaded()
+          .getDeclaredConstructor()
+          .newInstance();
+    } catch (InstantiationException
+        | IllegalAccessException
+        | NoSuchMethodException
+        | InvocationTargetException e) {
+      throw new RuntimeException(
+          "Unable to generate a getter for class " + clazz + " with schema " + 
schema);
+    }
+  }
+
+  private static StackManipulation readAndConvertParameter(
+      Class<?> constructorParameterType, int index) {
+    // The types in the AVRO-generated constructor may differ from those used 
by Beam's Row class,
+    // so we have to convert the types used by Beam's Row class.
+    // We know that AVRO generates constructor parameters in the same order as 
fields
+    // in the schema, so we can just add the parameters sequentially.
+    ConvertType convertType = new ConvertType(true);
+
+    // Map the AVRO-generated type to the one Beam will use.
+    ForLoadedType convertedType =
+        new ForLoadedType((Class) 
convertType.convert(TypeDescriptor.of(constructorParameterType)));
+
+    // This will run inside the generated creator. Read the parameter and 
convert it to the
+    // type required by the SpecificRecord constructor.
+    StackManipulation readParameter =
+        new StackManipulation.Compound(
+            MethodVariableAccess.REFERENCE.loadFrom(1),
+            IntegerConstant.forValue(index),
+            ArrayAccess.REFERENCE.load(),
+            TypeCasting.to(convertedType));
+
+    // Convert to the parameter accepted by the SpecificRecord constructor.
+    return new ByteBuddyUtils.ConvertValueForSetter(readParameter)
+        .convert(TypeDescriptor.of(constructorParameterType));
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
new file mode 100644
index 000000000000..6b76fa6fa66a
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.utils;
+
+import java.util.List;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** A {@link FieldValueTypeInformationFactory} for AVRO-generated specific records. */
+public class AvroSpecificRecordTypeInformationFactory implements 
FieldValueTypeInformationFactory {
+  @Override
+  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
+    return AvroUtils.getFieldTypes((Class<? extends SpecificRecord>) 
targetClass, schema);
+  }
+}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 8b9f182d0848..df33d6430389 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -20,8 +20,9 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.base.CaseFormat;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -35,17 +36,25 @@
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.schemas.FieldValueGetter;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
@@ -53,6 +62,13 @@
 /** Utils to convert AVRO records to Beam rows. */
 @Experimental(Experimental.Kind.SCHEMAS)
 public class AvroUtils {
+  static {
+    // This works around a bug in the Avro library (AVRO-1891) around 
SpecificRecord's handling
+    // of DateTime types.
+    SpecificData.get().addLogicalTypeConversion(new 
TimeConversions.TimestampConversion());
+    GenericData.get().addLogicalTypeConversion(new 
TimeConversions.TimestampConversion());
+  }
+
   // Unwrap an AVRO schema into the base type an whether it is nullable.
   static class TypeWithNullability {
     public final org.apache.avro.Schema type;
@@ -91,6 +107,53 @@
     }
   }
 
+  /** Wrapper for fixed byte fields. */
+  public static class FixedBytesField {
+    private static final String PREFIX = "FIXED:";
+
+    private final int size;
+
+    private FixedBytesField(int size) {
+      this.size = size;
+    }
+
+    /** Create a {@link FixedBytesField} from a Beam {@link FieldType}. */
+    @Nullable
+    public static FixedBytesField fromBeamFieldType(FieldType fieldType) {
+      String metadata = fieldType.getMetadataString();
+      if (fieldType.getTypeName().equals(TypeName.BYTES) && 
metadata.startsWith(PREFIX)) {
+        return new FixedBytesField(Integer.parseInt(metadata.substring(6)));
+      } else {
+        return null;
+      }
+    }
+
+    /** Create a {@link FixedBytesField} from an AVRO type. */
+    @Nullable
+    public static FixedBytesField fromAvroType(org.apache.avro.Schema type) {
+      if (type.getType().equals(Type.FIXED)) {
+        return new FixedBytesField(type.getFixedSize());
+      } else {
+        return null;
+      }
+    }
+
+    /** Get the size. */
+    public int getSize() {
+      return size;
+    }
+
+    /** Convert to a Beam type. */
+    public FieldType toBeamType() {
+      return Schema.FieldType.BYTES.withMetadata(PREFIX + 
Integer.toString(size));
+    }
+
+    /** Convert to an AVRO type. */
+    public org.apache.avro.Schema toAvroType() {
+      return org.apache.avro.Schema.createFixed(null, "", "", size);
+    }
+  }
+
   private AvroUtils() {}
 
   /**
@@ -142,12 +205,7 @@ public static Row toBeamRowStrict(GenericRecord record, 
@Nullable Schema schema)
     for (Schema.Field field : schema.getFields()) {
       Object value = record.get(field.getName());
       org.apache.avro.Schema fieldAvroSchema = 
avroSchema.getField(field.getName()).schema();
-
-      if (value == null) {
-        builder.addValue(null);
-      } else {
-        builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, 
field.getType()));
-      }
+      builder.addValue(convertAvroFieldStrict(value, fieldAvroSchema, 
field.getType()));
     }
 
     return builder.build();
@@ -184,6 +242,79 @@ public static GenericRecord toGenericRecord(
     return builder.build();
   }
 
+  /**
+   * Returns a function mapping AVRO {@link GenericRecord}s to Beam {@link 
Row}s for use in {@link
+   * org.apache.beam.sdk.values.PCollection#setSchema}.
+   */
+  public static SerializableFunction<GenericRecord, Row> 
getGenericRecordToRowFunction(
+      @Nullable Schema schema) {
+    return g -> toBeamRowStrict(g, schema);
+  }
+
+  /**
+   * Returns a function mapping Beam {@link Row}s to AVRO {@link 
GenericRecord}s for use in {@link
+   * org.apache.beam.sdk.values.PCollection#setSchema}.
+   */
+  public static SerializableFunction<Row, GenericRecord> 
getRowToGenericRecordFunction(
+      @Nullable org.apache.avro.Schema avroSchema) {
+    return g -> toGenericRecord(g, avroSchema);
+  }
+
+  /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */
+  public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) {
+    try {
+      org.apache.avro.Schema avroSchema =
+          (org.apache.avro.Schema) 
(clazz.getDeclaredField("SCHEMA$").get(null));
+      return toBeamSchema(avroSchema);
+    } catch (NoSuchFieldException | IllegalAccessException e) {
+      throw new IllegalArgumentException(
+          "Class "
+              + clazz
+              + " is not an AVRO SpecificRecord. "
+              + "No public SCHEMA$ field was found.");
+    }
+  }
+
+  private static final class AvroSpecificRecordFieldNamePolicy
+      implements SerializableFunction<String, String> {
+    Schema schema;
+    Map<String, String> nameMapping = Maps.newHashMap();
+
+    AvroSpecificRecordFieldNamePolicy(Schema schema) {
+      this.schema = schema;
+      for (Field field : schema.getFields()) {
+        String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, 
field.getName());
+        nameMapping.put(getter, field.getName());
+        // The Avro compiler might add a $ at the end of a getter to 
disambiguate.
+        nameMapping.put(getter + "$", field.getName());
+      }
+    }
+
+    @Override
+    public String apply(String input) {
+      return nameMapping.getOrDefault(input, input);
+    }
+  }
+
+  /** Get field types for an AVRO-generated SpecificRecord. */
+  public static <T extends SpecificRecord> List<FieldValueTypeInformation> 
getFieldTypes(
+      Class<T> clazz, Schema schema) {
+    return JavaBeanUtils.getFieldTypes(
+        clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema));
+  }
+
+  /** Get generated getters for an AVRO-generated SpecificRecord. */
+  public static <T extends SpecificRecord> List<FieldValueGetter> getGetters(
+      Class<T> clazz, Schema schema) {
+    return JavaBeanUtils.getGetters(clazz, schema, new 
AvroSpecificRecordFieldNamePolicy(schema));
+  }
+
+  /** Get an object creator for an AVRO-generated SpecificRecord. */
+  public static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
+      Class<T> clazz, Schema schema) {
+    return AvroByteBuddyUtils.getCreator(clazz, schema);
+  }
+
   /** Converts AVRO schema to Beam field. */
   private static Schema.FieldType toFieldType(TypeWithNullability type) {
     Schema.FieldType fieldType = null;
@@ -224,7 +355,7 @@ public static GenericRecord toGenericRecord(
           break;
 
         case FIXED:
-          fieldType = Schema.FieldType.BYTES;
+          fieldType = FixedBytesField.fromAvroType(type.type).toBeamType();
           break;
 
         case STRING:
@@ -312,7 +443,12 @@ public static GenericRecord toGenericRecord(
         break;
 
       case BYTES:
-        baseType = org.apache.avro.Schema.create(Type.BYTES);
+        FixedBytesField fixedBytesField = 
FixedBytesField.fromBeamFieldType(fieldType);
+        if (fixedBytesField != null) {
+          baseType = fixedBytesField.toAvroType();
+        } else {
+          baseType = org.apache.avro.Schema.create(Type.BYTES);
+        }
         break;
 
       case ARRAY:
@@ -340,9 +476,24 @@ public static GenericRecord toGenericRecord(
     return fieldType.getNullable() ? ReflectData.makeNullable(baseType) : 
baseType;
   }
 
+  @Nullable
   private static Object genericFromBeamField(
-      Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, Object 
value) {
-    org.apache.avro.Schema expectedSchema = getFieldSchema(fieldType);
+      Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable 
Object value) {
+    TypeWithNullability typeWithNullability = new 
TypeWithNullability(avroSchema);
+
+    if (!fieldType.getNullable().equals(typeWithNullability.nullable)) {
+      throw new IllegalArgumentException(
+          "FieldType "
+              + fieldType
+              + " and AVRO schema "
+              + avroSchema
+              + " don't have matching nullability");
+    }
+
+    if (value == null) {
+      return value;
+    }
+
     switch (fieldType.getTypeName()) {
       case BYTE:
       case INT16:
@@ -351,81 +502,67 @@ private static Object genericFromBeamField(
       case FLOAT:
       case DOUBLE:
       case BOOLEAN:
-        return checkValueType(avroSchema, value, fieldType, expectedSchema);
+        return value;
 
       case STRING:
         return new Utf8((String) value);
 
       case DECIMAL:
         BigDecimal decimal = (BigDecimal) value;
-        LogicalType logicalType = avroSchema.getLogicalType();
-        ByteBuffer byteBuffer =
-            new Conversions.DecimalConversion().toBytes(decimal, null, 
logicalType);
-        return checkValueType(avroSchema, byteBuffer, fieldType, 
expectedSchema);
+        LogicalType logicalType = typeWithNullability.type.getLogicalType();
+        return new Conversions.DecimalConversion().toBytes(decimal, null, 
logicalType);
 
       case DATETIME:
         ReadableInstant instant = (ReadableInstant) value;
-        return checkValueType(avroSchema, instant.getMillis(), fieldType, 
expectedSchema);
+        return instant.getMillis();
 
       case BYTES:
-        return checkValueType(
-            avroSchema, ByteBuffer.wrap((byte[]) value), fieldType, 
expectedSchema);
+        FixedBytesField fixedBytesField = 
FixedBytesField.fromBeamFieldType(fieldType);
+        if (fixedBytesField != null) {
+          byte[] byteArray = (byte[]) value;
+          if (byteArray.length != fixedBytesField.getSize()) {
+            throw new IllegalArgumentException("Incorrectly sized byte 
array.");
+          }
+          return GenericData.get().createFixed(null, (byte[]) value, 
typeWithNullability.type);
+        } else {
+          return ByteBuffer.wrap((byte[]) value);
+        }
 
       case ARRAY:
-        List array = (List) checkValueType(avroSchema, value, fieldType, 
expectedSchema);
+        List array = (List) value;
         List<Object> translatedArray = 
Lists.newArrayListWithExpectedSize(array.size());
-        org.apache.avro.Schema avroArrayType = new 
TypeWithNullability(avroSchema).type;
 
         for (Object arrayElement : array) {
           translatedArray.add(
               genericFromBeamField(
                   fieldType.getCollectionElementType(),
-                  avroArrayType.getElementType(),
+                  typeWithNullability.type.getElementType(),
                   arrayElement));
         }
-        return checkValueType(avroSchema, translatedArray, fieldType, 
expectedSchema);
+        return translatedArray;
 
       case MAP:
-        ImmutableMap.Builder builder = ImmutableMap.builder();
-        Map<Object, Object> valueMap =
-            (Map<Object, Object>) checkValueType(avroSchema, value, fieldType, 
expectedSchema);
-        org.apache.avro.Schema avroMapType = new 
TypeWithNullability(avroSchema).type;
-
+        Map map = Maps.newHashMap();
+        Map<Object, Object> valueMap = (Map<Object, Object>) value;
         for (Map.Entry entry : valueMap.entrySet()) {
           Utf8 key = new Utf8((String) entry.getKey());
-          builder.put(
+          map.put(
               key,
               genericFromBeamField(
-                  fieldType.getMapValueType(), avroMapType.getValueType(), 
entry.getValue()));
+                  fieldType.getMapValueType(),
+                  typeWithNullability.type.getValueType(),
+                  entry.getValue()));
         }
-        return checkValueType(avroSchema, builder.build(), fieldType, 
expectedSchema);
+        return map;
 
       case ROW:
-        return checkValueType(
-            avroSchema, toGenericRecord((Row) value, avroSchema), fieldType, 
expectedSchema);
+        return toGenericRecord((Row) value, typeWithNullability.type);
 
       default:
         throw new IllegalArgumentException("Unsupported type " + fieldType);
     }
   }
 
-  private static Object checkValueType(
-      org.apache.avro.Schema avroSchema,
-      Object o,
-      FieldType fieldType,
-      org.apache.avro.Schema expectedType) {
-    TypeWithNullability typeWithNullability = new 
TypeWithNullability(avroSchema);
-    if (!fieldType.getNullable().equals(typeWithNullability.nullable)) {
-      throw new IllegalArgumentException(
-          "FieldType "
-              + fieldType
-              + " and AVRO schema "
-              + avroSchema
-              + " don't have matching nullability");
-    }
-    return o;
-  }
-
   /**
    * Strict conversion from AVRO to Beam, strict because it doesn't do widening or narrowing during
    * conversion.
@@ -436,10 +573,14 @@ private static Object checkValueType(
    * @return value converted for {@link Row}
    */
   @SuppressWarnings("unchecked")
+  @Nullable
   public static Object convertAvroFieldStrict(
-      @Nonnull Object value,
+      @Nullable Object value,
       @Nonnull org.apache.avro.Schema avroSchema,
       @Nonnull Schema.FieldType fieldType) {
+    if (value == null) {
+      return null;
+    }
 
     TypeWithNullability type = new TypeWithNullability(avroSchema);
     LogicalType logicalType = LogicalTypes.fromSchema(type.type);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
index accbb20dbf7b..df910bac97d4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ByteBuddyUtils.java
@@ -38,6 +38,7 @@
 import net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
 import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
 import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -96,6 +97,9 @@ public T convert(TypeDescriptor typeDescriptor) {
         return convertDateTime(typeDescriptor);
       } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
         return convertByteBuffer(typeDescriptor);
+      } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
+        // TODO: Refactor AVRO-specific check into separate class.
+        return convertGenericFixed(typeDescriptor);
       } else if 
(typeDescriptor.isSubtypeOf(TypeDescriptor.of(CharSequence.class))) {
         return convertCharSequence(typeDescriptor);
       } else if (typeDescriptor.getRawType().isPrimitive()) {
@@ -115,6 +119,8 @@ public T convert(TypeDescriptor typeDescriptor) {
 
     protected abstract T convertByteBuffer(TypeDescriptor<?> type);
 
+    protected abstract T convertGenericFixed(TypeDescriptor<?> type);
+
     protected abstract T convertCharSequence(TypeDescriptor<?> type);
 
     protected abstract T convertPrimitive(TypeDescriptor<?> type);
@@ -173,6 +179,11 @@ protected Type convertByteBuffer(TypeDescriptor<?> type) {
       return byte[].class;
     }
 
+    @Override
+    protected Type convertGenericFixed(TypeDescriptor<?> type) {
+      return byte[].class;
+    }
+
     @Override
     protected Type convertCharSequence(TypeDescriptor<?> type) {
       return String.class;
@@ -304,6 +315,23 @@ protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
                   .getOnly()));
     }
 
+    @Override
+    protected StackManipulation convertGenericFixed(TypeDescriptor<?> type) {
+      // TODO: Refactor AVRO-specific code into separate class.
+
+      // Generate the following code:
+      // return value.bytes();
+
+      return new Compound(
+          readValue,
+          MethodInvocation.invoke(
+              new ForLoadedType(GenericFixed.class)
+                  .getDeclaredMethods()
+                  .filter(
+                      
ElementMatchers.named("bytes").and(ElementMatchers.returns(BYTE_ARRAY_TYPE)))
+                  .getOnly()));
+    }
+
     @Override
     protected StackManipulation convertCharSequence(TypeDescriptor<?> type) {
       // If the member is a String, then return it.
@@ -464,6 +492,28 @@ protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
                   .getOnly()));
     }
 
+    @Override
+    protected StackManipulation convertGenericFixed(TypeDescriptor<?> type) {
+      // Generate the following code:
+      // return T((byte[]) value);
+
+      // TODO: Refactor AVRO-specific code out of this class.
+      ForLoadedType loadedType = new ForLoadedType(type.getRawType());
+      return new Compound(
+          TypeCreation.of(loadedType),
+          Duplication.SINGLE,
+          readValue,
+          TypeCasting.to(BYTE_ARRAY_TYPE),
+          // Create a new instance that wraps this byte[].
+          MethodInvocation.invoke(
+              loadedType
+                  .getDeclaredMethods()
+                  .filter(
+                      ElementMatchers.isConstructor()
+                          
.and(ElementMatchers.takesArguments(BYTE_ARRAY_TYPE)))
+                  .getOnly()));
+    }
+
     @Override
     protected StackManipulation convertCharSequence(TypeDescriptor<?> type) {
       // If the type is a String, just return it.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
index 0bb9e995cd71..5c3d2ea17155 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
@@ -21,11 +21,12 @@
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 
 /** A factory for creating {@link FieldValueGetter} objects for a JavaBean object. */
 public class JavaBeanGetterFactory implements FieldValueGetterFactory {
   @Override
   public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return JavaBeanUtils.getGetters(targetClass, schema);
+    return JavaBeanUtils.getGetters(targetClass, schema, 
SerializableFunctions.identity());
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
index 1b3826227c8a..f445a87dc9ca 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
@@ -21,11 +21,12 @@
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 
 /** A {@link FieldValueTypeInformationFactory} for Java Bean objects. */
 public class JavaBeanTypeInformationFactory implements 
FieldValueTypeInformationFactory {
   @Override
   public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return JavaBeanUtils.getFieldTypes(targetClass, schema);
+    return JavaBeanUtils.getFieldTypes(targetClass, schema, 
SerializableFunctions.identity());
   }
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 8a213b436eba..0ec07c9d8d77 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -49,23 +49,27 @@
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
 import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /** A set of utilities to generate getter and setter classes for JavaBean objects. */
 @Experimental(Kind.SCHEMAS)
 public class JavaBeanUtils {
   /** Create a {@link Schema} for a Java Bean class. */
-  public static Schema schemaFromJavaBeanClass(Class<?> clazz) {
-    return StaticSchemaInference.schemaFromClass(clazz, 
JavaBeanUtils::typeInformationFromClass);
+  public static Schema schemaFromJavaBeanClass(
+      Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) {
+    return StaticSchemaInference.schemaFromClass(
+        clazz, c -> JavaBeanUtils.typeInformationFromClass(c, 
fieldNamePolicy));
   }
 
-  private static List<TypeInformation> typeInformationFromClass(Class<?> 
clazz) {
+  private static List<TypeInformation> typeInformationFromClass(
+      Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) {
     try {
       List<TypeInformation> getterTypes =
           ReflectUtils.getMethods(clazz)
               .stream()
               .filter(ReflectUtils::isGetter)
-              .map(m -> TypeInformation.forGetter(m))
+              .map(m -> TypeInformation.forGetter(m, fieldNamePolicy))
               .collect(Collectors.toList());
 
       Map<String, TypeInformation> setterTypes =
@@ -113,7 +117,8 @@ private static void validateJavaBean(
   private static final Map<ClassWithSchema, List<FieldValueTypeInformation>> 
CACHED_FIELD_TYPES =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz, 
Schema schema) {
+  public static List<FieldValueTypeInformation> getFieldTypes(
+      Class<?> clazz, Schema schema, SerializableFunction<String, String> 
fieldNamePolicy) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
@@ -122,7 +127,7 @@ private static void validateJavaBean(
                 ReflectUtils.getMethods(clazz)
                     .stream()
                     .filter(ReflectUtils::isGetter)
-                    .map(TypeInformation::forGetter)
+                    .map(m -> TypeInformation.forGetter(m, fieldNamePolicy))
                     .map(FieldValueTypeInformation::of)
                     .collect(
                         Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
@@ -147,7 +152,8 @@ private static void validateJavaBean(
    *
    * <p>The returned list is ordered by the order of fields in the schema.
    */
-  public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema 
schema) {
+  public static List<FieldValueGetter> getGetters(
+      Class<?> clazz, Schema schema, SerializableFunction<String, String> 
fieldNamePolicy) {
     return CACHED_GETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
@@ -156,7 +162,7 @@ private static void validateJavaBean(
                 ReflectUtils.getMethods(clazz)
                     .stream()
                     .filter(ReflectUtils::isGetter)
-                    .map(JavaBeanUtils::createGetter)
+                    .map(m -> JavaBeanUtils.createGetter(m, fieldNamePolicy))
                     .collect(Collectors.toMap(FieldValueGetter::name, 
Function.identity()));
             return schema
                 .getFields()
@@ -169,14 +175,15 @@ private static void validateJavaBean(
         });
   }
 
-  private static <T> FieldValueGetter createGetter(Method getterMethod) {
-    TypeInformation typeInformation = TypeInformation.forGetter(getterMethod);
+  private static <T> FieldValueGetter createGetter(
+      Method getterMethod, SerializableFunction<String, String> 
fieldNamePolicy) {
+    TypeInformation typeInformation = TypeInformation.forGetter(getterMethod, 
fieldNamePolicy);
     DynamicType.Builder<FieldValueGetter> builder =
         ByteBuddyUtils.subclassGetterInterface(
             BYTE_BUDDY,
             getterMethod.getDeclaringClass(),
             new ConvertType(false).convert(typeInformation.getType()));
-    builder = implementGetterMethods(builder, getterMethod);
+    builder = implementGetterMethods(builder, getterMethod, fieldNamePolicy);
     try {
       return builder
           .make()
@@ -193,13 +200,15 @@ private static void validateJavaBean(
   }
 
   private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
-      DynamicType.Builder<FieldValueGetter> builder, Method method) {
-    TypeInformation typeInformation = TypeInformation.forGetter(method);
+      DynamicType.Builder<FieldValueGetter> builder,
+      Method method,
+      SerializableFunction<String, String> fieldNamePolicy) {
+    TypeInformation typeInformation = TypeInformation.forGetter(method, 
fieldNamePolicy);
     return builder
         .method(ElementMatchers.named("name"))
         .intercept(FixedValue.reference(typeInformation.getName()))
         .method(ElementMatchers.named("get"))
-        .intercept(new InvokeGetterInstruction(method));
+        .intercept(new InvokeGetterInstruction(method, fieldNamePolicy));
   }
 
   // The list of setters for a class is cached, so we only create the classes the first time
@@ -271,9 +280,11 @@ private static void validateJavaBean(
   private static class InvokeGetterInstruction implements Implementation {
     // Getter method that wil be invoked
     private Method method;
+    private SerializableFunction<String, String> fieldNamePolicy;
 
-    InvokeGetterInstruction(Method method) {
+    InvokeGetterInstruction(Method method, SerializableFunction<String, 
String> fieldNamePolicy) {
       this.method = method;
+      this.fieldNamePolicy = fieldNamePolicy;
     }
 
     @Override
@@ -284,7 +295,7 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return (methodVisitor, implementationContext, instrumentedMethod) -> {
-        TypeInformation typeInformation = TypeInformation.forGetter(method);
+        TypeInformation typeInformation = TypeInformation.forGetter(method, 
fieldNamePolicy);
         // this + method parameters.
         int numLocals = 1 + instrumentedMethod.getParameters().size();
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index 38fa42aa96da..fdf978cfb46b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -34,6 +34,7 @@
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.ReadableInstant;
 
@@ -85,7 +86,8 @@ public static TypeInformation forField(Field field) {
     }
 
     /** Construct a {@link TypeInformation} from a class getter. */
-    public static TypeInformation forGetter(Method method) {
+    public static TypeInformation forGetter(
+        Method method, SerializableFunction<String, String> fieldNamePolicy) {
       String name;
       if (method.getName().startsWith("get")) {
         name = ReflectUtils.stripPrefix(method.getName(), "get");
@@ -94,6 +96,8 @@ public static TypeInformation forGetter(Method method) {
       } else {
         throw new RuntimeException("Getter has wrong prefix " + 
method.getName());
       }
+      name = fieldNamePolicy.apply(name);
+
       TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType());
       boolean nullable = method.isAnnotationPresent(Nullable.class);
       return new TypeInformation(name, type, nullable);
diff --git a/sdks/java/core/src/test/avro/org/apache/beam/sdk/schemas/test.avsc b/sdks/java/core/src/test/avro/org/apache/beam/sdk/schemas/test.avsc
new file mode 100644
index 000000000000..9ed18bf439e3
--- /dev/null
+++ b/sdks/java/core/src/test/avro/org/apache/beam/sdk/schemas/test.avsc
@@ -0,0 +1,29 @@
+{
+  "namespace": "org.apache.beam.sdk.schemas",
+  "type": "record",
+  "name": "TestAvro",
+  "fields": [
+    { "name": "bool_non_nullable", "type": "boolean"},
+    { "name": "int", "type": ["int", "null"]},
+    { "name": "long", "type": ["long", "null"]},
+    { "name": "float", "type": ["float", "null"]},
+    { "name": "double", "type": ["double", "null"]},
+    { "name": "string", "type": ["string", "null"]},
+    { "name": "bytes", "type": ["bytes", "null"]},
+    { "name": "fixed", "type": {"type": "fixed", "size": 4, "name": "fixed4"} 
},
+    { "name": "timestampMillis", "type":
+      [ {"type": "long", "logicalType": "timestamp-millis"}, "null"]},
+    { "name": "row", "type": ["null", {
+     "type": "record",
+     "name": "TestAvroNested",
+      "fields": [
+        { "name": "bool_non_nullable", "type": "boolean"},
+        { "name": "int", "type": ["int", "null"]}
+         ]
+       }]
+    },
+    { "name": "array", "type":["null", {"type": "array", "items": ["null", 
"TestAvroNested"] }]},
+    { "name": "map", "type": ["null", {"type": "map", "values": ["null", 
"TestAvroNested"]}]}
+  ]
+}
+
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
new file mode 100644
index 000000000000..f7cc64d7b486
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.nio.ByteBuffer;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.util.Utf8;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+/** Tests for AVRO schema classes. */
+public class AvroSchemaTest {
+  private static final Schema SUBSCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .build();
+  private static final FieldType SUB_TYPE = 
FieldType.row(SUBSCHEMA).withNullable(true);
+
+  private static final Schema SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FieldType.BYTES.withMetadata("FIXED:4"))
+          .addNullableField("timestampMillis", FieldType.DATETIME)
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", FieldType.array(SUB_TYPE))
+          .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE))
+          .build();
+
+  @Test
+  public void testSpecificRecordSchema() {
+    assertEquals(
+        SCHEMA, new 
AvroSpecificRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class)));
+  }
+
+  private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
+  private static final DateTime DATE_TIME =
+      new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4);
+  private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new 
TestAvroNested(true, 42);
+  private static final TestAvro AVRO_SPECIFIC_RECORD =
+      new TestAvro(
+          true,
+          43,
+          44L,
+          (float) 44.1,
+          (double) 44.2,
+          "mystring",
+          ByteBuffer.wrap(BYTE_ARRAY),
+          new fixed4(BYTE_ARRAY),
+          DATE_TIME,
+          AVRO_NESTED_SPECIFIC_RECORD,
+          ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, 
AVRO_NESTED_SPECIFIC_RECORD),
+          ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", 
AVRO_NESTED_SPECIFIC_RECORD));
+  private static final GenericRecord AVRO_NESTED_GENERIC_RECORD =
+      new GenericRecordBuilder(TestAvroNested.SCHEMA$)
+          .set("bool_non_nullable", true)
+          .set("int", 42)
+          .build();
+  private static final GenericRecord AVRO_GENERIC_RECORD =
+      new GenericRecordBuilder(TestAvro.SCHEMA$)
+          .set("bool_non_nullable", true)
+          .set("int", 43)
+          .set("long", 44L)
+          .set("float", (float) 44.1)
+          .set("double", (double) 44.2)
+          .set("string", new Utf8("mystring"))
+          .set("bytes", ByteBuffer.wrap(BYTE_ARRAY))
+          .set(
+              "fixed",
+              GenericData.get()
+                  .createFixed(
+                      null, BYTE_ARRAY, 
org.apache.avro.Schema.createFixed("fixed4", "", "", 4)))
+          .set("timestampMillis", DATE_TIME.getMillis())
+          .set("row", AVRO_NESTED_GENERIC_RECORD)
+          .set("array", ImmutableList.of(AVRO_NESTED_GENERIC_RECORD, 
AVRO_NESTED_GENERIC_RECORD))
+          .set(
+              "map",
+              ImmutableMap.of(
+                  new Utf8("k1"), AVRO_NESTED_GENERIC_RECORD,
+                  new Utf8("k2"), AVRO_NESTED_GENERIC_RECORD))
+          .build();
+
+  private static final Row NESTED_ROW = 
Row.withSchema(SUBSCHEMA).addValues(true, 42).build();
+  private static final Row ROW =
+      Row.withSchema(SCHEMA)
+          .addValues(
+              true,
+              43,
+              44L,
+              (float) 44.1,
+              (double) 44.2,
+              "mystring",
+              ByteBuffer.wrap(BYTE_ARRAY),
+              ByteBuffer.wrap(BYTE_ARRAY),
+              DATE_TIME,
+              NESTED_ROW,
+              ImmutableList.of(NESTED_ROW, NESTED_ROW),
+              ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW))
+          .build();
+
+  @Test
+  public void testSpecificRecordToRow() {
+    SerializableFunction<TestAvro, Row> toRow =
+        new 
AvroSpecificRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class));
+    Row row = toRow.apply(AVRO_SPECIFIC_RECORD);
+    assertEquals(ROW, toRow.apply(AVRO_SPECIFIC_RECORD));
+  }
+
+  @Test
+  public void testRowToSpecificRecord() {
+    SerializableFunction<Row, TestAvro> fromRow =
+        new 
AvroSpecificRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class));
+    assertEquals(AVRO_SPECIFIC_RECORD, fromRow.apply(ROW));
+  }
+
+  @Test
+  public void testGenericRecordToRow() {
+    SerializableFunction<GenericRecord, Row> toRow =
+        AvroUtils.getGenericRecordToRowFunction(SCHEMA);
+    assertEquals(ROW, toRow.apply(AVRO_GENERIC_RECORD));
+  }
+
+  @Test
+  public void testRowToGenericRecord() {
+    SerializableFunction<Row, GenericRecord> fromRow =
+        AvroUtils.getRowToGenericRecordFunction(TestAvro.SCHEMA$);
+    GenericRecord generic = fromRow.apply(ROW);
+    assertEquals(AVRO_GENERIC_RECORD, fromRow.apply(ROW));
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index 13512a0c425b..f6484f1c3e4c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.pholser.junit.quickcheck.From;
 import com.pholser.junit.quickcheck.Property;
 import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
@@ -32,6 +33,7 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import org.apache.avro.Conversions;
 import org.apache.avro.LogicalType;
@@ -292,7 +294,7 @@ public void testNullableFieldInAvroSchema() {
         new org.apache.avro.Schema.Field(
             "array",
             org.apache.avro.Schema.createArray(
-                
ReflectData.makeNullable(org.apache.avro.Schema.create(Type.INT))),
+                
ReflectData.makeNullable(org.apache.avro.Schema.create(Type.BYTES))),
             "",
             null));
     fields.add(
@@ -307,10 +309,26 @@ public void testNullableFieldInAvroSchema() {
     Schema expectedSchema =
         Schema.builder()
             .addNullableField("int", FieldType.INT32)
-            .addArrayField("array", FieldType.INT32.withNullable(true))
+            .addArrayField("array", FieldType.BYTES.withNullable(true))
             .addMapField("map", FieldType.STRING, FieldType.INT32.withNullable(true))
             .build();
     assertEquals(expectedSchema, AvroUtils.toBeamSchema(avroSchema));
+
+    Map<String, Object> nullMap = Maps.newHashMap();
+    nullMap.put("k1", null);
+    GenericRecord genericRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set("int", null)
+            .set("array", Lists.newArrayList((Object) null))
+            .set("map", nullMap)
+            .build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValue(null)
+            .addValue(Lists.newArrayList((Object) null))
+            .addValue(nullMap)
+            .build();
+    assertEquals(expectedRow, AvroUtils.toBeamRowStrict(genericRecord, expectedSchema));
   }
 
   @Test
@@ -342,6 +360,25 @@ public void testNullableFieldsInBeamSchema() {
             null));
     org.apache.avro.Schema avroSchema = org.apache.avro.Schema.createRecord(fields);
     assertEquals(avroSchema, AvroUtils.toAvroSchema(beamSchema));
+
+    Map<Utf8, Object> nullMapUtf8 = Maps.newHashMap();
+    nullMapUtf8.put(new Utf8("k1"), null);
+    Map<String, Object> nullMapString = Maps.newHashMap();
+    nullMapString.put("k1", null);
+
+    GenericRecord expectedGenericRecord =
+        new GenericRecordBuilder(avroSchema)
+            .set("int", null)
+            .set("array", Lists.newArrayList((Object) null))
+            .set("map", nullMapUtf8)
+            .build();
+    Row row =
+        Row.withSchema(beamSchema)
+            .addValue(null)
+            .addValue(Lists.newArrayList((Object) null))
+            .addValue(nullMapString)
+            .build();
+    assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema));
   }
 
   @Test
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index 125be49ce4cc..938089911c7b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -49,6 +49,7 @@
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.joda.time.DateTime;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,7 +61,8 @@
 
   @Test
   public void testNullable() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, SerializableFunctions.identity());
     assertTrue(schema.getField("str").getType().getNullable());
     assertFalse(schema.getField("anInt").getType().getNullable());
   }
@@ -68,48 +70,62 @@ public void testNullable() {
   @Test
   public void testMismatchingNullable() {
     thrown.expect(RuntimeException.class);
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            MismatchingNullableBean.class, SerializableFunctions.identity());
   }
 
   @Test
   public void testSimpleBean() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedBean() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveArray() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            PrimitiveArrayBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedArray() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            NestedArrayBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedCollection() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            NestedCollectionBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveMap() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            PrimitiveMapBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedMap() {
-    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class);
+    Schema schema =
+        JavaBeanUtils.schemaFromJavaBeanClass(
+            NestedMapBean.class, SerializableFunctions.identity());
     SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema);
   }
 
@@ -129,7 +145,9 @@ public void testGeneratedSimpleGetters() {
     simpleBean.setBigDecimal(new BigDecimal(42));
     simpleBean.setStringBuilder(new StringBuilder("stringBuilder"));
 
-    List<FieldValueGetter> getters = JavaBeanUtils.getGetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA);
+    List<FieldValueGetter> getters =
+        JavaBeanUtils.getGetters(
+            SimpleBean.class, SIMPLE_BEAN_SCHEMA, SerializableFunctions.identity());
     assertEquals(12, getters.size());
     assertEquals("str", getters.get(0).name());
 
@@ -198,7 +216,10 @@ public void testGeneratedSimpleBoxedGetters() {
     bean.setaBoolean(true);
 
     List<FieldValueGetter> getters =
-        JavaBeanUtils.getGetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA);
+        JavaBeanUtils.getGetters(
+            BeanWithBoxedFields.class,
+            BEAN_WITH_BOXED_FIELDS_SCHEMA,
+            SerializableFunctions.identity());
     assertEquals((byte) 41, getters.get(0).get(bean));
     assertEquals((short) 42, getters.get(1).get(bean));
     assertEquals((int) 43, getters.get(2).get(bean));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174674)
    Time Spent: 9h 50m  (was: 9h 40m)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to