This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 788ce61  Merge pull request #7267: [BEAM-4454] Support Avro POJO objects
788ce61 is described below

commit 788ce61bd7c48bc16a8eaa93a46ac403155f4422
Author: reuvenlax <re...@google.com>
AuthorDate: Fri Dec 14 09:15:56 2018 -0800

    Merge pull request #7267: [BEAM-4454] Support Avro POJO objects
    
    * Add remaining Schema support for AVRO records:
      * Add support for SpecificRecord using ByteBuddy codegen.
      * Add helper methods for GenericRecord.
      * Fix uncovered bugs involving nullable support.
---
 ...ificRecordSchema.java => AvroRecordSchema.java} |  19 +-
 .../schemas/AvroSpecificRecordGetterFactory.java   |  30 ---
 .../AvroSpecificRecordUserTypeCreatorFactory.java  |  29 ---
 .../sdk/schemas/FieldValueTypeInformation.java     | 127 +++++++++---
 .../beam/sdk/schemas/FromRowUsingCreator.java      |   2 +-
 .../sdk/schemas/GetterBasedSchemaProvider.java     |   2 -
 .../apache/beam/sdk/schemas/JavaBeanSchema.java    |  68 ++++++-
 .../apache/beam/sdk/schemas/JavaFieldSchema.java   |  37 +++-
 .../schemas/PojoTypeUserTypeCreatorFactory.java    |  28 ---
 .../schemas/SchemaUserTypeConstructorCreator.java  |   2 +-
 .../AvroSpecificRecordTypeInformationFactory.java  |  32 ---
 .../apache/beam/sdk/schemas/utils/AvroUtils.java   | 123 +++++++----
 ...ionFactory.java => FieldValueTypeSupplier.java} |  18 +-
 .../sdk/schemas/utils/JavaBeanGetterFactory.java   |  32 ---
 .../sdk/schemas/utils/JavaBeanSetterFactory.java   |  31 ---
 .../utils/JavaBeanTypeInformationFactory.java      |  32 ---
 .../beam/sdk/schemas/utils/JavaBeanUtils.java      | 162 +++++----------
 .../apache/beam/sdk/schemas/utils/POJOUtils.java   | 101 ++++-----
 .../sdk/schemas/utils/PojoValueGetterFactory.java  |  31 ---
 .../sdk/schemas/utils/PojoValueSetterFactory.java  |  31 ---
 .../beam/sdk/schemas/utils/ReflectUtils.java       |  13 +-
 .../sdk/schemas/utils/StaticSchemaInference.java   | 104 +---------
 .../apache/beam/sdk/schemas/AvroSchemaTest.java    | 226 ++++++++++++++++++++-
 .../beam/sdk/schemas/utils/JavaBeanUtilsTest.java  |  49 ++---
 .../beam/sdk/schemas/utils/POJOUtilsTest.java      |  16 +-
 25 files changed, 652 insertions(+), 693 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
similarity index 67%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
index d8e4bda..29bf51a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
@@ -17,35 +17,34 @@
  */
 package org.apache.beam.sdk.schemas;
 
-import org.apache.avro.specific.SpecificRecord;
-import 
org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * A {@link SchemaProvider} for AVRO generated SpecificRecords.
+ * A {@link SchemaProvider} for AVRO generated SpecificRecords and POJOs.
  *
- * <p>This provider infers a schema from generates SpecificRecord objects, and 
creates schemas and
- * rows that bind to the appropriate fields.
+ * <p>This provider infers a schema from generated SpecificRecord objects, and 
creates schemas and
+ * rows that bind to the appropriate fields. This provider also infers schemas 
from Java POJO
+ * objects, creating a schema that matches that inferred by the AVRO libraries.
  */
-public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider {
+public class AvroRecordSchema extends GetterBasedSchemaProvider {
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return AvroUtils.getSchema((Class<? extends SpecificRecord>) 
typeDescriptor.getRawType());
+    return AvroUtils.getSchema(typeDescriptor.getRawType());
   }
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
-    return new AvroSpecificRecordGetterFactory();
+    return AvroUtils::getGetters;
   }
 
   @Override
   public UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    return new AvroSpecificRecordUserTypeCreatorFactory();
