[ 
https://issues.apache.org/jira/browse/BEAM-4454?focusedWorklogId=175470&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175470
 ]

ASF GitHub Bot logged work on BEAM-4454:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Dec/18 17:16
            Start Date: 14/Dec/18 17:16
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #7267: [BEAM-4454] Support Avro POJO objects
URL: https://github.com/apache/beam/pull/7267
 
 
   

This is a foreign pull request, merged from a forked repository.
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
similarity index 67%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
index d8e4bda342f8..29bf51a06a77 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroRecordSchema.java
@@ -17,35 +17,34 @@
  */
 package org.apache.beam.sdk.schemas;
 
-import org.apache.avro.specific.SpecificRecord;
-import 
org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
- * A {@link SchemaProvider} for AVRO generated SpecificRecords.
+ * A {@link SchemaProvider} for AVRO generated SpecificRecords and POJOs.
  *
- * <p>This provider infers a schema from generates SpecificRecord objects, and 
creates schemas and
- * rows that bind to the appropriate fields.
+ * <p>This provider infers a schema from generated SpecificRecord objects, and 
creates schemas and
+ * rows that bind to the appropriate fields. This provider also infers schemas 
from Java POJO
+ * objects, creating a schema that matches that inferred by the AVRO libraries.
  */
-public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider {
+public class AvroRecordSchema extends GetterBasedSchemaProvider {
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return AvroUtils.getSchema((Class<? extends SpecificRecord>) 
typeDescriptor.getRawType());
+    return AvroUtils.getSchema(typeDescriptor.getRawType());
   }
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
-    return new AvroSpecificRecordGetterFactory();
+    return AvroUtils::getGetters;
   }
 
   @Override
   public UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    return new AvroSpecificRecordUserTypeCreatorFactory();