+    return AvroUtils::getCreator;
   }
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
-    return new AvroSpecificRecordTypeInformationFactory();
+    return AvroUtils::getFieldTypes;
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
deleted file mode 100644
index fcb85f4..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas;
-
-import java.util.List;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-
-/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */
-public class AvroSpecificRecordGetterFactory implements 
FieldValueGetterFactory {
-  @Override
-  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, 
schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
deleted file mode 100644
index 68d0d6a..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-
-/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */
-public class AvroSpecificRecordUserTypeCreatorFactory implements 
UserTypeCreatorFactory {
-  @Override
-  public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
-    return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, 
schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index 5cccdf6..593853f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -22,22 +22,33 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
-/** Represents type information for a schema field. */
+/** Represents type information for a Java type that will be used to infer a 
Schema type. */
 @AutoValue
 public abstract class FieldValueTypeInformation implements Serializable {
   /** Returns the field name. */
   public abstract String getName();
 
+  /** Returns whether the field is nullable. */
+  public abstract boolean isNullable();
+
   /** Returns the field type. */
-  public abstract Class getType();
+  public abstract TypeDescriptor getType();
+
+  @Nullable
+  public abstract Field getField();
+
+  @Nullable
+  public abstract Method getMethod();
 
   /** If the field is a container type, returns the element type. */
   @Nullable
@@ -51,26 +62,90 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
   @Nullable
   public abstract Type getMapValueType();
 
-  public static FieldValueTypeInformation of(Field field) {
-    return new AutoValue_FieldValueTypeInformation(
-        field.getName(),
-        field.getType(),
-        getArrayComponentType(field),
-        getMapKeyType(field),
-        getMapValueType(field));
+  abstract Builder toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    public abstract Builder setName(String name);
+
+    public abstract Builder setNullable(boolean nullable);
+
+    public abstract Builder setType(TypeDescriptor type);
+
+    public abstract Builder setField(@Nullable Field field);
+
+    public abstract Builder setMethod(@Nullable Method method);
+
+    public abstract Builder setElementType(@Nullable Type elementType);
+
+    public abstract Builder setMapKeyType(@Nullable Type mapKeyType);
+
+    public abstract Builder setMapValueType(@Nullable Type mapValueType);
+
+    abstract FieldValueTypeInformation build();
+  }
+
+  public static FieldValueTypeInformation forField(Field field) {
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(field.getName())
+        .setNullable(field.isAnnotationPresent(Nullable.class))
+        .setType(TypeDescriptor.of(field.getGenericType()))
+        .setField(field)
+        .setElementType(getArrayComponentType(field))
+        .setMapKeyType(getMapKeyType(field))
+        .setMapValueType(getMapValueType(field))
+        .build();
+  }
+
+  public static FieldValueTypeInformation forGetter(Method method) {
+    String name;
+    if (method.getName().startsWith("get")) {
+      name = ReflectUtils.stripPrefix(method.getName(), "get");
+    } else if (method.getName().startsWith("is")) {
+      name = ReflectUtils.stripPrefix(method.getName(), "is");
+    } else {
+      throw new RuntimeException("Getter has wrong prefix " + 
method.getName());
+    }
+
+    TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType());
+    boolean nullable = method.isAnnotationPresent(Nullable.class);
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(name)
+        .setNullable(nullable)
+        .setType(type)
+        .setMethod(method)
+        .setElementType(getArrayComponentType(type))
+        .setMapKeyType(getMapKeyType(type))
+        .setMapValueType(getMapValueType(type))
+        .build();
   }
 
-  public static FieldValueTypeInformation of(TypeInformation typeInformation) {
-    return new AutoValue_FieldValueTypeInformation(
-        typeInformation.getName(),
-        typeInformation.getType().getRawType(),
-        getArrayComponentType(typeInformation),
-        getMapKeyType(typeInformation),
-        getMapValueType(typeInformation));
+  public static FieldValueTypeInformation forSetter(Method method) {
+    String name;
+    if (method.getName().startsWith("set")) {
+      name = ReflectUtils.stripPrefix(method.getName(), "set");
+    } else {
+      throw new RuntimeException("Setter has wrong prefix " + 
method.getName());
+    }
+    if (method.getParameterCount() != 1) {
+      throw new RuntimeException("Setter methods should take a single 
argument.");
+    }
+    TypeDescriptor type = 
TypeDescriptor.of(method.getGenericParameterTypes()[0]);
+    boolean nullable =
+        
Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance);
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(name)
+        .setNullable(nullable)
+        .setType(type)
+        .setMethod(method)
+        .setElementType(getArrayComponentType(type))
+        .setMapKeyType(getMapKeyType(type))
+        .setMapValueType(getMapValueType(type))
+        .build();
   }
 
-  private static Type getArrayComponentType(TypeInformation typeInformation) {
-    return getArrayComponentType(typeInformation.getType());
+  public FieldValueTypeInformation withName(String name) {
+    return toBuilder().setName(name).build();
   }
 
   private static Type getArrayComponentType(Field field) {
@@ -98,15 +173,19 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
     return null;
   }
 
+  public Class getRawType() {
+    return getType().getRawType();
+  }
+
   // If the Field is a map type, returns the key type, otherwise returns a 
null reference.
   @Nullable
   private static Type getMapKeyType(Field field) {
-    return getMapType(TypeDescriptor.of(field.getGenericType()), 0);
+    return getMapKeyType(TypeDescriptor.of(field.getGenericType()));
   }
 
   @Nullable
-  private static Type getMapKeyType(TypeInformation typeInformation) {
-    return getMapType(typeInformation.getType(), 0);
+  private static Type getMapKeyType(TypeDescriptor<?> typeDescriptor) {
+    return getMapType(typeDescriptor, 0);
   }
 
   // If the Field is a map type, returns the value type, otherwise returns a 
null reference.
@@ -116,8 +195,8 @@ public abstract class FieldValueTypeInformation implements 
Serializable {
   }
 
   @Nullable
-  private static Type getMapValueType(TypeInformation typeInformation) {
-    return getMapType(typeInformation.getType(), 1);
+  private static Type getMapValueType(TypeDescriptor typeDescriptor) {
+    return getMapType(typeDescriptor, 1);
   }
 
   // If the Field is a map type, returns the key or value type (0 is key type, 
1 is value).
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 2ec1a42..139281f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -73,7 +73,7 @@ class FromRowUsingCreator<T> implements 
SerializableFunction<Row, T> {
           fromValue(
               type,
               row.getValue(i),
-              typeInformation.getType(),
+              typeInformation.getRawType(),
               typeInformation.getElementType(),
               typeInformation.getMapKeyType(),
               typeInformation.getMapValueType(),
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index f7757a8..677823f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.schemas;
 
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -38,7 +37,6 @@ public abstract class GetterBasedSchemaProvider implements 
SchemaProvider {
   abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory();
 
   /** Implementing class should override to return a constructor factory. */
-  @Nullable
   abstract UserTypeCreatorFactory schemaTypeCreatorFactory();
 
   @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 2c22400..8eb8022 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -17,13 +17,16 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.schemas.utils.JavaBeanGetterFactory;
-import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory;
-import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
 import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -41,15 +44,55 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 @Experimental(Kind.SCHEMAS)
 public class JavaBeanSchema extends GetterBasedSchemaProvider {
+  /** {@link FieldValueTypeSupplier} that's based on getter methods. */
+  @VisibleForTesting
+  public static class GetterTypeSupplier implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types =
+          ReflectUtils.getMethods(clazz)
+              .stream()
+              .filter(ReflectUtils::isGetter)
+              .map(FieldValueTypeInformation::forGetter)
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /** {@link FieldValueTypeSupplier} that's based on setter methods. */
+  @VisibleForTesting
+  public static class SetterTypeSupplier implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types =
+          ReflectUtils.getMethods(clazz)
+              .stream()
+              .filter(ReflectUtils::isSetter)
+              .map(FieldValueTypeInformation::forSetter)
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
+  }
+
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return JavaBeanUtils.schemaFromJavaBeanClass(
-        typeDescriptor.getRawType(), SerializableFunctions.identity());
+    return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
   }
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
-    return new JavaBeanGetterFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        JavaBeanUtils.getGetters(targetClass, schema, new 
GetterTypeSupplier());
   }
 
   @Override
@@ -59,6 +102,15 @@ public class JavaBeanSchema extends 
GetterBasedSchemaProvider {
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
-    return new JavaBeanTypeInformationFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        JavaBeanUtils.getFieldTypes(targetClass, schema, new 
GetterTypeSupplier());
+  }
+
+  /** A factory for creating {@link FieldValueSetter} objects for a JavaBean 
object. */
+  public static class JavaBeanSetterFactory implements FieldValueSetterFactory 
{
+    @Override
+    public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
+      return JavaBeanUtils.getSetters(targetClass, schema, new 
SetterTypeSupplier());
+    }
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index a7d6a30..1504717 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
 import org.apache.beam.sdk.schemas.utils.POJOUtils;
-import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory;
-import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -39,6 +44,25 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 @Experimental(Kind.SCHEMAS)
 public class JavaFieldSchema extends GetterBasedSchemaProvider {
+  /** {@link FieldValueTypeSupplier} that's based on public fields. */
+  @VisibleForTesting
+  public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types =
+          ReflectUtils.getFields(clazz)
+              .stream()
+              .map(FieldValueTypeInformation::forField)
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
+  }
+
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
     return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType());
@@ -46,16 +70,19 @@ public class JavaFieldSchema extends 
GetterBasedSchemaProvider {
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
-    return new PojoValueGetterFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        POJOUtils.getGetters(targetClass, schema, new JavaFieldTypeSupplier());
   }
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
-    return new PojoValueTypeInformationFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        POJOUtils.getFieldTypes(targetClass, schema, new 
JavaFieldTypeSupplier());
   }
 
   @Override
   UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    return new PojoTypeUserTypeCreatorFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        POJOUtils.getCreator(targetClass, schema, new JavaFieldTypeSupplier());
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
deleted file mode 100644
index b227988..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas;
-
-import org.apache.beam.sdk.schemas.utils.POJOUtils;
-
-/** Vends constructors for POJOs. */
-class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory {
-  @Override
-  public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
-    return POJOUtils.getCreator(clazz, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
index c5fae19..6b9b18f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
@@ -27,7 +27,7 @@ public class SchemaUserTypeConstructorCreator implements 
SchemaUserTypeCreator {
   private final Class<?> clazz;
   private final transient Constructor<?> constructor;
 
-  SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor) 
{
+  public SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> 
constructor) {
     this.clazz = clazz;
     this.constructor = checkNotNull(constructor);
   }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
deleted file mode 100644
index 6b76fa6..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A {@link FieldValueTypeInformation} for AVRO-generated specific records. */
-public class AvroSpecificRecordTypeInformationFactory implements 
FieldValueTypeInformationFactory {
-  @Override
-  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return AvroUtils.getFieldTypes((Class<? extends SpecificRecord>) 
targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index df33d64..c2e737f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -23,6 +23,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.CaseFormat;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -42,6 +43,8 @@ import org.apache.avro.generic.GenericEnumSymbol;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecord;
@@ -56,6 +59,7 @@ import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 
@@ -117,6 +121,11 @@ public class AvroUtils {
       this.size = size;
     }
 
+    /** Create a {@link FixedBytesField} with the specified size. */
+    public static FixedBytesField withSize(int size) {
+      return new FixedBytesField(size);
+    }
+
     /** Create a {@link FixedBytesField} from a Beam {@link FieldType}. */
     @Nullable
     public static FixedBytesField fromBeamFieldType(FieldType fieldType) {
@@ -260,59 +269,100 @@ public class AvroUtils {
     return g -> toGenericRecord(g, avroSchema);
   }
 
-  /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) {
-    try {
-      org.apache.avro.Schema avroSchema =
-          (org.apache.avro.Schema) 
(clazz.getDeclaredField("SCHEMA$").get(null));
-      return toBeamSchema(avroSchema);
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new IllegalArgumentException(
-          "Class "
-              + clazz
-              + " is not an AVRO SpecificRecord. "
-              + "No public SCHEMA$ field was found.");
-    }
+  /** Infer a {@link Schema} from either an AVRO-generated SpecificRecord or a 
POJO. */
+  public static <T> Schema getSchema(Class<T> clazz) {
+    return toBeamSchema(ReflectData.get().getSchema(clazz));
   }
 
-  private static final class AvroSpecificRecordFieldNamePolicy
-      implements SerializableFunction<String, String> {
-    Schema schema;
-    Map<String, String> nameMapping = Maps.newHashMap();
+  private static final class AvroSpecificRecordFieldValueTypeSupplier
+      implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, String> mapping = getMapping(schema);
+      Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
+      for (Method method : ReflectUtils.getMethods(clazz)) {
+        if (ReflectUtils.isGetter(method)) {
+          FieldValueTypeInformation fieldValueTypeInformation =
+              FieldValueTypeInformation.forGetter(method);
+          String name = mapping.get(fieldValueTypeInformation.getName());
+          if (name != null) {
+            types.put(name, fieldValueTypeInformation.withName(name));
+          }
+        }
+      }
+
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
 
-    AvroSpecificRecordFieldNamePolicy(Schema schema) {
-      this.schema = schema;
+    private Map<String, String> getMapping(Schema schema) {
+      Map<String, String> mapping = Maps.newHashMap();
       for (Field field : schema.getFields()) {
-        String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, 
field.getName());
-        nameMapping.put(getter, field.getName());
+        String underscore = 
CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName());
+        mapping.put(underscore, field.getName());
         // The Avro compiler might add a $ at the end of a getter to 
disambiguate.
-        nameMapping.put(getter + "$", field.getName());
+        mapping.put(underscore + "$", field.getName());
+        // If the field is in camel case already, then it's the identity 
mapping.
+        mapping.put(field.getName(), field.getName());
       }
+      return mapping;
     }
+  }
 
+  private static final class AvroPojoFieldValueTypeSupplier implements 
FieldValueTypeSupplier {
     @Override
-    public String apply(String input) {
-      return nameMapping.getOrDefault(input, input);
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
+      for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) {
+        if (!f.isAnnotationPresent(AvroIgnore.class)) {
+          FieldValueTypeInformation typeInformation = 
FieldValueTypeInformation.forField(f);
+          AvroName avroname = f.getAnnotation(AvroName.class);
+          if (avroname != null) {
+            typeInformation = typeInformation.withName(avroname.value());
+          }
+          types.put(typeInformation.getName(), typeInformation);
+        }
+      }
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
     }
   }
 
-  /** Get field types for an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> List<FieldValueTypeInformation> 
getFieldTypes(
-      Class<T> clazz, Schema schema) {
-    return JavaBeanUtils.getFieldTypes(
-        clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema));
+  /** Get field types for an AVRO-generated SpecificRecord or a POJO. */
+  public static <T> List<FieldValueTypeInformation> getFieldTypes(Class<T> 
clazz, Schema schema) {
+    if 
(TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) 
{
+      return JavaBeanUtils.getFieldTypes(
+          clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier());
+    } else {
+      return POJOUtils.getFieldTypes(clazz, schema, new 
AvroPojoFieldValueTypeSupplier());
+    }
   }
 
-  /** Get generated getters for an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> List<FieldValueGetter> getGetters(
-      Class<T> clazz, Schema schema) {
-    return JavaBeanUtils.getGetters(clazz, schema, new 
AvroSpecificRecordFieldNamePolicy(schema));
+  /** Get generated getters for an AVRO-generated SpecificRecord or a POJO. */
+  public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema 
schema) {
+    if 
(TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) 
{
+      return JavaBeanUtils.getGetters(
+          clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier());
+    } else {
+      return POJOUtils.getGetters(clazz, schema, new 
AvroPojoFieldValueTypeSupplier());
+    }
   }
 
   /** Get an object creator for an AVRO-generated SpecificRecord or a POJO. */
-  public static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
-      Class<T> clazz, Schema schema) {
-    return AvroByteBuddyUtils.getCreator(clazz, schema);
+  public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema 
schema) {
+    if 
(TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) 
{
+      return AvroByteBuddyUtils.getCreator((Class<? extends SpecificRecord>) 
clazz, schema);
+    } else {
+      return POJOUtils.getCreator(clazz, schema, new 
AvroPojoFieldValueTypeSupplier());
+    }
   }
 
   /** Converts AVRO schema to Beam field. */
@@ -480,7 +530,6 @@ public class AvroUtils {
   private static Object genericFromBeamField(
       Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable 
Object value) {
     TypeWithNullability typeWithNullability = new 
TypeWithNullability(avroSchema);
-
     if (!fieldType.getNullable().equals(typeWithNullability.nullable)) {
       throw new IllegalArgumentException(
           "FieldType "
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
similarity index 68%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
index 84f9a5e..12330ec 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
@@ -17,15 +17,19 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
 import org.apache.beam.sdk.schemas.Schema;
 
-/** A {@link FieldValueTypeInformationFactory} for POJO objects objects. */
-public class PojoValueTypeInformationFactory implements 
FieldValueTypeInformationFactory {
-  @Override
-  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return POJOUtils.getFieldTypes(targetClass, schema);
-  }
+/**
+ * Supplies the {@link FieldValueTypeInformation} for each field of a class. Implementations map
+ * class members (fields or getters) to the matching field names in the schema.
+ */
+public interface FieldValueTypeSupplier extends Serializable {
+  /**
+   * Return all the FieldValueTypeInformations. The returned list must be in 
the same order as
+   * fields in the schema.
+   */
+  List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema);
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
deleted file mode 100644
index 5c3d2ea..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueGetter;
-import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-
-/** A factory for creating {@link FieldValueGetter} objects for a JavaBean 
object. */
-public class JavaBeanGetterFactory implements FieldValueGetterFactory {
-  @Override
-  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return JavaBeanUtils.getGetters(targetClass, schema, 
SerializableFunctions.identity());
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
deleted file mode 100644
index e67a58d..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueSetter;
-import org.apache.beam.sdk.schemas.FieldValueSetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A factory for creating {@link FieldValueSetter} objects for a JavaBean 
object. */
-public class JavaBeanSetterFactory implements FieldValueSetterFactory {
-  @Override
-  public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
-    return JavaBeanUtils.getSetters(targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
deleted file mode 100644
index f445a87..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-
-/** A {@link FieldValueTypeInformationFactory} for Java Bean objects. */
-public class JavaBeanTypeInformationFactory implements 
FieldValueTypeInformationFactory {
-  @Override
-  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return JavaBeanUtils.getFieldTypes(targetClass, schema, 
SerializableFunctions.identity());
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 0ec07c9..58cc0ed 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.schemas.utils;
 
 import com.google.common.collect.Maps;
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -48,55 +47,46 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
-import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /** A set of utilities to generate getter and setter classes for JavaBean 
objects. */
 @Experimental(Kind.SCHEMAS)
 public class JavaBeanUtils {
   /** Create a {@link Schema} for a Java Bean class. */
-  public static Schema schemaFromJavaBeanClass(
-      Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) {
-    return StaticSchemaInference.schemaFromClass(
-        clazz, c -> JavaBeanUtils.typeInformationFromClass(c, 
fieldNamePolicy));
+  public static Schema schemaFromJavaBeanClass(Class<?> clazz) {
+    return StaticSchemaInference.schemaFromClass(clazz, 
JavaBeanUtils::typeInformationFromClass);
   }
 
-  private static List<TypeInformation> typeInformationFromClass(
-      Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) {
-    try {
-      List<TypeInformation> getterTypes =
-          ReflectUtils.getMethods(clazz)
-              .stream()
-              .filter(ReflectUtils::isGetter)
-              .map(m -> TypeInformation.forGetter(m, fieldNamePolicy))
-              .collect(Collectors.toList());
+  private static List<FieldValueTypeInformation> 
typeInformationFromClass(Class<?> clazz) {
+    List<FieldValueTypeInformation> getterTypes =
+        ReflectUtils.getMethods(clazz)
+            .stream()
+            .filter(ReflectUtils::isGetter)
+            .map(FieldValueTypeInformation::forGetter)
+            .collect(Collectors.toList());
 
-      Map<String, TypeInformation> setterTypes =
-          ReflectUtils.getMethods(clazz)
-              .stream()
-              .filter(ReflectUtils::isSetter)
-              .map(m -> TypeInformation.forSetter(m))
-              .collect(Collectors.toMap(TypeInformation::getName, 
Function.identity()));
-      validateJavaBean(getterTypes, setterTypes);
-      return getterTypes;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    Map<String, FieldValueTypeInformation> setterTypes =
+        ReflectUtils.getMethods(clazz)
+            .stream()
+            .filter(ReflectUtils::isSetter)
+            .map(FieldValueTypeInformation::forSetter)
+            .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+    validateJavaBean(getterTypes, setterTypes);
+    return getterTypes;
   }
 
   // Make sure that there are matching setters and getters.
   private static void validateJavaBean(
-      List<TypeInformation> getters, Map<String, TypeInformation> setters) {
-    for (TypeInformation type : getters) {
-      TypeInformation setterType = setters.get(type.getName());
+      List<FieldValueTypeInformation> getters, Map<String, 
FieldValueTypeInformation> setters) {
+    for (FieldValueTypeInformation type : getters) {
+      FieldValueTypeInformation setterType = setters.get(type.getName());
       if (setterType == null) {
         throw new RuntimeException(
             "JavaBean contained a getter for field "
                 + type.getName()
                 + "but did not contain a matching setter.");
       }
-      if (!type.equals(setterType)) {
+      if (!type.getType().equals(setterType.getType())) {
         throw new RuntimeException(
             "JavaBean contained setter for field "
                 + type.getName()
@@ -118,28 +108,9 @@ public class JavaBeanUtils {
       Maps.newConcurrentMap();
 
   public static List<FieldValueTypeInformation> getFieldTypes(
-      Class<?> clazz, Schema schema, SerializableFunction<String, String> 
fieldNamePolicy) {
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
-        c -> {
-          try {
-            Map<String, FieldValueTypeInformation> getterMap =
-                ReflectUtils.getMethods(clazz)
-                    .stream()
-                    .filter(ReflectUtils::isGetter)
-                    .map(m -> TypeInformation.forGetter(m, fieldNamePolicy))
-                    .map(FieldValueTypeInformation::of)
-                    .collect(
-                        Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
-            return schema
-                .getFields()
-                .stream()
-                .map(f -> getterMap.get(f.getName()))
-                .collect(Collectors.toList());
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        });
+        new ClassWithSchema(clazz, schema), c -> 
fieldValueTypeSupplier.get(clazz, schema));
   }
 
   // The list of getters for a class is cached, so we only create the classes 
the first time
@@ -153,37 +124,22 @@ public class JavaBeanUtils {
    * <p>The returned list is ordered by the order of fields in the schema.
    */
   public static List<FieldValueGetter> getGetters(
-      Class<?> clazz, Schema schema, SerializableFunction<String, String> 
fieldNamePolicy) {
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_GETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          try {
-            Map<String, FieldValueGetter> getterMap =
-                ReflectUtils.getMethods(clazz)
-                    .stream()
-                    .filter(ReflectUtils::isGetter)
-                    .map(m -> JavaBeanUtils.createGetter(m, fieldNamePolicy))
-                    .collect(Collectors.toMap(FieldValueGetter::name, 
Function.identity()));
-            return schema
-                .getFields()
-                .stream()
-                .map(f -> getterMap.get(f.getName()))
-                .collect(Collectors.toList());
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          return 
types.stream().map(JavaBeanUtils::createGetter).collect(Collectors.toList());
         });
   }
 
-  private static <T> FieldValueGetter createGetter(
-      Method getterMethod, SerializableFunction<String, String> 
fieldNamePolicy) {
-    TypeInformation typeInformation = TypeInformation.forGetter(getterMethod, 
fieldNamePolicy);
+  private static <T> FieldValueGetter createGetter(FieldValueTypeInformation 
typeInformation) {
     DynamicType.Builder<FieldValueGetter> builder =
         ByteBuddyUtils.subclassGetterInterface(
             BYTE_BUDDY,
-            getterMethod.getDeclaringClass(),
+            typeInformation.getMethod().getDeclaringClass(),
             new ConvertType(false).convert(typeInformation.getType()));
-    builder = implementGetterMethods(builder, getterMethod, fieldNamePolicy);
+    builder = implementGetterMethods(builder, typeInformation);
     try {
       return builder
           .make()
@@ -195,20 +151,18 @@ public class JavaBeanUtils {
         | IllegalAccessException
         | NoSuchMethodException
         | InvocationTargetException e) {
-      throw new RuntimeException("Unable to generate a getter for getter '" + 
getterMethod + "'");
+      throw new RuntimeException(
+          "Unable to generate a getter for getter '" + 
typeInformation.getMethod() + "'");
     }
   }
 
   private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
-      DynamicType.Builder<FieldValueGetter> builder,
-      Method method,
-      SerializableFunction<String, String> fieldNamePolicy) {
-    TypeInformation typeInformation = TypeInformation.forGetter(method, 
fieldNamePolicy);
+      DynamicType.Builder<FieldValueGetter> builder, FieldValueTypeInformation 
typeInformation) {
     return builder
         .method(ElementMatchers.named("name"))
         .intercept(FixedValue.reference(typeInformation.getName()))
         .method(ElementMatchers.named("get"))
-        .intercept(new InvokeGetterInstruction(method, fieldNamePolicy));
+        .intercept(new InvokeGetterInstruction(typeInformation));
   }
 
   // The list of setters for a class is cached, so we only create the classes 
the first time
@@ -221,36 +175,23 @@ public class JavaBeanUtils {
    *
    * <p>The returned list is ordered by the order of fields in the schema.
    */
-  public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema 
schema) {
+  public static List<FieldValueSetter> getSetters(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_SETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          try {
-            Map<String, FieldValueSetter> setterMap =
-                ReflectUtils.getMethods(clazz)
-                    .stream()
-                    .filter(ReflectUtils::isSetter)
-                    .map(JavaBeanUtils::createSetter)
-                    .collect(Collectors.toMap(FieldValueSetter::name, 
Function.identity()));
-            return schema
-                .getFields()
-                .stream()
-                .map(f -> setterMap.get(f.getName()))
-                .collect(Collectors.toList());
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          return 
types.stream().map(JavaBeanUtils::createSetter).collect(Collectors.toList());
         });
   }
 
-  private static <T> FieldValueSetter createSetter(Method setterMethod) {
-    TypeInformation typeInformation = TypeInformation.forSetter(setterMethod);
+  private static FieldValueSetter createSetter(FieldValueTypeInformation 
typeInformation) {
     DynamicType.Builder<FieldValueSetter> builder =
         ByteBuddyUtils.subclassSetterInterface(
             BYTE_BUDDY,
-            setterMethod.getDeclaringClass(),
+            typeInformation.getMethod().getDeclaringClass(),
             new ConvertType(false).convert(typeInformation.getType()));
-    builder = implementSetterMethods(builder, setterMethod);
+    builder = implementSetterMethods(builder, typeInformation.getMethod());
     try {
       return builder
           .make()
@@ -262,29 +203,27 @@ public class JavaBeanUtils {
         | IllegalAccessException
         | NoSuchMethodException
         | InvocationTargetException e) {
-      throw new RuntimeException("Unable to generate a setter for setter '" + 
setterMethod + "'");
+      throw new RuntimeException(
+          "Unable to generate a setter for setter '" + 
typeInformation.getMethod() + "'");
     }
   }
 
   private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
       DynamicType.Builder<FieldValueSetter> builder, Method method) {
-    TypeInformation typeInformation = TypeInformation.forSetter(method);
+    FieldValueTypeInformation javaTypeInformation = 
FieldValueTypeInformation.forSetter(method);
     return builder
         .method(ElementMatchers.named("name"))
-        .intercept(FixedValue.reference(typeInformation.getName()))
+        .intercept(FixedValue.reference(javaTypeInformation.getName()))
         .method(ElementMatchers.named("set"))
         .intercept(new InvokeSetterInstruction(method));
   }
 
   // Implements a method to read a public getter out of an object.
   private static class InvokeGetterInstruction implements Implementation {
-    // Getter method that wil be invoked
-    private Method method;
-    private SerializableFunction<String, String> fieldNamePolicy;
+    private final FieldValueTypeInformation typeInformation;
 
-    InvokeGetterInstruction(Method method, SerializableFunction<String, 
String> fieldNamePolicy) {
-      this.method = method;
-      this.fieldNamePolicy = fieldNamePolicy;
+    InvokeGetterInstruction(FieldValueTypeInformation typeInformation) {
+      this.typeInformation = typeInformation;
     }
 
     @Override
@@ -295,7 +234,6 @@ public class JavaBeanUtils {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return (methodVisitor, implementationContext, instrumentedMethod) -> {
-        TypeInformation typeInformation = TypeInformation.forGetter(method, 
fieldNamePolicy);
         // this + method parameters.
         int numLocals = 1 + instrumentedMethod.getParameters().size();
 
@@ -305,7 +243,7 @@ public class JavaBeanUtils {
                 // Method param is offset 1 (offset 0 is the this parameter).
                 MethodVariableAccess.REFERENCE.loadFrom(1),
                 // Invoke the getter
-                MethodInvocation.invoke(new ForLoadedMethod(method)));
+                MethodInvocation.invoke(new 
ForLoadedMethod(typeInformation.getMethod())));
 
         StackManipulation stackManipulation =
             new StackManipulation.Compound(
@@ -335,7 +273,7 @@ public class JavaBeanUtils {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return (methodVisitor, implementationContext, instrumentedMethod) -> {
-        TypeInformation typeInformation = TypeInformation.forSetter(method);
+        FieldValueTypeInformation javaTypeInformation = 
FieldValueTypeInformation.forSetter(method);
         // this + method parameters.
         int numLocals = 1 + instrumentedMethod.getParameters().size();
 
@@ -349,7 +287,7 @@ public class JavaBeanUtils {
                 MethodVariableAccess.REFERENCE.loadFrom(1),
                 // Do any conversions necessary.
                 new ByteBuddyUtils.ConvertValueForSetter(readField)
-                    .convert(typeInformation.getType()),
+                    .convert(javaTypeInformation.getType()),
                 // Now update the field and return void.
                 MethodInvocation.invoke(new ForLoadedMethod(method)),
                 MethodReturn.VOID);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index 38f5307..370b3cc 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -55,7 +56,6 @@ import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
-import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -63,12 +63,11 @@ import org.apache.beam.sdk.values.TypeDescriptor;
 @Experimental(Kind.SCHEMAS)
 public class POJOUtils {
   public static Schema schemaFromPojoClass(Class<?> clazz) {
-    // We should cache the field order.
-    Function<Class, List<TypeInformation>> getTypesForClass =
+    Function<Class, List<FieldValueTypeInformation>> getTypesForClass =
         c ->
             ReflectUtils.getFields(c)
                 .stream()
-                .map(TypeInformation::forField)
+                .map(FieldValueTypeInformation::forField)
                 .collect(Collectors.toList());
     return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass);
   }
@@ -79,22 +78,10 @@ public class POJOUtils {
   private static final Map<ClassWithSchema, List<FieldValueTypeInformation>> 
CACHED_FIELD_TYPES =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz, 
Schema schema) {
+  public static List<FieldValueTypeInformation> getFieldTypes(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
-        c -> {
-          Map<String, FieldValueTypeInformation> typeInformationMap =
-              ReflectUtils.getFields(clazz)
-                  .stream()
-                  .map(FieldValueTypeInformation::of)
-                  .collect(
-                      Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
-          return schema
-              .getFields()
-              .stream()
-              .map(f -> typeInformationMap.get(f.getName()))
-              .collect(Collectors.toList());
-        });
+        new ClassWithSchema(clazz, schema), c -> 
fieldValueTypeSupplier.get(clazz, schema));
   }
 
   // The list of getters for a class is cached, so we only create the classes 
the first time
@@ -102,21 +89,20 @@ public class POJOUtils {
   private static final Map<ClassWithSchema, List<FieldValueGetter>> CACHED_GETTERS =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) {
+  public static List<FieldValueGetter> getGetters(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
     // Return the getters ordered by their position in the schema.
     return CACHED_GETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          Map<String, FieldValueGetter> getterMap =
-              ReflectUtils.getFields(clazz)
-                  .stream()
-                  .map(POJOUtils::createGetter)
-                  .collect(Collectors.toMap(FieldValueGetter::name, Function.identity()));
-          return schema
-              .getFields()
-              .stream()
-              .map(f -> getterMap.get(f.getName()))
-              .collect(Collectors.toList());
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          List<FieldValueGetter> getters =
+              types.stream().map(POJOUtils::createGetter).collect(Collectors.toList());
+          if (getters.size() != schema.getFieldCount()) {
+            throw new RuntimeException(
+                "Was not able to generate getters for schema: " + schema + " class: " + clazz);
+          }
+          return getters;
         });
   }
 
@@ -125,24 +111,21 @@ public class POJOUtils {
   public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
       Maps.newConcurrentMap();
 
-  public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) {
+  public static <T> SchemaUserTypeCreator getCreator(
+      Class<T> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          return createCreator(clazz, schema, types);
+        });
   }
 
-  private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) {
+  private static <T> SchemaUserTypeCreator createCreator(
+      Class<T> clazz, Schema schema, List<FieldValueTypeInformation> types) {
     // Get the list of class fields ordered by schema.
-    Map<String, Field> fieldMap =
-        ReflectUtils.getFields(clazz)
-            .stream()
-            .collect(Collectors.toMap(Field::getName, Function.identity()));
     List<Field> fields =
-        schema
-            .getFields()
-            .stream()
-            .map(f -> fieldMap.get(f.getName()))
-            .collect(Collectors.toList());
-
+        types.stream().map(FieldValueTypeInformation::getField).collect(Collectors.toList());
     try {
       DynamicType.Builder<SchemaUserTypeCreator> builder =
           BYTE_BUDDY
@@ -179,13 +162,16 @@ public class POJOUtils {
    * </code></pre>
    */
   @SuppressWarnings("unchecked")
-  static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter(Field field) {
+  @Nullable
+  static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter(
+      FieldValueTypeInformation typeInformation) {
+    Field field = typeInformation.getField();
     DynamicType.Builder<FieldValueGetter> builder =
         ByteBuddyUtils.subclassGetterInterface(
             BYTE_BUDDY,
             field.getDeclaringClass(),
             new ConvertType(false).convert(TypeDescriptor.of(field.getType())));
-    builder = implementGetterMethods(builder, field);
+    builder = implementGetterMethods(builder, field, typeInformation.getName());
     try {
       return builder
           .make()
@@ -202,10 +188,10 @@ public class POJOUtils {
   }
 
   private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
-      DynamicType.Builder<FieldValueGetter> builder, Field field) {
+      DynamicType.Builder<FieldValueGetter> builder, Field field, String name) {
     return builder
         .method(ElementMatchers.named("name"))
-        .intercept(FixedValue.reference(field.getName()))
+        .intercept(FixedValue.reference(name))
         .method(ElementMatchers.named("get"))
         .intercept(new ReadFieldInstruction(field));
   }
@@ -215,21 +201,14 @@ public class POJOUtils {
   private static final Map<ClassWithSchema, List<FieldValueSetter>> CACHED_SETTERS =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema schema) {
+  public static List<FieldValueSetter> getSetters(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
     // Return the setters, ordered by their position in the schema.
     return CACHED_SETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          Map<String, FieldValueSetter> setterMap =
-              ReflectUtils.getFields(clazz)
-                  .stream()
-                  .map(POJOUtils::createSetter)
-                  .collect(Collectors.toMap(FieldValueSetter::name, Function.identity()));
-          return schema
-              .getFields()
-              .stream()
-              .map(f -> setterMap.get(f.getName()))
-              .collect(Collectors.toList());
+          List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
+          return types.stream().map(POJOUtils::createSetter).collect(Collectors.toList());
         });
   }
 
@@ -250,7 +229,9 @@ public class POJOUtils {
    * </code></pre>
    */
   @SuppressWarnings("unchecked")
-  private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter(Field field) {
+  private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> createSetter(
+      FieldValueTypeInformation typeInformation) {
+    Field field = typeInformation.getField();
     DynamicType.Builder<FieldValueSetter> builder =
         ByteBuddyUtils.subclassSetterInterface(
             BYTE_BUDDY,
@@ -284,7 +265,7 @@ public class POJOUtils {
   // Implements a method to read a public field out of an object.
   static class ReadFieldInstruction implements Implementation {
     // Field that will be read.
-    private Field field;
+    private final Field field;
 
     ReadFieldInstruction(Field field) {
       this.field = field;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
deleted file mode 100644
index 275b791..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueGetter;
-import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A factory for creating {@link FieldValueGetter} objects for a POJO. */
-public class PojoValueGetterFactory implements FieldValueGetterFactory {
-  @Override
-  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return POJOUtils.getGetters(targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
deleted file mode 100644
index 5e9447a..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueSetter;
-import org.apache.beam.sdk.schemas.FieldValueSetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A factory for creating {@link FieldValueSetter} objects for a POJO. */
-public class PojoValueSetterFactory implements FieldValueSetterFactory {
-  @Override
-  public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
-    return POJOUtils.getSetters(targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
index 8619ea5..c9d943b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -21,7 +21,6 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -40,7 +39,7 @@ public class ReflectUtils {
     private final Class clazz;
     private final Schema schema;
 
-    public ClassWithSchema(Class clazz, Schema schema) {
+    ClassWithSchema(Class clazz, Schema schema) {
       this.clazz = clazz;
       this.schema = schema;
     }
@@ -67,7 +66,7 @@ public class ReflectUtils {
   private static final Map<Class, List<Field>> DECLARED_FIELDS = 
Maps.newHashMap();
 
   /** Returns the list of public, non-static methods in the class, caching the 
results. */
-  static List<Method> getMethods(Class clazz) throws IOException {
+  public static List<Method> getMethods(Class clazz) {
     return DECLARED_METHODS.computeIfAbsent(
         clazz,
         c -> {
@@ -79,7 +78,7 @@ public class ReflectUtils {
   }
 
   // Get all public, non-static, non-transient fields.
-  static List<Field> getFields(Class<?> clazz) {
+  public static List<Field> getFields(Class<?> clazz) {
     return DECLARED_FIELDS.computeIfAbsent(
         clazz,
         c -> {
@@ -104,7 +103,7 @@ public class ReflectUtils {
         });
   }
 
-  static boolean isGetter(Method method) {
+  public static boolean isGetter(Method method) {
     if (Void.TYPE.equals(method.getReturnType())) {
       return false;
     }
@@ -118,13 +117,13 @@ public class ReflectUtils {
             || Boolean.class.equals(method.getReturnType())));
   }
 
-  static boolean isSetter(Method method) {
+  public static boolean isSetter(Method method) {
     return Void.TYPE.equals(method.getReturnType())
         && method.getParameterCount() == 1
         && method.getName().startsWith("set");
   }
 
-  static String stripPrefix(String methodName, String prefix) {
+  public static String stripPrefix(String methodName, String prefix) {
     String firstLetter = methodName.substring(prefix.length(), prefix.length() 
+ 1).toLowerCase();
 
     return (methodName.length() == prefix.length() + 1)
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index fdf978c..d3f81e8 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -20,21 +20,16 @@ package org.apache.beam.sdk.schemas.utils;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.function.Function;
-import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.ReadableInstant;
 
@@ -64,94 +59,6 @@ public class StaticSchemaInference {
           .put(BigDecimal.class, FieldType.DECIMAL)
           .build();
 
-  /** Relevant information about a Java type. */
-  public static class TypeInformation {
-    private final String name;
-    private final TypeDescriptor type;
-    private final boolean nullable;
-
-    /** Construct a {@link TypeInformation}. */
-    private TypeInformation(String name, TypeDescriptor type, boolean 
nullable) {
-      this.name = name;
-      this.type = type;
-      this.nullable = nullable;
-    }
-
-    /** Construct a {@link TypeInformation} from a class member variable. */
-    public static TypeInformation forField(Field field) {
-      return new TypeInformation(
-          field.getName(),
-          TypeDescriptor.of(field.getGenericType()),
-          field.isAnnotationPresent(Nullable.class));
-    }
-
-    /** Construct a {@link TypeInformation} from a class getter. */
-    public static TypeInformation forGetter(
-        Method method, SerializableFunction<String, String> fieldNamePolicy) {
-      String name;
-      if (method.getName().startsWith("get")) {
-        name = ReflectUtils.stripPrefix(method.getName(), "get");
-      } else if (method.getName().startsWith("is")) {
-        name = ReflectUtils.stripPrefix(method.getName(), "is");
-      } else {
-        throw new RuntimeException("Getter has wrong prefix " + method.getName());
-      }
-      name = fieldNamePolicy.apply(name);
-
-      TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType());
-      boolean nullable = method.isAnnotationPresent(Nullable.class);
-      return new TypeInformation(name, type, nullable);
-    }
-
-    /** Construct a {@link TypeInformation} from a class setter. */
-    public static TypeInformation forSetter(Method method) {
-      String name;
-      if (method.getName().startsWith("set")) {
-        name = ReflectUtils.stripPrefix(method.getName(), "set");
-      } else {
-        throw new RuntimeException("Setter has wrong prefix " + method.getName());
-      }
-      if (method.getParameterCount() != 1) {
-        throw new RuntimeException("Setter methods should take a single argument.");
-      }
-      TypeDescriptor type = 
TypeDescriptor.of(method.getGenericParameterTypes()[0]);
-      boolean nullable =
-          
Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance);
-      return new TypeInformation(name, type, nullable);
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public TypeDescriptor getType() {
-      return type;
-    }
-
-    public boolean isNullable() {
-      return nullable;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      TypeInformation that = (TypeInformation) o;
-      return nullable == that.nullable
-          && Objects.equals(name, that.name)
-          && Objects.equals(type, that.type);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(name, type, nullable);
-    }
-  }
-
   /**
    * Infer a schema from a Java class.
    *
@@ -160,9 +67,9 @@ public class StaticSchemaInference {
    * public getter methods, or special annotations on the class.
    */
   public static Schema schemaFromClass(
-      Class<?> clazz, Function<Class, List<TypeInformation>> getTypesForClass) 
{
+      Class<?> clazz, Function<Class, List<FieldValueTypeInformation>> 
getTypesForClass) {
     Schema.Builder builder = Schema.builder();
-    for (TypeInformation type : getTypesForClass.apply(clazz)) {
+    for (FieldValueTypeInformation type : getTypesForClass.apply(clazz)) {
       Schema.FieldType fieldType = fieldFromType(type.getType(), 
getTypesForClass);
       if (type.isNullable()) {
         builder.addNullableField(type.getName(), fieldType);
@@ -175,7 +82,7 @@ public class StaticSchemaInference {
 
   // Map a Java field type to a Beam Schema FieldType.
   private static Schema.FieldType fieldFromType(
-      TypeDescriptor type, Function<Class, List<TypeInformation>> 
getTypesForClass) {
+      TypeDescriptor type, Function<Class, List<FieldValueTypeInformation>> 
getTypesForClass) {
     FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType());
     if (primitiveType != null) {
       return primitiveType;
@@ -209,7 +116,8 @@ public class StaticSchemaInference {
         FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), 
getTypesForClass);
         FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), 
getTypesForClass);
         checkArgument(
-            keyType.getTypeName().isPrimitiveType(), "Only primitive types can be map keys");
+            keyType.getTypeName().isPrimitiveType(),
+            "Only primitive types can be map keys. type: " + keyType.getTypeName());
         return FieldType.map(keyType, valueType);
       } else {
         throw new RuntimeException("Cannot infer schema from unparameterized map.");
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
index f7cc64d..3c22f4b 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
@@ -22,12 +22,20 @@ import static org.junit.Assert.assertEquals;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.AvroSchema;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.FixedBytesField;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -36,6 +44,142 @@ import org.junit.Test;
 
 /** Tests for AVRO schema classes. */
 public class AvroSchemaTest {
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroSubPojo {
+    @AvroName("bool_non_nullable")
+    public boolean boolNonNullable;
+
+    @AvroName("int")
+    @org.apache.avro.reflect.Nullable
+    public Integer anInt;
+
+    public AvroSubPojo(boolean boolNonNullable, Integer anInt) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+    }
+
+    public AvroSubPojo() {}
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroSubPojo)) {
+        return false;
+      }
+      AvroSubPojo that = (AvroSubPojo) o;
+      return boolNonNullable == that.boolNonNullable && Objects.equals(anInt, 
that.anInt);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(boolNonNullable, anInt);
+    }
+
+    @Override
+    public String toString() {
+      return "AvroSubPojo{" + "boolNonNullable=" + boolNonNullable + ", anInt=" + anInt + '}';
+    }
+  }
+
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroPojo {
+    public @AvroName("bool_non_nullable") boolean boolNonNullable;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("int") Integer anInt;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("long") Long aLong;
+
+    @AvroName("float")
+    @org.apache.avro.reflect.Nullable
+    public Float aFloat;
+
+    @AvroName("double")
+    @org.apache.avro.reflect.Nullable
+    public Double aDouble;
+
+    @org.apache.avro.reflect.Nullable public String string;
+    @org.apache.avro.reflect.Nullable public ByteBuffer bytes;
+
+    @AvroSchema("{\"type\": \"fixed\", \"size\": 4, \"name\": \"fixed4\"}")
+    @org.apache.avro.reflect.Nullable
+    public byte[] fixed;
+
+    @org.apache.avro.reflect.Nullable public AvroSubPojo row;
+    @org.apache.avro.reflect.Nullable public List<AvroSubPojo> array;
+    @org.apache.avro.reflect.Nullable public Map<String, AvroSubPojo> map;
+    @AvroIgnore String extraField;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroPojo)) {
+        return false;
+      }
+      AvroPojo avroPojo = (AvroPojo) o;
+      return boolNonNullable == avroPojo.boolNonNullable
+          && Objects.equals(anInt, avroPojo.anInt)
+          && Objects.equals(aLong, avroPojo.aLong)
+          && Objects.equals(aFloat, avroPojo.aFloat)
+          && Objects.equals(aDouble, avroPojo.aDouble)
+          && Objects.equals(string, avroPojo.string)
+          && Objects.equals(bytes, avroPojo.bytes)
+          && Arrays.equals(fixed, avroPojo.fixed)
+          && Objects.equals(row, avroPojo.row)
+          && Objects.equals(array, avroPojo.array)
+          && Objects.equals(map, avroPojo.map);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          boolNonNullable,
+          anInt,
+          aLong,
+          aFloat,
+          aDouble,
+          string,
+          bytes,
+          Arrays.hashCode(fixed),
+          row,
+          array,
+          map);
+    }
+
+    public AvroPojo(
+        boolean boolNonNullable,
+        int anInt,
+        long aLong,
+        float aFloat,
+        double aDouble,
+        String string,
+        ByteBuffer bytes,
+        byte[] fixed,
+        AvroSubPojo row,
+        List<AvroSubPojo> array,
+        Map<String, AvroSubPojo> map) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aFloat = aFloat;
+      this.aDouble = aDouble;
+      this.string = string;
+      this.bytes = bytes;
+      this.fixed = fixed;
+      this.row = row;
+      this.array = array;
+      this.map = map;
+      this.extraField = "";
+    }
+
+    public AvroPojo() {}
+  }
+
   private static final Schema SUBSCHEMA =
       Schema.builder()
           .addField("bool_non_nullable", FieldType.BOOLEAN)
@@ -52,18 +196,27 @@ public class AvroSchemaTest {
           .addNullableField("double", FieldType.DOUBLE)
           .addNullableField("string", FieldType.STRING)
           .addNullableField("bytes", FieldType.BYTES)
-          .addField("fixed", FieldType.BYTES.withMetadata("FIXED:4"))
+          .addField("fixed", FixedBytesField.withSize(4).toBeamType())
           .addNullableField("timestampMillis", FieldType.DATETIME)
           .addNullableField("row", SUB_TYPE)
           .addNullableField("array", FieldType.array(SUB_TYPE))
           .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE))
           .build();
 
-  @Test
-  public void testSpecificRecordSchema() {
-    assertEquals(
-        SCHEMA, new 
AvroSpecificRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class)));
-  }
+  private static final Schema POJO_SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FixedBytesField.withSize(4).toBeamType())
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", 
FieldType.array(SUB_TYPE.withNullable(false)))
+          .addNullableField("map", FieldType.map(FieldType.STRING, 
SUB_TYPE.withNullable(false)))
+          .build();
 
   private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
   private static final DateTime DATE_TIME =
@@ -131,17 +284,26 @@ public class AvroSchemaTest {
           .build();
 
   @Test
+  public void testSpecificRecordSchema() {
+    assertEquals(SCHEMA, new 
AvroRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class)));
+  }
+
+  @Test
+  public void testPojoSchema() {
+    assertEquals(POJO_SCHEMA, AvroUtils.getSchema(AvroPojo.class));
+  }
+
+  @Test
   public void testSpecificRecordToRow() {
     SerializableFunction<TestAvro, Row> toRow =
-        new 
AvroSpecificRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class));
-    Row row = toRow.apply(AVRO_SPECIFIC_RECORD);
+        new 
AvroRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class));
     assertEquals(ROW, toRow.apply(AVRO_SPECIFIC_RECORD));
   }
 
   @Test
   public void testRowToSpecificRecord() {
     SerializableFunction<Row, TestAvro> fromRow =
-        new 
AvroSpecificRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class));
+        new 
AvroRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class));
     assertEquals(AVRO_SPECIFIC_RECORD, fromRow.apply(ROW));
   }
 
@@ -156,7 +318,51 @@ public class AvroSchemaTest {
   public void testRowToGenericRecord() {
     SerializableFunction<Row, GenericRecord> fromRow =
         AvroUtils.getRowToGenericRecordFunction(TestAvro.SCHEMA$);
-    GenericRecord generic = fromRow.apply(ROW);
     assertEquals(AVRO_GENERIC_RECORD, fromRow.apply(ROW));
   }
+
+  private static final AvroSubPojo SUB_POJO = new AvroSubPojo(true, 42);
+  private static final AvroPojo AVRO_POJO =
+      new AvroPojo(
+          true,
+          43,
+          44L,
+          (float) 44.1,
+          (double) 44.2,
+          "mystring",
+          ByteBuffer.wrap(BYTE_ARRAY),
+          BYTE_ARRAY,
+          SUB_POJO,
+          ImmutableList.of(SUB_POJO, SUB_POJO),
+          ImmutableMap.of("k1", SUB_POJO, "k2", SUB_POJO));
+
+  private static final Row ROW_FOR_POJO =
+      Row.withSchema(POJO_SCHEMA)
+          .addValues(
+              true,
+              43,
+              44L,
+              (float) 44.1,
+              (double) 44.2,
+              "mystring",
+              ByteBuffer.wrap(BYTE_ARRAY),
+              ByteBuffer.wrap(BYTE_ARRAY),
+              NESTED_ROW,
+              ImmutableList.of(NESTED_ROW, NESTED_ROW),
+              ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW))
+          .build();
+
+  @Test
+  public void testPojoRecordToRow() {
+    SerializableFunction<AvroPojo, Row> toRow =
+        new 
AvroRecordSchema().toRowFunction(TypeDescriptor.of(AvroPojo.class));
+    assertEquals(ROW_FOR_POJO, toRow.apply(AVRO_POJO));
+  }
+
+  @Test
+  public void testRowToPojo() {
+    SerializableFunction<Row, AvroPojo> fromRow =
+        new 
AvroRecordSchema().fromRowFunction(TypeDescriptor.of(AvroPojo.class));
+    assertEquals(AVRO_POJO, fromRow.apply(ROW_FOR_POJO));
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index 9380899..c835ea6 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -37,6 +37,8 @@ import java.nio.charset.Charset;
 import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.JavaBeanSchema.SetterTypeSupplier;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray;
@@ -49,7 +51,6 @@ import 
org.apache.beam.sdk.schemas.utils.TestJavaBeans.NullableBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.joda.time.DateTime;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,8 +62,7 @@ public class JavaBeanUtilsTest {
 
   @Test
   public void testNullable() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, 
SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class);
     assertTrue(schema.getField("str").getType().getNullable());
     assertFalse(schema.getField("anInt").getType().getNullable());
   }
@@ -70,62 +70,48 @@ public class JavaBeanUtilsTest {
   @Test
   public void testMismatchingNullable() {
     thrown.expect(RuntimeException.class);
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            MismatchingNullableBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class);
   }
 
   @Test
   public void testSimpleBean() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class);
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedBean() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveArray() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            PrimitiveArrayBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedArray() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            NestedArrayBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedCollection() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            NestedCollectionBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveMap() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            PrimitiveMapBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedMap() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            NestedMapBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema);
   }
 
@@ -147,7 +133,7 @@ public class JavaBeanUtilsTest {
 
     List<FieldValueGetter> getters =
         JavaBeanUtils.getGetters(
-            SimpleBean.class, SIMPLE_BEAN_SCHEMA, SerializableFunctions.identity());
+            SimpleBean.class, SIMPLE_BEAN_SCHEMA, new JavaBeanSchema.GetterTypeSupplier());
     assertEquals(12, getters.size());
     assertEquals("str", getters.get(0).name());
 
@@ -174,7 +160,8 @@ public class JavaBeanUtilsTest {
   @Test
   public void testGeneratedSimpleSetters() {
     SimpleBean simpleBean = new SimpleBean();
-    List<FieldValueSetter> setters = JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA);
+    List<FieldValueSetter> setters =
+        JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA, new SetterTypeSupplier());
     assertEquals(12, setters.size());
 
     setters.get(0).set(simpleBean, "field1");
@@ -219,7 +206,7 @@ public class JavaBeanUtilsTest {
         JavaBeanUtils.getGetters(
             BeanWithBoxedFields.class,
             BEAN_WITH_BOXED_FIELDS_SCHEMA,
-            SerializableFunctions.identity());
+            new JavaBeanSchema.GetterTypeSupplier());
     assertEquals((byte) 41, getters.get(0).get(bean));
     assertEquals((short) 42, getters.get(1).get(bean));
     assertEquals((int) 43, getters.get(2).get(bean));
@@ -231,7 +218,8 @@ public class JavaBeanUtilsTest {
   public void testGeneratedSimpleBoxedSetters() {
     BeanWithBoxedFields bean = new BeanWithBoxedFields();
     List<FieldValueSetter> setters =
-        JavaBeanUtils.getSetters(BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA);
+        JavaBeanUtils.getSetters(
+            BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA, new SetterTypeSupplier());
 
     setters.get(0).set(bean, (byte) 41);
     setters.get(1).set(bean, (short) 42);
@@ -250,7 +238,8 @@ public class JavaBeanUtilsTest {
   public void testGeneratedByteBufferSetters() {
     BeanWithByteArray bean = new BeanWithByteArray();
     List<FieldValueSetter> setters =
-        JavaBeanUtils.getSetters(BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA);
+        JavaBeanUtils.getSetters(
+            BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA, new SetterTypeSupplier());
     setters.get(0).set(bean, "field1".getBytes(Charset.defaultCharset()));
     setters.get(1).set(bean, "field2".getBytes(Charset.defaultCharset()));
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
index 1a0946d..e140a8b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -37,6 +37,7 @@ import java.nio.charset.Charset;
 import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO;
@@ -126,7 +127,8 @@ public class POJOUtilsTest {
             new BigDecimal(42),
             new StringBuilder("stringBuilder"));
 
-    List<FieldValueGetter> getters = POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA);
+    List<FieldValueGetter> getters =
+        POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier());
     assertEquals(12, getters.size());
     assertEquals("str", getters.get(0).name());
     assertEquals("field1", getters.get(0).get(simplePojo));
@@ -147,7 +149,8 @@ public class POJOUtilsTest {
   @Test
   public void testGeneratedSimpleSetters() {
     SimplePOJO simplePojo = new SimplePOJO();
-    List<FieldValueSetter> setters = POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA);
+    List<FieldValueSetter> setters =
+        POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new JavaFieldTypeSupplier());
     assertEquals(12, setters.size());
 
     setters.get(0).set(simplePojo, "field1");
@@ -182,7 +185,8 @@ public class POJOUtilsTest {
     POJOWithBoxedFields pojo = new POJOWithBoxedFields((byte) 41, (short) 42, 43, 44L, true);
 
     List<FieldValueGetter> getters =
-        POJOUtils.getGetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA);
+        POJOUtils.getGetters(
+            POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier());
     assertEquals((byte) 41, getters.get(0).get(pojo));
     assertEquals((short) 42, getters.get(1).get(pojo));
     assertEquals((int) 43, getters.get(2).get(pojo));
@@ -194,7 +198,8 @@ public class POJOUtilsTest {
   public void testGeneratedSimpleBoxedSetters() {
     POJOWithBoxedFields pojo = new POJOWithBoxedFields();
     List<FieldValueSetter> setters =
-        POJOUtils.getSetters(POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA);
+        POJOUtils.getSetters(
+            POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new JavaFieldTypeSupplier());
 
     setters.get(0).set(pojo, (byte) 41);
     setters.get(1).set(pojo, (short) 42);
@@ -213,7 +218,8 @@ public class POJOUtilsTest {
   public void testGeneratedByteBufferSetters() {
     POJOWithByteArray pojo = new POJOWithByteArray();
     List<FieldValueSetter> setters =
-        POJOUtils.getSetters(POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA);
+        POJOUtils.getSetters(
+            POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, new JavaFieldTypeSupplier());
     setters.get(0).set(pojo, BYTE_ARRAY);
     setters.get(1).set(pojo, BYTE_BUFFER.array());
 

Reply via email to