+    return AvroUtils::getCreator;
   }
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
-    return new AvroSpecificRecordTypeInformationFactory();
+    return AvroUtils::getFieldTypes;
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
deleted file mode 100644
index fcb85f4dd664..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordGetterFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas;
-
-import java.util.List;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-
-/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */
-public class AvroSpecificRecordGetterFactory implements 
FieldValueGetterFactory {
-  @Override
-  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, 
schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
deleted file mode 100644
index 68d0d6a7d068..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AvroSpecificRecordUserTypeCreatorFactory.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-
-/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */
-public class AvroSpecificRecordUserTypeCreatorFactory implements 
UserTypeCreatorFactory {
-  @Override
-  public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
-    return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, 
schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
index 5cccdf67ef3d..593853ffef29 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FieldValueTypeInformation.java
@@ -22,22 +22,33 @@
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
-/** Represents type information for a schema field. */
+/** Represents type information for a Java type that will be used to infer a 
Schema type. */
 @AutoValue
 public abstract class FieldValueTypeInformation implements Serializable {
   /** Returns the field name. */
   public abstract String getName();
 
+  /** Returns whether the field is nullable. */
+  public abstract boolean isNullable();
+
   /** Returns the field type. */
-  public abstract Class getType();
+  public abstract TypeDescriptor getType();
+
+  @Nullable
+  public abstract Field getField();
+
+  @Nullable
+  public abstract Method getMethod();
 
   /** If the field is a container type, returns the element type. */
   @Nullable
@@ -51,26 +62,90 @@
   @Nullable
   public abstract Type getMapValueType();
 
-  public static FieldValueTypeInformation of(Field field) {
-    return new AutoValue_FieldValueTypeInformation(
-        field.getName(),
-        field.getType(),
-        getArrayComponentType(field),
-        getMapKeyType(field),
-        getMapValueType(field));
+  abstract Builder toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    public abstract Builder setName(String name);
+
+    public abstract Builder setNullable(boolean nullable);
+
+    public abstract Builder setType(TypeDescriptor type);
+
+    public abstract Builder setField(@Nullable Field field);
+
+    public abstract Builder setMethod(@Nullable Method method);
+
+    public abstract Builder setElementType(@Nullable Type elementType);
+
+    public abstract Builder setMapKeyType(@Nullable Type mapKeyType);
+
+    public abstract Builder setMapValueType(@Nullable Type mapValueType);
+
+    abstract FieldValueTypeInformation build();
+  }
+
+  public static FieldValueTypeInformation forField(Field field) {
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(field.getName())
+        .setNullable(field.isAnnotationPresent(Nullable.class))
+        .setType(TypeDescriptor.of(field.getGenericType()))
+        .setField(field)
+        .setElementType(getArrayComponentType(field))
+        .setMapKeyType(getMapKeyType(field))
+        .setMapValueType(getMapValueType(field))
+        .build();
+  }
+
+  public static FieldValueTypeInformation forGetter(Method method) {
+    String name;
+    if (method.getName().startsWith("get")) {
+      name = ReflectUtils.stripPrefix(method.getName(), "get");
+    } else if (method.getName().startsWith("is")) {
+      name = ReflectUtils.stripPrefix(method.getName(), "is");
+    } else {
+      throw new RuntimeException("Getter has wrong prefix " + 
method.getName());
+    }
+
+    TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType());
+    boolean nullable = method.isAnnotationPresent(Nullable.class);
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(name)
+        .setNullable(nullable)
+        .setType(type)
+        .setMethod(method)
+        .setElementType(getArrayComponentType(type))
+        .setMapKeyType(getMapKeyType(type))
+        .setMapValueType(getMapValueType(type))
+        .build();
   }
 
-  public static FieldValueTypeInformation of(TypeInformation typeInformation) {
-    return new AutoValue_FieldValueTypeInformation(
-        typeInformation.getName(),
-        typeInformation.getType().getRawType(),
-        getArrayComponentType(typeInformation),
-        getMapKeyType(typeInformation),
-        getMapValueType(typeInformation));
+  public static FieldValueTypeInformation forSetter(Method method) {
+    String name;
+    if (method.getName().startsWith("set")) {
+      name = ReflectUtils.stripPrefix(method.getName(), "set");
+    } else {
+      throw new RuntimeException("Setter has wrong prefix " + 
method.getName());
+    }
+    if (method.getParameterCount() != 1) {
+      throw new RuntimeException("Setter methods should take a single 
argument.");
+    }
+    TypeDescriptor type = 
TypeDescriptor.of(method.getGenericParameterTypes()[0]);
+    boolean nullable =
+        
Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance);
+    return new AutoValue_FieldValueTypeInformation.Builder()
+        .setName(name)
+        .setNullable(nullable)
+        .setType(type)
+        .setMethod(method)
+        .setElementType(getArrayComponentType(type))
+        .setMapKeyType(getMapKeyType(type))
+        .setMapValueType(getMapValueType(type))
+        .build();
   }
 
-  private static Type getArrayComponentType(TypeInformation typeInformation) {
-    return getArrayComponentType(typeInformation.getType());
+  public FieldValueTypeInformation withName(String name) {
+    return toBuilder().setName(name).build();
   }
 
   private static Type getArrayComponentType(Field field) {
@@ -98,15 +173,19 @@ private static Type getArrayComponentType(TypeDescriptor 
valueType) {
     return null;
   }
 
+  public Class getRawType() {
+    return getType().getRawType();
+  }
+
   // If the Field is a map type, returns the key type, otherwise returns a 
null reference.
   @Nullable
   private static Type getMapKeyType(Field field) {
-    return getMapType(TypeDescriptor.of(field.getGenericType()), 0);
+    return getMapKeyType(TypeDescriptor.of(field.getGenericType()));
   }
 
   @Nullable
-  private static Type getMapKeyType(TypeInformation typeInformation) {
-    return getMapType(typeInformation.getType(), 0);
+  private static Type getMapKeyType(TypeDescriptor<?> typeDescriptor) {
+    return getMapType(typeDescriptor, 0);
   }
 
   // If the Field is a map type, returns the value type, otherwise returns a 
null reference.
@@ -116,8 +195,8 @@ private static Type getMapValueType(Field field) {
   }
 
   @Nullable
-  private static Type getMapValueType(TypeInformation typeInformation) {
-    return getMapType(typeInformation.getType(), 1);
+  private static Type getMapValueType(TypeDescriptor typeDescriptor) {
+    return getMapType(typeDescriptor, 1);
   }
 
   // If the Field is a map type, returns the key or value type (0 is key type, 
1 is value).
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
index 2ec1a42e7d07..139281f7f779 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/FromRowUsingCreator.java
@@ -73,7 +73,7 @@ public T apply(Row row) {
           fromValue(
               type,
               row.getValue(i),
-              typeInformation.getType(),
+              typeInformation.getRawType(),
               typeInformation.getElementType(),
               typeInformation.getMapKeyType(),
               typeInformation.getMapValueType(),
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index f7757a8174e7..677823f9c0d1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.schemas;
 
 import java.util.List;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -38,7 +37,6 @@
   abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory();
 
   /** Implementing class should override to return a constructor factory. */
-  @Nullable
   abstract UserTypeCreatorFactory schemaTypeCreatorFactory();
 
   @Override
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
index 2c22400b050c..8eb8022841e3 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java
@@ -17,13 +17,16 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.schemas.utils.JavaBeanGetterFactory;
-import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory;
-import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
 import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -41,15 +44,55 @@
  */
 @Experimental(Kind.SCHEMAS)
 public class JavaBeanSchema extends GetterBasedSchemaProvider {
+  /** {@link FieldValueTypeSupplier} that's based on getter methods. */
+  @VisibleForTesting
+  public static class GetterTypeSupplier implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types =
+          ReflectUtils.getMethods(clazz)
+              .stream()
+              .filter(ReflectUtils::isGetter)
+              .map(FieldValueTypeInformation::forGetter)
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
+  }
+
+  /** {@link FieldValueTypeSupplier} that's based on setter methods. */
+  @VisibleForTesting
+  public static class SetterTypeSupplier implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types =
+          ReflectUtils.getMethods(clazz)
+              .stream()
+              .filter(ReflectUtils::isSetter)
+              .map(FieldValueTypeInformation::forSetter)
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
+  }
+
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
-    return JavaBeanUtils.schemaFromJavaBeanClass(
-        typeDescriptor.getRawType(), SerializableFunctions.identity());
+    return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
   }
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
-    return new JavaBeanGetterFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        JavaBeanUtils.getGetters(targetClass, schema, new 
GetterTypeSupplier());
   }
 
   @Override
@@ -59,6 +102,15 @@ UserTypeCreatorFactory schemaTypeCreatorFactory() {
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
-    return new JavaBeanTypeInformationFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        JavaBeanUtils.getFieldTypes(targetClass, schema, new 
GetterTypeSupplier());
+  }
+
+  /** A factory for creating {@link FieldValueSetter} objects for a JavaBean 
object. */
+  public static class JavaBeanSetterFactory implements FieldValueSetterFactory 
{
+    @Override
+    public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
+      return JavaBeanUtils.getSetters(targetClass, schema, new 
SetterTypeSupplier());
+    }
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
index a7d6a3020ccd..1504717f78c7 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.schemas;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
 import org.apache.beam.sdk.schemas.utils.POJOUtils;
-import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory;
-import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory;
+import org.apache.beam.sdk.schemas.utils.ReflectUtils;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
 /**
@@ -39,6 +44,25 @@
  */
 @Experimental(Kind.SCHEMAS)
 public class JavaFieldSchema extends GetterBasedSchemaProvider {
+  /** {@link FieldValueTypeSupplier} that's based on public fields. */
+  @VisibleForTesting
+  public static class JavaFieldTypeSupplier implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types =
+          ReflectUtils.getFields(clazz)
+              .stream()
+              .map(FieldValueTypeInformation::forField)
+              .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
+  }
+
   @Override
   public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
     return POJOUtils.schemaFromPojoClass(typeDescriptor.getRawType());
@@ -46,16 +70,19 @@
 
   @Override
   public FieldValueGetterFactory fieldValueGetterFactory() {
-    return new PojoValueGetterFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        POJOUtils.getGetters(targetClass, schema, new JavaFieldTypeSupplier());
   }
 
   @Override
   public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
-    return new PojoValueTypeInformationFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        POJOUtils.getFieldTypes(targetClass, schema, new 
JavaFieldTypeSupplier());
   }
 
   @Override
   UserTypeCreatorFactory schemaTypeCreatorFactory() {
-    return new PojoTypeUserTypeCreatorFactory();
+    return (Class<?> targetClass, Schema schema) ->
+        POJOUtils.getCreator(targetClass, schema, new JavaFieldTypeSupplier());
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
deleted file mode 100644
index b22798861b12..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/PojoTypeUserTypeCreatorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas;
-
-import org.apache.beam.sdk.schemas.utils.POJOUtils;
-
-/** Vends constructors for POJOs. */
-class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory {
-  @Override
-  public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
-    return POJOUtils.getCreator(clazz, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
index c5fae19948e5..6b9b18f3740c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaUserTypeConstructorCreator.java
@@ -27,7 +27,7 @@
   private final Class<?> clazz;
   private final transient Constructor<?> constructor;
 
-  SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> constructor) 
{
+  public SchemaUserTypeConstructorCreator(Class<?> clazz, Constructor<?> 
constructor) {
     this.clazz = clazz;
     this.constructor = checkNotNull(constructor);
   }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
deleted file mode 100644
index 6b76fa6fa66a..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroSpecificRecordTypeInformationFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A {@link FieldValueTypeInformation} for AVRO-generated specific records. */
-public class AvroSpecificRecordTypeInformationFactory implements 
FieldValueTypeInformationFactory {
-  @Override
-  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return AvroUtils.getFieldTypes((Class<? extends SpecificRecord>) 
targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index df33d6430389..c2e737f69418 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -23,6 +23,7 @@
 import com.google.common.base.CaseFormat;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -42,6 +43,8 @@
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecord;
@@ -56,6 +59,7 @@
 import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 
@@ -117,6 +121,11 @@ private FixedBytesField(int size) {
       this.size = size;
     }
 
+    /** Create a {@link FixedBytesField} with the specified size. */
+    public static FixedBytesField withSize(int size) {
+      return new FixedBytesField(size);
+    }
+
     /** Create a {@link FixedBytesField} from a Beam {@link FieldType}. */
     @Nullable
     public static FixedBytesField fromBeamFieldType(FieldType fieldType) {
@@ -260,59 +269,100 @@ public static GenericRecord toGenericRecord(
     return g -> toGenericRecord(g, avroSchema);
   }
 
-  /** Infer a {@link Schema} from an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> Schema getSchema(Class<T> clazz) {
-    try {
-      org.apache.avro.Schema avroSchema =
-          (org.apache.avro.Schema) 
(clazz.getDeclaredField("SCHEMA$").get(null));
-      return toBeamSchema(avroSchema);
-    } catch (NoSuchFieldException | IllegalAccessException e) {
-      throw new IllegalArgumentException(
-          "Class "
-              + clazz
-              + " is not an AVRO SpecificRecord. "
-              + "No public SCHEMA$ field was found.");
-    }
+  /** Infer a {@link Schema} from either an AVRO-generated SpecificRecord or a 
POJO. */
+  public static <T> Schema getSchema(Class<T> clazz) {
+    return toBeamSchema(ReflectData.get().getSchema(clazz));
   }
 
-  private static final class AvroSpecificRecordFieldNamePolicy
-      implements SerializableFunction<String, String> {
-    Schema schema;
-    Map<String, String> nameMapping = Maps.newHashMap();
+  private static final class AvroSpecificRecordFieldValueTypeSupplier
+      implements FieldValueTypeSupplier {
+    @Override
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, String> mapping = getMapping(schema);
+      Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
+      for (Method method : ReflectUtils.getMethods(clazz)) {
+        if (ReflectUtils.isGetter(method)) {
+          FieldValueTypeInformation fieldValueTypeInformation =
+              FieldValueTypeInformation.forGetter(method);
+          String name = mapping.get(fieldValueTypeInformation.getName());
+          if (name != null) {
+            types.put(name, fieldValueTypeInformation.withName(name));
+          }
+        }
+      }
+
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
+    }
 
-    AvroSpecificRecordFieldNamePolicy(Schema schema) {
-      this.schema = schema;
+    private Map<String, String> getMapping(Schema schema) {
+      Map<String, String> mapping = Maps.newHashMap();
       for (Field field : schema.getFields()) {
-        String getter = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, 
field.getName());
-        nameMapping.put(getter, field.getName());
+        String underscore = 
CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, field.getName());
+        mapping.put(underscore, field.getName());
         // The Avro compiler might add a $ at the end of a getter to 
disambiguate.
-        nameMapping.put(getter + "$", field.getName());
+        mapping.put(underscore + "$", field.getName());
+        // If the field is in camel case already, then it's the identity 
mapping.
+        mapping.put(field.getName(), field.getName());
       }
+      return mapping;
     }
+  }
 
+  private static final class AvroPojoFieldValueTypeSupplier implements 
FieldValueTypeSupplier {
     @Override
-    public String apply(String input) {
-      return nameMapping.getOrDefault(input, input);
+    public List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema) {
+      Map<String, FieldValueTypeInformation> types = Maps.newHashMap();
+      for (java.lang.reflect.Field f : ReflectUtils.getFields(clazz)) {
+        if (!f.isAnnotationPresent(AvroIgnore.class)) {
+          FieldValueTypeInformation typeInformation = 
FieldValueTypeInformation.forField(f);
+          AvroName avroname = f.getAnnotation(AvroName.class);
+          if (avroname != null) {
+            typeInformation = typeInformation.withName(avroname.value());
+          }
+          types.put(typeInformation.getName(), typeInformation);
+        }
+      }
+      // Return the list ordered by the schema fields.
+      return schema
+          .getFields()
+          .stream()
+          .map(f -> types.get(f.getName()))
+          .collect(Collectors.toList());
     }
   }
 
-  /** Get field types for an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> List<FieldValueTypeInformation> 
getFieldTypes(
-      Class<T> clazz, Schema schema) {
-    return JavaBeanUtils.getFieldTypes(
-        clazz, schema, new AvroSpecificRecordFieldNamePolicy(schema));
+  /** Get field types for an AVRO-generated SpecificRecord or a POJO. */
+  public static <T> List<FieldValueTypeInformation> getFieldTypes(Class<T> 
clazz, Schema schema) {
+    if 
(TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) 
{
+      return JavaBeanUtils.getFieldTypes(
+          clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier());
+    } else {
+      return POJOUtils.getFieldTypes(clazz, schema, new 
AvroPojoFieldValueTypeSupplier());
+    }
   }
 
-  /** Get generated getters for an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> List<FieldValueGetter> getGetters(
-      Class<T> clazz, Schema schema) {
-    return JavaBeanUtils.getGetters(clazz, schema, new 
AvroSpecificRecordFieldNamePolicy(schema));
+  /** Get generated getters for an AVRO-generated SpecificRecord or a POJO. */
+  public static <T> List<FieldValueGetter> getGetters(Class<T> clazz, Schema 
schema) {
+    if 
(TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) 
{
+      return JavaBeanUtils.getGetters(
+          clazz, schema, new AvroSpecificRecordFieldValueTypeSupplier());
+    } else {
+      return POJOUtils.getGetters(clazz, schema, new 
AvroPojoFieldValueTypeSupplier());
+    }
   }
 
   /** Get an object creator for an AVRO-generated SpecificRecord. */
-  public static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
-      Class<T> clazz, Schema schema) {
-    return AvroByteBuddyUtils.getCreator(clazz, schema);
+  public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema 
schema) {
+    if 
(TypeDescriptor.of(clazz).isSubtypeOf(TypeDescriptor.of(SpecificRecord.class))) 
{
+      return AvroByteBuddyUtils.getCreator((Class<? extends SpecificRecord>) 
clazz, schema);
+    } else {
+      return POJOUtils.getCreator(clazz, schema, new 
AvroPojoFieldValueTypeSupplier());
+    }
   }
 
   /** Converts AVRO schema to Beam field. */
@@ -480,7 +530,6 @@ public String apply(String input) {
   private static Object genericFromBeamField(
       Schema.FieldType fieldType, org.apache.avro.Schema avroSchema, @Nullable 
Object value) {
     TypeWithNullability typeWithNullability = new 
TypeWithNullability(avroSchema);
-
     if (!fieldType.getNullable().equals(typeWithNullability.nullable)) {
       throw new IllegalArgumentException(
           "FieldType "
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
similarity index 68%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
index 84f9a5e050d6..12330ecb20b2 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueTypeInformationFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/FieldValueTypeSupplier.java
@@ -17,15 +17,19 @@
  */
 package org.apache.beam.sdk.schemas.utils;
 
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
 import org.apache.beam.sdk.schemas.Schema;
 
-/** A {@link FieldValueTypeInformationFactory} for POJO objects objects. */
-public class PojoValueTypeInformationFactory implements 
FieldValueTypeInformationFactory {
-  @Override
-  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return POJOUtils.getFieldTypes(targetClass, schema);
-  }
+/**
+ * Supplies {@link FieldValueTypeInformation} for a class, mapping each field
+ * or getter of the class to the matching field name in the schema.
+ */
+public interface FieldValueTypeSupplier extends Serializable {
+  /**
+   * Return all the FieldValueTypeInformations. The returned list must be in 
the same order as
+   * fields in the schema.
+   */
+  List<FieldValueTypeInformation> get(Class<?> clazz, Schema schema);
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
deleted file mode 100644
index 5c3d2ea17155..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanGetterFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueGetter;
-import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-
-/** A factory for creating {@link FieldValueGetter} objects for a JavaBean 
object. */
-public class JavaBeanGetterFactory implements FieldValueGetterFactory {
-  @Override
-  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return JavaBeanUtils.getGetters(targetClass, schema, 
SerializableFunctions.identity());
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
deleted file mode 100644
index e67a58dc6837..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanSetterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueSetter;
-import org.apache.beam.sdk.schemas.FieldValueSetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A factory for creating {@link FieldValueSetter} objects for a JavaBean 
object. */
-public class JavaBeanSetterFactory implements FieldValueSetterFactory {
-  @Override
-  public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
-    return JavaBeanUtils.getSetters(targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
deleted file mode 100644
index f445a87dc9ca..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanTypeInformationFactory.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
-import org.apache.beam.sdk.schemas.FieldValueTypeInformationFactory;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-
-/** A {@link FieldValueTypeInformationFactory} for Java Bean objects. */
-public class JavaBeanTypeInformationFactory implements 
FieldValueTypeInformationFactory {
-  @Override
-  public List<FieldValueTypeInformation> create(Class<?> targetClass, Schema 
schema) {
-    return JavaBeanUtils.getFieldTypes(targetClass, schema, 
SerializableFunctions.identity());
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
index 0ec07c9d8d77..58cc0ed0254a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtils.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.schemas.utils;
 
 import com.google.common.collect.Maps;
-import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
@@ -48,55 +47,46 @@
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
-import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 
 /** A set of utilities to generate getter and setter classes for JavaBean 
objects. */
 @Experimental(Kind.SCHEMAS)
 public class JavaBeanUtils {
   /** Create a {@link Schema} for a Java Bean class. */
-  public static Schema schemaFromJavaBeanClass(
-      Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) {
-    return StaticSchemaInference.schemaFromClass(
-        clazz, c -> JavaBeanUtils.typeInformationFromClass(c, 
fieldNamePolicy));
+  public static Schema schemaFromJavaBeanClass(Class<?> clazz) {
+    return StaticSchemaInference.schemaFromClass(clazz, 
JavaBeanUtils::typeInformationFromClass);
   }
 
-  private static List<TypeInformation> typeInformationFromClass(
-      Class<?> clazz, SerializableFunction<String, String> fieldNamePolicy) {
-    try {
-      List<TypeInformation> getterTypes =
-          ReflectUtils.getMethods(clazz)
-              .stream()
-              .filter(ReflectUtils::isGetter)
-              .map(m -> TypeInformation.forGetter(m, fieldNamePolicy))
-              .collect(Collectors.toList());
+  private static List<FieldValueTypeInformation> 
typeInformationFromClass(Class<?> clazz) {
+    List<FieldValueTypeInformation> getterTypes =
+        ReflectUtils.getMethods(clazz)
+            .stream()
+            .filter(ReflectUtils::isGetter)
+            .map(FieldValueTypeInformation::forGetter)
+            .collect(Collectors.toList());
 
-      Map<String, TypeInformation> setterTypes =
-          ReflectUtils.getMethods(clazz)
-              .stream()
-              .filter(ReflectUtils::isSetter)
-              .map(m -> TypeInformation.forSetter(m))
-              .collect(Collectors.toMap(TypeInformation::getName, 
Function.identity()));
-      validateJavaBean(getterTypes, setterTypes);
-      return getterTypes;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
+    Map<String, FieldValueTypeInformation> setterTypes =
+        ReflectUtils.getMethods(clazz)
+            .stream()
+            .filter(ReflectUtils::isSetter)
+            .map(FieldValueTypeInformation::forSetter)
+            .collect(Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
+    validateJavaBean(getterTypes, setterTypes);
+    return getterTypes;
   }
 
   // Make sure that there are matching setters and getters.
   private static void validateJavaBean(
-      List<TypeInformation> getters, Map<String, TypeInformation> setters) {
-    for (TypeInformation type : getters) {
-      TypeInformation setterType = setters.get(type.getName());
+      List<FieldValueTypeInformation> getters, Map<String, 
FieldValueTypeInformation> setters) {
+    for (FieldValueTypeInformation type : getters) {
+      FieldValueTypeInformation setterType = setters.get(type.getName());
       if (setterType == null) {
         throw new RuntimeException(
             "JavaBean contained a getter for field "
                 + type.getName()
                 + "but did not contain a matching setter.");
       }
-      if (!type.equals(setterType)) {
+      if (!type.getType().equals(setterType.getType())) {
         throw new RuntimeException(
             "JavaBean contained setter for field "
                 + type.getName()
@@ -118,28 +108,9 @@ private static void validateJavaBean(
       Maps.newConcurrentMap();
 
   public static List<FieldValueTypeInformation> getFieldTypes(
-      Class<?> clazz, Schema schema, SerializableFunction<String, String> 
fieldNamePolicy) {
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
-        c -> {
-          try {
-            Map<String, FieldValueTypeInformation> getterMap =
-                ReflectUtils.getMethods(clazz)
-                    .stream()
-                    .filter(ReflectUtils::isGetter)
-                    .map(m -> TypeInformation.forGetter(m, fieldNamePolicy))
-                    .map(FieldValueTypeInformation::of)
-                    .collect(
-                        Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
-            return schema
-                .getFields()
-                .stream()
-                .map(f -> getterMap.get(f.getName()))
-                .collect(Collectors.toList());
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
-        });
+        new ClassWithSchema(clazz, schema), c -> 
fieldValueTypeSupplier.get(clazz, schema));
   }
 
   // The list of getters for a class is cached, so we only create the classes 
the first time
@@ -153,37 +124,22 @@ private static void validateJavaBean(
    * <p>The returned list is ordered by the order of fields in the schema.
    */
   public static List<FieldValueGetter> getGetters(
-      Class<?> clazz, Schema schema, SerializableFunction<String, String> 
fieldNamePolicy) {
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_GETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          try {
-            Map<String, FieldValueGetter> getterMap =
-                ReflectUtils.getMethods(clazz)
-                    .stream()
-                    .filter(ReflectUtils::isGetter)
-                    .map(m -> JavaBeanUtils.createGetter(m, fieldNamePolicy))
-                    .collect(Collectors.toMap(FieldValueGetter::name, 
Function.identity()));
-            return schema
-                .getFields()
-                .stream()
-                .map(f -> getterMap.get(f.getName()))
-                .collect(Collectors.toList());
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          return 
types.stream().map(JavaBeanUtils::createGetter).collect(Collectors.toList());
         });
   }
 
-  private static <T> FieldValueGetter createGetter(
-      Method getterMethod, SerializableFunction<String, String> 
fieldNamePolicy) {
-    TypeInformation typeInformation = TypeInformation.forGetter(getterMethod, 
fieldNamePolicy);
+  private static <T> FieldValueGetter createGetter(FieldValueTypeInformation 
typeInformation) {
     DynamicType.Builder<FieldValueGetter> builder =
         ByteBuddyUtils.subclassGetterInterface(
             BYTE_BUDDY,
-            getterMethod.getDeclaringClass(),
+            typeInformation.getMethod().getDeclaringClass(),
             new ConvertType(false).convert(typeInformation.getType()));
-    builder = implementGetterMethods(builder, getterMethod, fieldNamePolicy);
+    builder = implementGetterMethods(builder, typeInformation);
     try {
       return builder
           .make()
@@ -195,20 +151,18 @@ private static void validateJavaBean(
         | IllegalAccessException
         | NoSuchMethodException
         | InvocationTargetException e) {
-      throw new RuntimeException("Unable to generate a getter for getter '" + 
getterMethod + "'");
+      throw new RuntimeException(
+          "Unable to generate a getter for getter '" + 
typeInformation.getMethod() + "'");
     }
   }
 
   private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
-      DynamicType.Builder<FieldValueGetter> builder,
-      Method method,
-      SerializableFunction<String, String> fieldNamePolicy) {
-    TypeInformation typeInformation = TypeInformation.forGetter(method, 
fieldNamePolicy);
+      DynamicType.Builder<FieldValueGetter> builder, FieldValueTypeInformation 
typeInformation) {
     return builder
         .method(ElementMatchers.named("name"))
         .intercept(FixedValue.reference(typeInformation.getName()))
         .method(ElementMatchers.named("get"))
-        .intercept(new InvokeGetterInstruction(method, fieldNamePolicy));
+        .intercept(new InvokeGetterInstruction(typeInformation));
   }
 
   // The list of setters for a class is cached, so we only create the classes 
the first time
@@ -221,36 +175,23 @@ private static void validateJavaBean(
    *
    * <p>The returned list is ordered by the order of fields in the schema.
    */
-  public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema 
schema) {
+  public static List<FieldValueSetter> getSetters(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_SETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          try {
-            Map<String, FieldValueSetter> setterMap =
-                ReflectUtils.getMethods(clazz)
-                    .stream()
-                    .filter(ReflectUtils::isSetter)
-                    .map(JavaBeanUtils::createSetter)
-                    .collect(Collectors.toMap(FieldValueSetter::name, 
Function.identity()));
-            return schema
-                .getFields()
-                .stream()
-                .map(f -> setterMap.get(f.getName()))
-                .collect(Collectors.toList());
-          } catch (IOException e) {
-            throw new RuntimeException(e);
-          }
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          return 
types.stream().map(JavaBeanUtils::createSetter).collect(Collectors.toList());
         });
   }
 
-  private static <T> FieldValueSetter createSetter(Method setterMethod) {
-    TypeInformation typeInformation = TypeInformation.forSetter(setterMethod);
+  private static FieldValueSetter createSetter(FieldValueTypeInformation 
typeInformation) {
     DynamicType.Builder<FieldValueSetter> builder =
         ByteBuddyUtils.subclassSetterInterface(
             BYTE_BUDDY,
-            setterMethod.getDeclaringClass(),
+            typeInformation.getMethod().getDeclaringClass(),
             new ConvertType(false).convert(typeInformation.getType()));
-    builder = implementSetterMethods(builder, setterMethod);
+    builder = implementSetterMethods(builder, typeInformation.getMethod());
     try {
       return builder
           .make()
@@ -262,29 +203,27 @@ private static void validateJavaBean(
         | IllegalAccessException
         | NoSuchMethodException
         | InvocationTargetException e) {
-      throw new RuntimeException("Unable to generate a setter for setter '" + 
setterMethod + "'");
+      throw new RuntimeException(
+          "Unable to generate a setter for setter '" + 
typeInformation.getMethod() + "'");
     }
   }
 
   private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
       DynamicType.Builder<FieldValueSetter> builder, Method method) {
-    TypeInformation typeInformation = TypeInformation.forSetter(method);
+    FieldValueTypeInformation javaTypeInformation = 
FieldValueTypeInformation.forSetter(method);
     return builder
         .method(ElementMatchers.named("name"))
-        .intercept(FixedValue.reference(typeInformation.getName()))
+        .intercept(FixedValue.reference(javaTypeInformation.getName()))
         .method(ElementMatchers.named("set"))
         .intercept(new InvokeSetterInstruction(method));
   }
 
   // Implements a method to read a public getter out of an object.
   private static class InvokeGetterInstruction implements Implementation {
-    // Getter method that wil be invoked
-    private Method method;
-    private SerializableFunction<String, String> fieldNamePolicy;
+    private final FieldValueTypeInformation typeInformation;
 
-    InvokeGetterInstruction(Method method, SerializableFunction<String, 
String> fieldNamePolicy) {
-      this.method = method;
-      this.fieldNamePolicy = fieldNamePolicy;
+    InvokeGetterInstruction(FieldValueTypeInformation typeInformation) {
+      this.typeInformation = typeInformation;
     }
 
     @Override
@@ -295,7 +234,6 @@ public InstrumentedType prepare(InstrumentedType 
instrumentedType) {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return (methodVisitor, implementationContext, instrumentedMethod) -> {
-        TypeInformation typeInformation = TypeInformation.forGetter(method, 
fieldNamePolicy);
         // this + method parameters.
         int numLocals = 1 + instrumentedMethod.getParameters().size();
 
@@ -305,7 +243,7 @@ public ByteCodeAppender appender(final Target 
implementationTarget) {
                 // Method param is offset 1 (offset 0 is the this parameter).
                 MethodVariableAccess.REFERENCE.loadFrom(1),
                 // Invoke the getter
-                MethodInvocation.invoke(new ForLoadedMethod(method)));
+                MethodInvocation.invoke(new 
ForLoadedMethod(typeInformation.getMethod())));
 
         StackManipulation stackManipulation =
             new StackManipulation.Compound(
@@ -335,7 +273,7 @@ public InstrumentedType prepare(InstrumentedType 
instrumentedType) {
     @Override
     public ByteCodeAppender appender(final Target implementationTarget) {
       return (methodVisitor, implementationContext, instrumentedMethod) -> {
-        TypeInformation typeInformation = TypeInformation.forSetter(method);
+        FieldValueTypeInformation javaTypeInformation = 
FieldValueTypeInformation.forSetter(method);
         // this + method parameters.
         int numLocals = 1 + instrumentedMethod.getParameters().size();
 
@@ -349,7 +287,7 @@ public ByteCodeAppender appender(final Target 
implementationTarget) {
                 MethodVariableAccess.REFERENCE.loadFrom(1),
                 // Do any conversions necessary.
                 new ByteBuddyUtils.ConvertValueForSetter(readField)
-                    .convert(typeInformation.getType()),
+                    .convert(javaTypeInformation.getType()),
                 // Now update the field and return void.
                 MethodInvocation.invoke(new ForLoadedMethod(method)),
                 MethodReturn.VOID);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
index 38f5307e1259..370b3ccc0548 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/POJOUtils.java
@@ -24,6 +24,7 @@
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
 import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
@@ -55,7 +56,6 @@
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
 import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
-import org.apache.beam.sdk.schemas.utils.StaticSchemaInference.TypeInformation;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -63,12 +63,11 @@
 @Experimental(Kind.SCHEMAS)
 public class POJOUtils {
   public static Schema schemaFromPojoClass(Class<?> clazz) {
-    // We should cache the field order.
-    Function<Class, List<TypeInformation>> getTypesForClass =
+    Function<Class, List<FieldValueTypeInformation>> getTypesForClass =
         c ->
             ReflectUtils.getFields(c)
                 .stream()
-                .map(TypeInformation::forField)
+                .map(FieldValueTypeInformation::forField)
                 .collect(Collectors.toList());
     return StaticSchemaInference.schemaFromClass(clazz, getTypesForClass);
   }
@@ -79,22 +78,10 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
   private static final Map<ClassWithSchema, List<FieldValueTypeInformation>> 
CACHED_FIELD_TYPES =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueTypeInformation> getFieldTypes(Class<?> clazz, 
Schema schema) {
+  public static List<FieldValueTypeInformation> getFieldTypes(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_FIELD_TYPES.computeIfAbsent(
-        new ClassWithSchema(clazz, schema),
-        c -> {
-          Map<String, FieldValueTypeInformation> typeInformationMap =
-              ReflectUtils.getFields(clazz)
-                  .stream()
-                  .map(FieldValueTypeInformation::of)
-                  .collect(
-                      Collectors.toMap(FieldValueTypeInformation::getName, 
Function.identity()));
-          return schema
-              .getFields()
-              .stream()
-              .map(f -> typeInformationMap.get(f.getName()))
-              .collect(Collectors.toList());
-        });
+        new ClassWithSchema(clazz, schema), c -> 
fieldValueTypeSupplier.get(clazz, schema));
   }
 
   // The list of getters for a class is cached, so we only create the classes 
the first time
@@ -102,21 +89,20 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
   private static final Map<ClassWithSchema, List<FieldValueGetter>> 
CACHED_GETTERS =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema 
schema) {
+  public static List<FieldValueGetter> getGetters(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     // Return the getters ordered by their position in the schema.
     return CACHED_GETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          Map<String, FieldValueGetter> getterMap =
-              ReflectUtils.getFields(clazz)
-                  .stream()
-                  .map(POJOUtils::createGetter)
-                  .collect(Collectors.toMap(FieldValueGetter::name, 
Function.identity()));
-          return schema
-              .getFields()
-              .stream()
-              .map(f -> getterMap.get(f.getName()))
-              .collect(Collectors.toList());
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          List<FieldValueGetter> getters =
+              
types.stream().map(POJOUtils::createGetter).collect(Collectors.toList());
+          if (getters.size() != schema.getFieldCount()) {
+            throw new RuntimeException(
+                "Was not able to generate getters for schema: " + schema + " 
class: " + clazz);
+          }
+          return getters;
         });
   }
 
@@ -125,24 +111,21 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
   public static final Map<ClassWithSchema, SchemaUserTypeCreator> 
CACHED_CREATORS =
       Maps.newConcurrentMap();
 
-  public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema 
schema) {
+  public static <T> SchemaUserTypeCreator getCreator(
+      Class<T> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     return CACHED_CREATORS.computeIfAbsent(
-        new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
+        new ClassWithSchema(clazz, schema),
+        c -> {
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          return createCreator(clazz, schema, types);
+        });
   }
 
-  private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, 
Schema schema) {
+  private static <T> SchemaUserTypeCreator createCreator(
+      Class<T> clazz, Schema schema, List<FieldValueTypeInformation> types) {
     // Get the list of class fields ordered by schema.
-    Map<String, Field> fieldMap =
-        ReflectUtils.getFields(clazz)
-            .stream()
-            .collect(Collectors.toMap(Field::getName, Function.identity()));
     List<Field> fields =
-        schema
-            .getFields()
-            .stream()
-            .map(f -> fieldMap.get(f.getName()))
-            .collect(Collectors.toList());
-
+        
types.stream().map(FieldValueTypeInformation::getField).collect(Collectors.toList());
     try {
       DynamicType.Builder<SchemaUserTypeCreator> builder =
           BYTE_BUDDY
@@ -179,13 +162,16 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
    * </code></pre>
    */
   @SuppressWarnings("unchecked")
-  static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> 
createGetter(Field field) {
+  @Nullable
+  static <ObjectT, ValueT> FieldValueGetter<ObjectT, ValueT> createGetter(
+      FieldValueTypeInformation typeInformation) {
+    Field field = typeInformation.getField();
     DynamicType.Builder<FieldValueGetter> builder =
         ByteBuddyUtils.subclassGetterInterface(
             BYTE_BUDDY,
             field.getDeclaringClass(),
             new 
ConvertType(false).convert(TypeDescriptor.of(field.getType())));
-    builder = implementGetterMethods(builder, field);
+    builder = implementGetterMethods(builder, field, 
typeInformation.getName());
     try {
       return builder
           .make()
@@ -202,10 +188,10 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
   }
 
   private static DynamicType.Builder<FieldValueGetter> implementGetterMethods(
-      DynamicType.Builder<FieldValueGetter> builder, Field field) {
+      DynamicType.Builder<FieldValueGetter> builder, Field field, String name) 
{
     return builder
         .method(ElementMatchers.named("name"))
-        .intercept(FixedValue.reference(field.getName()))
+        .intercept(FixedValue.reference(name))
         .method(ElementMatchers.named("get"))
         .intercept(new ReadFieldInstruction(field));
   }
@@ -215,21 +201,14 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
   private static final Map<ClassWithSchema, List<FieldValueSetter>> 
CACHED_SETTERS =
       Maps.newConcurrentMap();
 
-  public static List<FieldValueSetter> getSetters(Class<?> clazz, Schema 
schema) {
+  public static List<FieldValueSetter> getSetters(
+      Class<?> clazz, Schema schema, FieldValueTypeSupplier 
fieldValueTypeSupplier) {
     // Return the setters, ordered by their position in the schema.
     return CACHED_SETTERS.computeIfAbsent(
         new ClassWithSchema(clazz, schema),
         c -> {
-          Map<String, FieldValueSetter> setterMap =
-              ReflectUtils.getFields(clazz)
-                  .stream()
-                  .map(POJOUtils::createSetter)
-                  .collect(Collectors.toMap(FieldValueSetter::name, 
Function.identity()));
-          return schema
-              .getFields()
-              .stream()
-              .map(f -> setterMap.get(f.getName()))
-              .collect(Collectors.toList());
+          List<FieldValueTypeInformation> types = 
fieldValueTypeSupplier.get(clazz, schema);
+          return 
types.stream().map(POJOUtils::createSetter).collect(Collectors.toList());
         });
   }
 
@@ -250,7 +229,9 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
    * </code></pre>
    */
   @SuppressWarnings("unchecked")
-  private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> 
createSetter(Field field) {
+  private static <ObjectT, ValueT> FieldValueSetter<ObjectT, ValueT> 
createSetter(
+      FieldValueTypeInformation typeInformation) {
+    Field field = typeInformation.getField();
     DynamicType.Builder<FieldValueSetter> builder =
         ByteBuddyUtils.subclassSetterInterface(
             BYTE_BUDDY,
@@ -284,7 +265,7 @@ public static Schema schemaFromPojoClass(Class<?> clazz) {
   // Implements a method to read a public field out of an object.
   static class ReadFieldInstruction implements Implementation {
     // Field that will be read.
-    private Field field;
+    private final Field field;
 
     ReadFieldInstruction(Field field) {
       this.field = field;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
deleted file mode 100644
index 275b791318f4..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueGetterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueGetter;
-import org.apache.beam.sdk.schemas.FieldValueGetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A factory for creating {@link FieldValueGetter} objects for a POJO. */
-public class PojoValueGetterFactory implements FieldValueGetterFactory {
-  @Override
-  public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
-    return POJOUtils.getGetters(targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
deleted file mode 100644
index 5e9447a2e04a..000000000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/PojoValueSetterFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.schemas.utils;
-
-import java.util.List;
-import org.apache.beam.sdk.schemas.FieldValueSetter;
-import org.apache.beam.sdk.schemas.FieldValueSetterFactory;
-import org.apache.beam.sdk.schemas.Schema;
-
-/** A factory for creating {@link FieldValueSetter} objects for a POJO. */
-public class PojoValueSetterFactory implements FieldValueSetterFactory {
-  @Override
-  public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
-    return POJOUtils.getSetters(targetClass, schema);
-  }
-}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
index 8619ea5e1780..c9d943b6ee6b 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/ReflectUtils.java
@@ -21,7 +21,6 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
@@ -40,7 +39,7 @@
     private final Class clazz;
     private final Schema schema;
 
-    public ClassWithSchema(Class clazz, Schema schema) {
+    ClassWithSchema(Class clazz, Schema schema) {
       this.clazz = clazz;
       this.schema = schema;
     }
@@ -67,7 +66,7 @@ public int hashCode() {
   private static final Map<Class, List<Field>> DECLARED_FIELDS = 
Maps.newHashMap();
 
   /** Returns the list of public, non-static methods in the class, caching the 
results. */
-  static List<Method> getMethods(Class clazz) throws IOException {
+  public static List<Method> getMethods(Class clazz) {
     return DECLARED_METHODS.computeIfAbsent(
         clazz,
         c -> {
@@ -79,7 +78,7 @@ public int hashCode() {
   }
 
   // Get all public, non-static, non-transient fields.
-  static List<Field> getFields(Class<?> clazz) {
+  public static List<Field> getFields(Class<?> clazz) {
     return DECLARED_FIELDS.computeIfAbsent(
         clazz,
         c -> {
@@ -104,7 +103,7 @@ public int hashCode() {
         });
   }
 
-  static boolean isGetter(Method method) {
+  public static boolean isGetter(Method method) {
     if (Void.TYPE.equals(method.getReturnType())) {
       return false;
     }
@@ -118,13 +117,13 @@ static boolean isGetter(Method method) {
             || Boolean.class.equals(method.getReturnType())));
   }
 
-  static boolean isSetter(Method method) {
+  public static boolean isSetter(Method method) {
     return Void.TYPE.equals(method.getReturnType())
         && method.getParameterCount() == 1
         && method.getName().startsWith("set");
   }
 
-  static String stripPrefix(String methodName, String prefix) {
+  public static String stripPrefix(String methodName, String prefix) {
     String firstLetter = methodName.substring(prefix.length(), prefix.length() 
+ 1).toLowerCase();
 
     return (methodName.length() == prefix.length() + 1)
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
index fdf978cfb46b..d3f81e8f7ec1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/StaticSchemaInference.java
@@ -20,21 +20,16 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.function.Function;
-import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.ReadableInstant;
 
@@ -64,94 +59,6 @@
           .put(BigDecimal.class, FieldType.DECIMAL)
           .build();
 
-  /** Relevant information about a Java type. */
-  public static class TypeInformation {
-    private final String name;
-    private final TypeDescriptor type;
-    private final boolean nullable;
-
-    /** Construct a {@link TypeInformation}. */
-    private TypeInformation(String name, TypeDescriptor type, boolean 
nullable) {
-      this.name = name;
-      this.type = type;
-      this.nullable = nullable;
-    }
-
-    /** Construct a {@link TypeInformation} from a class member variable. */
-    public static TypeInformation forField(Field field) {
-      return new TypeInformation(
-          field.getName(),
-          TypeDescriptor.of(field.getGenericType()),
-          field.isAnnotationPresent(Nullable.class));
-    }
-
-    /** Construct a {@link TypeInformation} from a class getter. */
-    public static TypeInformation forGetter(
-        Method method, SerializableFunction<String, String> fieldNamePolicy) {
-      String name;
-      if (method.getName().startsWith("get")) {
-        name = ReflectUtils.stripPrefix(method.getName(), "get");
-      } else if (method.getName().startsWith("is")) {
-        name = ReflectUtils.stripPrefix(method.getName(), "is");
-      } else {
-        throw new RuntimeException("Getter has wrong prefix " + 
method.getName());
-      }
-      name = fieldNamePolicy.apply(name);
-
-      TypeDescriptor type = TypeDescriptor.of(method.getGenericReturnType());
-      boolean nullable = method.isAnnotationPresent(Nullable.class);
-      return new TypeInformation(name, type, nullable);
-    }
-
-    /** Construct a {@link TypeInformation} from a class setter. */
-    public static TypeInformation forSetter(Method method) {
-      String name;
-      if (method.getName().startsWith("set")) {
-        name = ReflectUtils.stripPrefix(method.getName(), "set");
-      } else {
-        throw new RuntimeException("Setter has wrong prefix " + 
method.getName());
-      }
-      if (method.getParameterCount() != 1) {
-        throw new RuntimeException("Setter methods should take a single 
argument.");
-      }
-      TypeDescriptor type = 
TypeDescriptor.of(method.getGenericParameterTypes()[0]);
-      boolean nullable =
-          
Arrays.stream(method.getParameterAnnotations()[0]).anyMatch(Nullable.class::isInstance);
-      return new TypeInformation(name, type, nullable);
-    }
-
-    public String getName() {
-      return name;
-    }
-
-    public TypeDescriptor getType() {
-      return type;
-    }
-
-    public boolean isNullable() {
-      return nullable;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      TypeInformation that = (TypeInformation) o;
-      return nullable == that.nullable
-          && Objects.equals(name, that.name)
-          && Objects.equals(type, that.type);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(name, type, nullable);
-    }
-  }
-
   /**
    * Infer a schema from a Java class.
    *
@@ -160,9 +67,9 @@ public int hashCode() {
    * public getter methods, or special annotations on the class.
    */
   public static Schema schemaFromClass(
-      Class<?> clazz, Function<Class, List<TypeInformation>> getTypesForClass) 
{
+      Class<?> clazz, Function<Class, List<FieldValueTypeInformation>> 
getTypesForClass) {
     Schema.Builder builder = Schema.builder();
-    for (TypeInformation type : getTypesForClass.apply(clazz)) {
+    for (FieldValueTypeInformation type : getTypesForClass.apply(clazz)) {
       Schema.FieldType fieldType = fieldFromType(type.getType(), 
getTypesForClass);
       if (type.isNullable()) {
         builder.addNullableField(type.getName(), fieldType);
@@ -175,7 +82,7 @@ public static Schema schemaFromClass(
 
   // Map a Java field type to a Beam Schema FieldType.
   private static Schema.FieldType fieldFromType(
-      TypeDescriptor type, Function<Class, List<TypeInformation>> 
getTypesForClass) {
+      TypeDescriptor type, Function<Class, List<FieldValueTypeInformation>> 
getTypesForClass) {
     FieldType primitiveType = PRIMITIVE_TYPES.get(type.getRawType());
     if (primitiveType != null) {
       return primitiveType;
@@ -209,7 +116,8 @@ public static Schema schemaFromClass(
         FieldType keyType = fieldFromType(TypeDescriptor.of(params[0]), 
getTypesForClass);
         FieldType valueType = fieldFromType(TypeDescriptor.of(params[1]), 
getTypesForClass);
         checkArgument(
-            keyType.getTypeName().isPrimitiveType(), "Only primitive types can 
be map keys");
+            keyType.getTypeName().isPrimitiveType(),
+            "Only primitive types can be map keys. type: " + 
keyType.getTypeName());
         return FieldType.map(keyType, valueType);
       } else {
         throw new RuntimeException("Cannot infer schema from unparameterized 
map.");
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
index f7cc64d7b486..3c22f4b62cf1 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/AvroSchemaTest.java
@@ -22,12 +22,20 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.reflect.AvroIgnore;
+import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.AvroSchema;
 import org.apache.avro.util.Utf8;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.AvroUtils.FixedBytesField;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -36,6 +44,142 @@
 
 /** Tests for AVRO schema classes. */
 public class AvroSchemaTest {
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroSubPojo {
+    @AvroName("bool_non_nullable")
+    public boolean boolNonNullable;
+
+    @AvroName("int")
+    @org.apache.avro.reflect.Nullable
+    public Integer anInt;
+
+    public AvroSubPojo(boolean boolNonNullable, Integer anInt) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+    }
+
+    public AvroSubPojo() {}
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroSubPojo)) {
+        return false;
+      }
+      AvroSubPojo that = (AvroSubPojo) o;
+      return boolNonNullable == that.boolNonNullable && Objects.equals(anInt, 
that.anInt);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(boolNonNullable, anInt);
+    }
+
+    @Override
+    public String toString() {
+      return "AvroSubPojo{" + "boolNonNullable=" + boolNonNullable + ", 
anInt=" + anInt + '}';
+    }
+  }
+
+  /** A test POJO that corresponds to our AVRO schema. */
+  public static class AvroPojo {
+    public @AvroName("bool_non_nullable") boolean boolNonNullable;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("int") Integer anInt;
+
+    @org.apache.avro.reflect.Nullable
+    public @AvroName("long") Long aLong;
+
+    @AvroName("float")
+    @org.apache.avro.reflect.Nullable
+    public Float aFloat;
+
+    @AvroName("double")
+    @org.apache.avro.reflect.Nullable
+    public Double aDouble;
+
+    @org.apache.avro.reflect.Nullable public String string;
+    @org.apache.avro.reflect.Nullable public ByteBuffer bytes;
+
+    @AvroSchema("{\"type\": \"fixed\", \"size\": 4, \"name\": \"fixed4\"}")
+    @org.apache.avro.reflect.Nullable
+    public byte[] fixed;
+
+    @org.apache.avro.reflect.Nullable public AvroSubPojo row;
+    @org.apache.avro.reflect.Nullable public List<AvroSubPojo> array;
+    @org.apache.avro.reflect.Nullable public Map<String, AvroSubPojo> map;
+    @AvroIgnore String extraField;
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof AvroPojo)) {
+        return false;
+      }
+      AvroPojo avroPojo = (AvroPojo) o;
+      return boolNonNullable == avroPojo.boolNonNullable
+          && Objects.equals(anInt, avroPojo.anInt)
+          && Objects.equals(aLong, avroPojo.aLong)
+          && Objects.equals(aFloat, avroPojo.aFloat)
+          && Objects.equals(aDouble, avroPojo.aDouble)
+          && Objects.equals(string, avroPojo.string)
+          && Objects.equals(bytes, avroPojo.bytes)
+          && Arrays.equals(fixed, avroPojo.fixed)
+          && Objects.equals(row, avroPojo.row)
+          && Objects.equals(array, avroPojo.array)
+          && Objects.equals(map, avroPojo.map);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(
+          boolNonNullable,
+          anInt,
+          aLong,
+          aFloat,
+          aDouble,
+          string,
+          bytes,
+          Arrays.hashCode(fixed),
+          row,
+          array,
+          map);
+    }
+
+    public AvroPojo(
+        boolean boolNonNullable,
+        int anInt,
+        long aLong,
+        float aFloat,
+        double aDouble,
+        String string,
+        ByteBuffer bytes,
+        byte[] fixed,
+        AvroSubPojo row,
+        List<AvroSubPojo> array,
+        Map<String, AvroSubPojo> map) {
+      this.boolNonNullable = boolNonNullable;
+      this.anInt = anInt;
+      this.aLong = aLong;
+      this.aFloat = aFloat;
+      this.aDouble = aDouble;
+      this.string = string;
+      this.bytes = bytes;
+      this.fixed = fixed;
+      this.row = row;
+      this.array = array;
+      this.map = map;
+      this.extraField = "";
+    }
+
+    public AvroPojo() {}
+  }
+
   private static final Schema SUBSCHEMA =
       Schema.builder()
           .addField("bool_non_nullable", FieldType.BOOLEAN)
@@ -52,18 +196,27 @@
           .addNullableField("double", FieldType.DOUBLE)
           .addNullableField("string", FieldType.STRING)
           .addNullableField("bytes", FieldType.BYTES)
-          .addField("fixed", FieldType.BYTES.withMetadata("FIXED:4"))
+          .addField("fixed", FixedBytesField.withSize(4).toBeamType())
           .addNullableField("timestampMillis", FieldType.DATETIME)
           .addNullableField("row", SUB_TYPE)
           .addNullableField("array", FieldType.array(SUB_TYPE))
           .addNullableField("map", FieldType.map(FieldType.STRING, SUB_TYPE))
           .build();
 
-  @Test
-  public void testSpecificRecordSchema() {
-    assertEquals(
-        SCHEMA, new 
AvroSpecificRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class)));
-  }
+  private static final Schema POJO_SCHEMA =
+      Schema.builder()
+          .addField("bool_non_nullable", FieldType.BOOLEAN)
+          .addNullableField("int", FieldType.INT32)
+          .addNullableField("long", FieldType.INT64)
+          .addNullableField("float", FieldType.FLOAT)
+          .addNullableField("double", FieldType.DOUBLE)
+          .addNullableField("string", FieldType.STRING)
+          .addNullableField("bytes", FieldType.BYTES)
+          .addField("fixed", FixedBytesField.withSize(4).toBeamType())
+          .addNullableField("row", SUB_TYPE)
+          .addNullableField("array", 
FieldType.array(SUB_TYPE.withNullable(false)))
+          .addNullableField("map", FieldType.map(FieldType.STRING, 
SUB_TYPE.withNullable(false)))
+          .build();
 
   private static final byte[] BYTE_ARRAY = new byte[] {1, 2, 3, 4};
   private static final DateTime DATE_TIME =
@@ -130,18 +283,27 @@ public void testSpecificRecordSchema() {
               ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW))
           .build();
 
+  @Test
+  public void testSpecificRecordSchema() {
+    assertEquals(SCHEMA, new 
AvroRecordSchema().schemaFor(TypeDescriptor.of(TestAvro.class)));
+  }
+
+  @Test
+  public void testPojoSchema() {
+    assertEquals(POJO_SCHEMA, AvroUtils.getSchema(AvroPojo.class));
+  }
+
   @Test
   public void testSpecificRecordToRow() {
     SerializableFunction<TestAvro, Row> toRow =
-        new 
AvroSpecificRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class));
-    Row row = toRow.apply(AVRO_SPECIFIC_RECORD);
+        new 
AvroRecordSchema().toRowFunction(TypeDescriptor.of(TestAvro.class));
     assertEquals(ROW, toRow.apply(AVRO_SPECIFIC_RECORD));
   }
 
   @Test
   public void testRowToSpecificRecord() {
     SerializableFunction<Row, TestAvro> fromRow =
-        new 
AvroSpecificRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class));
+        new 
AvroRecordSchema().fromRowFunction(TypeDescriptor.of(TestAvro.class));
     assertEquals(AVRO_SPECIFIC_RECORD, fromRow.apply(ROW));
   }
 
@@ -156,7 +318,51 @@ public void testGenericRecordToRow() {
   public void testRowToGenericRecord() {
     SerializableFunction<Row, GenericRecord> fromRow =
         AvroUtils.getRowToGenericRecordFunction(TestAvro.SCHEMA$);
-    GenericRecord generic = fromRow.apply(ROW);
     assertEquals(AVRO_GENERIC_RECORD, fromRow.apply(ROW));
   }
+
+  private static final AvroSubPojo SUB_POJO = new AvroSubPojo(true, 42);
+  private static final AvroPojo AVRO_POJO =
+      new AvroPojo(
+          true,
+          43,
+          44L,
+          (float) 44.1,
+          (double) 44.2,
+          "mystring",
+          ByteBuffer.wrap(BYTE_ARRAY),
+          BYTE_ARRAY,
+          SUB_POJO,
+          ImmutableList.of(SUB_POJO, SUB_POJO),
+          ImmutableMap.of("k1", SUB_POJO, "k2", SUB_POJO));
+
+  private static final Row ROW_FOR_POJO =
+      Row.withSchema(POJO_SCHEMA)
+          .addValues(
+              true,
+              43,
+              44L,
+              (float) 44.1,
+              (double) 44.2,
+              "mystring",
+              ByteBuffer.wrap(BYTE_ARRAY),
+              ByteBuffer.wrap(BYTE_ARRAY),
+              NESTED_ROW,
+              ImmutableList.of(NESTED_ROW, NESTED_ROW),
+              ImmutableMap.of("k1", NESTED_ROW, "k2", NESTED_ROW))
+          .build();
+
+  @Test
+  public void testPojoRecordToRow() {
+    SerializableFunction<AvroPojo, Row> toRow =
+        new 
AvroRecordSchema().toRowFunction(TypeDescriptor.of(AvroPojo.class));
+    assertEquals(ROW_FOR_POJO, toRow.apply(AVRO_POJO));
+  }
+
+  @Test
+  public void testRowToPojo() {
+    SerializableFunction<Row, AvroPojo> fromRow =
+        new 
AvroRecordSchema().fromRowFunction(TypeDescriptor.of(AvroPojo.class));
+    assertEquals(AVRO_POJO, fromRow.apply(ROW_FOR_POJO));
+  }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
index 938089911c7b..c835ea655764 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/JavaBeanUtilsTest.java
@@ -37,6 +37,8 @@
 import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.JavaBeanSchema;
+import org.apache.beam.sdk.schemas.JavaBeanSchema.SetterTypeSupplier;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithBoxedFields;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.BeanWithByteArray;
@@ -49,7 +51,6 @@
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveArrayBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.PrimitiveMapBean;
 import org.apache.beam.sdk.schemas.utils.TestJavaBeans.SimpleBean;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.joda.time.DateTime;
 import org.junit.Rule;
 import org.junit.Test;
@@ -61,8 +62,7 @@
 
   @Test
   public void testNullable() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class, 
SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NullableBean.class);
     assertTrue(schema.getField("str").getType().getNullable());
     assertFalse(schema.getField("anInt").getType().getNullable());
   }
@@ -70,62 +70,48 @@ public void testNullable() {
   @Test
   public void testMismatchingNullable() {
     thrown.expect(RuntimeException.class);
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            MismatchingNullableBean.class, SerializableFunctions.identity());
+    Schema schema = 
JavaBeanUtils.schemaFromJavaBeanClass(MismatchingNullableBean.class);
   }
 
   @Test
   public void testSimpleBean() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class, 
SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(SimpleBean.class);
     SchemaTestUtils.assertSchemaEquivalent(SIMPLE_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedBean() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class, 
SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testPrimitiveArray() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            PrimitiveArrayBean.class, SerializableFunctions.identity());
+    Schema schema = 
JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveArrayBean.class);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_ARRAY_BEAN_SCHEMA, 
schema);
   }
 
   @Test
   public void testNestedArray() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            NestedArrayBean.class, SerializableFunctions.identity());
+    Schema schema = 
JavaBeanUtils.schemaFromJavaBeanClass(NestedArrayBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_ARRAY_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedCollection() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            NestedCollectionBean.class, SerializableFunctions.identity());
+    Schema schema = 
JavaBeanUtils.schemaFromJavaBeanClass(NestedCollectionBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_COLLECTION_BEAN_SCHEMA, 
schema);
   }
 
   @Test
   public void testPrimitiveMap() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            PrimitiveMapBean.class, SerializableFunctions.identity());
+    Schema schema = 
JavaBeanUtils.schemaFromJavaBeanClass(PrimitiveMapBean.class);
     SchemaTestUtils.assertSchemaEquivalent(PRIMITIVE_MAP_BEAN_SCHEMA, schema);
   }
 
   @Test
   public void testNestedMap() {
-    Schema schema =
-        JavaBeanUtils.schemaFromJavaBeanClass(
-            NestedMapBean.class, SerializableFunctions.identity());
+    Schema schema = JavaBeanUtils.schemaFromJavaBeanClass(NestedMapBean.class);
     SchemaTestUtils.assertSchemaEquivalent(NESTED_MAP_BEAN_SCHEMA, schema);
   }
 
@@ -147,7 +133,7 @@ public void testGeneratedSimpleGetters() {
 
     List<FieldValueGetter> getters =
         JavaBeanUtils.getGetters(
-            SimpleBean.class, SIMPLE_BEAN_SCHEMA, 
SerializableFunctions.identity());
+            SimpleBean.class, SIMPLE_BEAN_SCHEMA, new 
JavaBeanSchema.GetterTypeSupplier());
     assertEquals(12, getters.size());
     assertEquals("str", getters.get(0).name());
 
@@ -174,7 +160,8 @@ public void testGeneratedSimpleGetters() {
   @Test
   public void testGeneratedSimpleSetters() {
     SimpleBean simpleBean = new SimpleBean();
-    List<FieldValueSetter> setters = 
JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA);
+    List<FieldValueSetter> setters =
+        JavaBeanUtils.getSetters(SimpleBean.class, SIMPLE_BEAN_SCHEMA, new 
SetterTypeSupplier());
     assertEquals(12, setters.size());
 
     setters.get(0).set(simpleBean, "field1");
@@ -219,7 +206,7 @@ public void testGeneratedSimpleBoxedGetters() {
         JavaBeanUtils.getGetters(
             BeanWithBoxedFields.class,
             BEAN_WITH_BOXED_FIELDS_SCHEMA,
-            SerializableFunctions.identity());
+            new JavaBeanSchema.GetterTypeSupplier());
     assertEquals((byte) 41, getters.get(0).get(bean));
     assertEquals((short) 42, getters.get(1).get(bean));
     assertEquals((int) 43, getters.get(2).get(bean));
@@ -231,7 +218,8 @@ public void testGeneratedSimpleBoxedGetters() {
   public void testGeneratedSimpleBoxedSetters() {
     BeanWithBoxedFields bean = new BeanWithBoxedFields();
     List<FieldValueSetter> setters =
-        JavaBeanUtils.getSetters(BeanWithBoxedFields.class, 
BEAN_WITH_BOXED_FIELDS_SCHEMA);
+        JavaBeanUtils.getSetters(
+            BeanWithBoxedFields.class, BEAN_WITH_BOXED_FIELDS_SCHEMA, new 
SetterTypeSupplier());
 
     setters.get(0).set(bean, (byte) 41);
     setters.get(1).set(bean, (short) 42);
@@ -250,7 +238,8 @@ public void testGeneratedSimpleBoxedSetters() {
   public void testGeneratedByteBufferSetters() {
     BeanWithByteArray bean = new BeanWithByteArray();
     List<FieldValueSetter> setters =
-        JavaBeanUtils.getSetters(BeanWithByteArray.class, 
BEAN_WITH_BYTE_ARRAY_SCHEMA);
+        JavaBeanUtils.getSetters(
+            BeanWithByteArray.class, BEAN_WITH_BYTE_ARRAY_SCHEMA, new 
SetterTypeSupplier());
     setters.get(0).set(bean, "field1".getBytes(Charset.defaultCharset()));
     setters.get(1).set(bean, "field2".getBytes(Charset.defaultCharset()));
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
index 1a0946d6f609..e140a8bc1218 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/POJOUtilsTest.java
@@ -37,6 +37,7 @@
 import java.util.List;
 import org.apache.beam.sdk.schemas.FieldValueGetter;
 import org.apache.beam.sdk.schemas.FieldValueSetter;
+import org.apache.beam.sdk.schemas.JavaFieldSchema.JavaFieldTypeSupplier;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedArrayPOJO;
 import org.apache.beam.sdk.schemas.utils.TestPOJOs.NestedCollectionPOJO;
@@ -126,7 +127,8 @@ public void testGeneratedSimpleGetters() {
             new BigDecimal(42),
             new StringBuilder("stringBuilder"));
 
-    List<FieldValueGetter> getters = POJOUtils.getGetters(SimplePOJO.class, 
SIMPLE_POJO_SCHEMA);
+    List<FieldValueGetter> getters =
+        POJOUtils.getGetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new 
JavaFieldTypeSupplier());
     assertEquals(12, getters.size());
     assertEquals("str", getters.get(0).name());
     assertEquals("field1", getters.get(0).get(simplePojo));
@@ -147,7 +149,8 @@ public void testGeneratedSimpleGetters() {
   @Test
   public void testGeneratedSimpleSetters() {
     SimplePOJO simplePojo = new SimplePOJO();
-    List<FieldValueSetter> setters = POJOUtils.getSetters(SimplePOJO.class, 
SIMPLE_POJO_SCHEMA);
+    List<FieldValueSetter> setters =
+        POJOUtils.getSetters(SimplePOJO.class, SIMPLE_POJO_SCHEMA, new 
JavaFieldTypeSupplier());
     assertEquals(12, setters.size());
 
     setters.get(0).set(simplePojo, "field1");
@@ -182,7 +185,8 @@ public void testGeneratedSimpleBoxedGetters() {
     POJOWithBoxedFields pojo = new POJOWithBoxedFields((byte) 41, (short) 42, 
43, 44L, true);
 
     List<FieldValueGetter> getters =
-        POJOUtils.getGetters(POJOWithBoxedFields.class, 
POJO_WITH_BOXED_FIELDS_SCHEMA);
+        POJOUtils.getGetters(
+            POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new 
JavaFieldTypeSupplier());
     assertEquals((byte) 41, getters.get(0).get(pojo));
     assertEquals((short) 42, getters.get(1).get(pojo));
     assertEquals((int) 43, getters.get(2).get(pojo));
@@ -194,7 +198,8 @@ public void testGeneratedSimpleBoxedGetters() {
   public void testGeneratedSimpleBoxedSetters() {
     POJOWithBoxedFields pojo = new POJOWithBoxedFields();
     List<FieldValueSetter> setters =
-        POJOUtils.getSetters(POJOWithBoxedFields.class, 
POJO_WITH_BOXED_FIELDS_SCHEMA);
+        POJOUtils.getSetters(
+            POJOWithBoxedFields.class, POJO_WITH_BOXED_FIELDS_SCHEMA, new 
JavaFieldTypeSupplier());
 
     setters.get(0).set(pojo, (byte) 41);
     setters.get(1).set(pojo, (short) 42);
@@ -213,7 +218,8 @@ public void testGeneratedSimpleBoxedSetters() {
   public void testGeneratedByteBufferSetters() {
     POJOWithByteArray pojo = new POJOWithByteArray();
     List<FieldValueSetter> setters =
-        POJOUtils.getSetters(POJOWithByteArray.class, 
POJO_WITH_BYTE_ARRAY_SCHEMA);
+        POJOUtils.getSetters(
+            POJOWithByteArray.class, POJO_WITH_BYTE_ARRAY_SCHEMA, new 
JavaFieldTypeSupplier());
     setters.get(0).set(pojo, BYTE_ARRAY);
     setters.get(1).set(pojo, BYTE_BUFFER.array());
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 175470)
    Time Spent: 11h 20m  (was: 11h 10m)

> Provide automatic schema registration for AVROs
> -----------------------------------------------
>
>                 Key: BEAM-4454
>                 URL: https://issues.apache.org/jira/browse/BEAM-4454
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 11h 20m
>  Remaining Estimate: 0h
>
> Need to make sure this is a compatible change



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to