[ https://issues.apache.org/jira/browse/BEAM-4077?focusedWorklogId=106012&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106012 ]

ASF GitHub Bot logged work on BEAM-4077:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/May/18 21:02
            Start Date: 25/May/18 21:02
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5488: [BEAM-4077] Schema fields are non-null by default
URL: https://github.com/apache/beam/pull/5488
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index a9d2f333d12..3a2fa6d8c65 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -73,65 +73,64 @@ public Builder addField(Field field) {
       return this;
     }
 
-    public Builder addByteField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.BYTE.type()).withNullable(nullable));
+    public Builder addByteField(String name) {
+      fields.add(Field.of(name, FieldType.BYTE));
       return this;
     }
 
-    public Builder addInt16Field(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.INT16.type()).withNullable(nullable));
+    public Builder addInt16Field(String name) {
+      fields.add(Field.of(name, FieldType.INT16));
       return this;
     }
 
-    public Builder addInt32Field(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.INT32.type()).withNullable(nullable));
+    public Builder addInt32Field(String name) {
+      fields.add(Field.of(name, FieldType.INT32));
       return this;
     }
 
-    public Builder addInt64Field(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.INT64.type()).withNullable(nullable));
+    public Builder addInt64Field(String name) {
+      fields.add(Field.of(name, FieldType.INT64));
       return this;
     }
 
-    public Builder addDecimalField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.DECIMAL.type()).withNullable(nullable));
+    public Builder addDecimalField(String name) {
+      fields.add(Field.of(name, FieldType.DECIMAL));
       return this;
     }
 
-    public Builder addFloatField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.FLOAT.type()).withNullable(nullable));
+    public Builder addFloatField(String name) {
+      fields.add(Field.of(name, FieldType.FLOAT));
       return this;
     }
 
-    public Builder addDoubleField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.DOUBLE.type()).withNullable(nullable));
+    public Builder addDoubleField(String name) {
+      fields.add(Field.of(name, FieldType.DOUBLE));
       return this;
     }
 
-    public Builder addStringField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.STRING.type()).withNullable(nullable));
+    public Builder addStringField(String name) {
+      fields.add(Field.of(name, FieldType.STRING));
       return this;
     }
 
-    public Builder addDateTimeField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.DATETIME.type()).withNullable(nullable));
+    public Builder addDateTimeField(String name) {
+      fields.add(Field.of(name, FieldType.DATETIME));
       return this;
     }
 
-    public Builder addBooleanField(String name, boolean nullable) {
-      fields.add(Field.of(name, TypeName.BOOLEAN.type()).withNullable(nullable));
+    public Builder addBooleanField(String name) {
+      fields.add(Field.of(name, FieldType.BOOLEAN));
       return this;
     }
 
     public Builder addArrayField(String name, FieldType collectionElementType) {
       fields.add(
-          Field.of(name, TypeName.ARRAY.type().withCollectionElementType(collectionElementType)));
+          Field.of(name, FieldType.array(collectionElementType)));
       return this;
     }
 
-    public Builder addRowField(String name, Schema fieldSchema, boolean nullable) {
-      fields.add(Field.of(name, TypeName.ROW.type().withRowSchema(fieldSchema))
-          .withNullable(nullable));
+    public Builder addRowField(String name, Schema fieldSchema) {
+      fields.add(Field.of(name, FieldType.row(fieldSchema)));
       return this;
     }
 
@@ -186,8 +185,13 @@ public int hashCode() {
     return fields;
   }
 
-  /**
-   * An enumerated list of supported types.
+  /** An enumerated list of type constructors.
+   *
+   * <ul>
+   *   <li>Atomic types are built from type constructors that take no arguments
+   *   <li>Arrays, rows, and maps are type constructors that take additional
+   *       arguments to form a valid {@link FieldType}.
+   * </ul>
    */
   public enum TypeName {
     BYTE,    // One-byte signed integer.
@@ -204,8 +208,6 @@ public int hashCode() {
     MAP,
     ROW;    // The field is itself a nested row.
 
-    private final FieldType fieldType = FieldType.of(this);
-
     public static final Set<TypeName> NUMERIC_TYPES = ImmutableSet.of(
         BYTE, INT16, INT32, INT64, DECIMAL, FLOAT, DOUBLE);
     public static final Set<TypeName> STRING_TYPES = ImmutableSet.of(STRING);
@@ -235,11 +237,6 @@ public boolean isMapType() {
     public boolean isCompositeType() {
       return COMPOSITE_TYPES.contains(this);
     }
-
-    /** Returns a {@link FieldType} representing this primitive type. */
-    public FieldType type() {
-      return fieldType;
-    }
   }
 
   /**
@@ -251,20 +248,31 @@ public FieldType type() {
   public abstract static class FieldType implements Serializable {
     // Returns the type of this field.
     public abstract TypeName getTypeName();
+
     // For container types (e.g. ARRAY), returns the type of the contained element.
     @Nullable public abstract FieldType getCollectionElementType();
+
     // For MAP type, returns the type of the key element, it must be a primitive type;
     @Nullable public abstract FieldType getMapKeyType();
+
     // For MAP type, returns the type of the value element, it can be a nested type;
     @Nullable public abstract FieldType getMapValueType();
+
     // For ROW types, returns the schema for the row.
     @Nullable public abstract Schema getRowSchema();
+
     /**
      * Returns optional extra metadata.
      */
     @SuppressWarnings("mutable")
     @Nullable public abstract byte[] getMetadata();
+
     abstract FieldType.Builder toBuilder();
+
+    public static FieldType.Builder forTypeName(TypeName typeName) {
+      return new AutoValue_Schema_FieldType.Builder().setTypeName(typeName);
+    }
+
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setTypeName(TypeName typeName);
@@ -283,6 +291,55 @@ public static FieldType of(TypeName typeName) {
       return new AutoValue_Schema_FieldType.Builder().setTypeName(typeName).build();
     }
 
+    /** The type of string fields. */
+    public static final FieldType STRING = FieldType.of(TypeName.STRING);
+
+    /** The type of byte fields. */
+    public static final FieldType BYTE = FieldType.of(TypeName.BYTE);
+
+    /** The type of int16 fields. */
+    public static final FieldType INT16 = FieldType.of(TypeName.INT16);
+
+    /** The type of int32 fields. */
+    public static final FieldType INT32 = FieldType.of(TypeName.INT32);
+
+    /** The type of int64 fields. */
+    public static final FieldType INT64 = FieldType.of(TypeName.INT64);
+
+    /** The type of float fields. */
+    public static final FieldType FLOAT = FieldType.of(TypeName.FLOAT);
+
+    /** The type of double fields. */
+    public static final FieldType DOUBLE = FieldType.of(TypeName.DOUBLE);
+
+    /** The type of decimal fields. */
+    public static final FieldType DECIMAL = FieldType.of(TypeName.DECIMAL);
+
+    /** The type of boolean fields. */
+    public static final FieldType BOOLEAN = FieldType.of(TypeName.BOOLEAN);
+
+    /** The type of datetime fields. */
+    public static final FieldType DATETIME = FieldType.of(TypeName.DATETIME);
+
+    /** Create an array type for the given field type. */
+    public static final FieldType array(FieldType elementType) {
+      return FieldType.forTypeName(TypeName.ARRAY).setCollectionElementType(elementType).build();
+    }
+
+    /** Create a map type for the given key and value types. */
+    public static final FieldType map(FieldType keyType, FieldType valueType) {
+      return FieldType
+          .forTypeName(TypeName.MAP)
+          .setMapKeyType(keyType)
+          .setMapValueType(valueType)
+          .build();
+    }
+
+    /** Create a row type for the given schema. */
+    public static final FieldType row(Schema schema) {
+      return FieldType.forTypeName(TypeName.ROW).setRowSchema(schema).build();
+    }
+
     /**
      * For container types, adds the type of the component element.
      */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
index b17f435bda4..4c67f4311c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonDeserializer.java
@@ -108,7 +108,7 @@ public Row deserialize(JsonParser jsonParser, DeserializationContext deserializa
         (Row) extractJsonNodeValue(
             FieldValue.of(
                 "root",
-                TypeName.ROW.type().withRowSchema(schema),
+                FieldType.row(schema),
                 jsonParser
                     .readValueAsTree()));
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
index cd1d96aebc4..d20bca7c4a1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/org/apache/beam/sdk/coders/RowCoderTest.java
@@ -29,7 +29,6 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -50,16 +49,16 @@ void checkEncodeDecode(Row row) throws IOException {
   @Test
   public void testPrimitiveTypes() throws Exception {
     Schema schema = Schema.builder()
-        .addByteField("f_byte", false)
-        .addInt16Field("f_int16", false)
-        .addInt32Field("f_int32", false)
-        .addInt64Field("f_int64", false)
-        .addDecimalField("f_decimal", false)
-        .addFloatField("f_float", false)
-        .addDoubleField("f_double", false)
-        .addStringField("f_string", false)
-        .addDateTimeField("f_datetime", false)
-        .addBooleanField("f_boolean", false).build();
+        .addByteField("f_byte")
+        .addInt16Field("f_int16")
+        .addInt32Field("f_int32")
+        .addInt64Field("f_int64")
+        .addDecimalField("f_decimal")
+        .addFloatField("f_float")
+        .addDoubleField("f_double")
+        .addStringField("f_string")
+        .addDateTimeField("f_datetime")
+        .addBooleanField("f_boolean").build();
 
     DateTime dateTime = new DateTime().withDate(1979, 03, 14)
         .withTime(1, 2, 3, 4)
@@ -76,11 +75,11 @@ public void testPrimitiveTypes() throws Exception {
   @Test
   public void testNestedTypes() throws Exception {
     Schema nestedSchema = Schema.builder()
-        .addInt32Field("f1_int", false)
-        .addStringField("f1_str", false).build();
+        .addInt32Field("f1_int")
+        .addStringField("f1_str").build();
     Schema schema = Schema.builder()
-        .addInt32Field("f_int", false)
-        .addRowField("nested", nestedSchema, false).build();
+        .addInt32Field("f_int")
+        .addRowField("nested", nestedSchema).build();
 
     Row nestedRow = Row.withSchema(nestedSchema).addValues(18, "foobar").build();
     Row row = Row.withSchema(schema).addValues(42, nestedRow).build();
@@ -90,7 +89,7 @@ public void testNestedTypes() throws Exception {
   @Test
   public void testArrays() throws Exception {
     Schema schema = Schema.builder()
-        .addArrayField("f_array", TypeName.STRING.type())
+        .addArrayField("f_array", FieldType.STRING)
         .build();
     Row row = Row.withSchema(schema).addArray("one", "two", "three", "four").build();
     checkEncodeDecode(row);
@@ -99,9 +98,9 @@ public void testArrays() throws Exception {
   @Test
   public void testArrayOfRow() throws Exception {
     Schema nestedSchema = Schema.builder()
-        .addInt32Field("f1_int", false)
-        .addStringField("f1_str", false).build();
-    FieldType collectionElementType = TypeName.ROW.type().withRowSchema(nestedSchema);
+        .addInt32Field("f1_int")
+        .addStringField("f1_str").build();
+    FieldType collectionElementType = FieldType.row(nestedSchema);
     Schema schema = Schema.builder().addArrayField("f_array", collectionElementType).build();
     Row row = Row.withSchema(schema).addArray(
         Row.withSchema(nestedSchema).addValues(1, "one").build(),
@@ -113,9 +112,7 @@ public void testArrayOfRow() throws Exception {
 
   @Test
   public void testArrayOfArray() throws Exception {
-    FieldType arrayType = TypeName.ARRAY.type()
-        .withCollectionElementType(TypeName.ARRAY.type()
-            .withCollectionElementType(TypeName.INT32.type()));
+    FieldType arrayType = FieldType.array(FieldType.array(FieldType.INT32));
     Schema schema = Schema.builder().addField(Field.of("f_array", arrayType)).build();
     Row row = Row.withSchema(schema).addArray(
         Lists.newArrayList(1, 2, 3, 4),
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
index f33edc65d20..22fbbc3174d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/SchemaTest.java
@@ -24,7 +24,6 @@
 import java.util.stream.Stream;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -39,76 +38,73 @@
   @Test
   public void testCreate() {
     Schema schema = Schema.builder()
-        .addByteField("f_byte", false)
-        .addInt16Field("f_int16", false)
-        .addInt32Field("f_int32", false)
-        .addInt64Field("f_int64", false)
-        .addDecimalField("f_decimal", false)
-        .addFloatField("f_float", false)
-        .addDoubleField("f_double", false)
-        .addStringField("f_string", false)
-        .addDateTimeField("f_datetime", false)
-        .addBooleanField("f_boolean", false).build();
+        .addByteField("f_byte")
+        .addInt16Field("f_int16")
+        .addInt32Field("f_int32")
+        .addInt64Field("f_int64")
+        .addDecimalField("f_decimal")
+        .addFloatField("f_float")
+        .addDoubleField("f_double")
+        .addStringField("f_string")
+        .addDateTimeField("f_datetime")
+        .addBooleanField("f_boolean").build();
     assertEquals(10, schema.getFieldCount());
 
     assertEquals(0, schema.indexOf("f_byte"));
     assertEquals("f_byte", schema.getField(0).getName());
-    assertEquals(TypeName.BYTE.type(), schema.getField(0).getType());
+    assertEquals(FieldType.BYTE, schema.getField(0).getType());
 
     assertEquals(1, schema.indexOf("f_int16"));
     assertEquals("f_int16", schema.getField(1).getName());
-    assertEquals(TypeName.INT16.type(), schema.getField(1).getType());
+    assertEquals(FieldType.INT16, schema.getField(1).getType());
 
     assertEquals(2, schema.indexOf("f_int32"));
     assertEquals("f_int32", schema.getField(2).getName());
-    assertEquals(TypeName.INT32.type(), schema.getField(2).getType());
+    assertEquals(FieldType.INT32, schema.getField(2).getType());
 
     assertEquals(3, schema.indexOf("f_int64"));
     assertEquals("f_int64", schema.getField(3).getName());
-    assertEquals(TypeName.INT64.type(), schema.getField(3).getType());
+    assertEquals(FieldType.INT64, schema.getField(3).getType());
 
     assertEquals(4, schema.indexOf("f_decimal"));
     assertEquals("f_decimal", schema.getField(4).getName());
-    assertEquals(TypeName.DECIMAL.type(),
+    assertEquals(FieldType.DECIMAL,
         schema.getField(4).getType());
 
     assertEquals(5, schema.indexOf("f_float"));
     assertEquals("f_float", schema.getField(5).getName());
-    assertEquals(TypeName.FLOAT.type(), schema.getField(5).getType());
+    assertEquals(FieldType.FLOAT, schema.getField(5).getType());
 
     assertEquals(6, schema.indexOf("f_double"));
     assertEquals("f_double", schema.getField(6).getName());
-    assertEquals(TypeName.DOUBLE.type(), schema.getField(6).getType());
+    assertEquals(FieldType.DOUBLE, schema.getField(6).getType());
 
     assertEquals(7, schema.indexOf("f_string"));
     assertEquals("f_string", schema.getField(7).getName());
-    assertEquals(TypeName.STRING.type(), schema.getField(7).getType());
+    assertEquals(FieldType.STRING, schema.getField(7).getType());
 
     assertEquals(8, schema.indexOf("f_datetime"));
     assertEquals("f_datetime", schema.getField(8).getName());
-    assertEquals(TypeName.DATETIME.type(),
+    assertEquals(FieldType.DATETIME,
         schema.getField(8).getType());
 
     assertEquals(9, schema.indexOf("f_boolean"));
     assertEquals("f_boolean", schema.getField(9).getName());
-    assertEquals(TypeName.BOOLEAN.type(), schema.getField(9).getType());
+    assertEquals(FieldType.BOOLEAN, schema.getField(9).getType());
   }
 
   @Test
   public void testNestedSchema() {
-    Schema nestedSchema = Schema.of(
-        Field.of("f1_str", TypeName.STRING.type()));
-    Schema schema = Schema.of(
-        Field.of("nested", TypeName.ROW.type().withRowSchema(nestedSchema)));
+    Schema nestedSchema = Schema.of(Field.of("f1_str", FieldType.STRING));
+    Schema schema = Schema.of(Field.of("nested", FieldType.row(nestedSchema)));
     Field inner = schema.getField("nested").getType().getRowSchema().getField("f1_str");
     assertEquals("f1_str", inner.getName());
-    assertEquals(TypeName.STRING, inner.getType().getTypeName());
+    assertEquals(FieldType.STRING, inner.getType());
   }
 
   @Test
   public void testArraySchema() {
-    FieldType arrayType = TypeName.ARRAY.type()
-        .withCollectionElementType(TypeName.STRING.type());
+    FieldType arrayType = FieldType.array(FieldType.STRING);
     Schema schema = Schema.of(Field.of("f_array", arrayType));
     Field field = schema.getField("f_array");
     assertEquals("f_array", field.getName());
@@ -118,10 +114,8 @@ public void testArraySchema() {
   @Test
   public void testArrayOfRowSchema() {
     Schema nestedSchema = Schema.of(
-        Field.of("f1_str", TypeName.STRING.type()));
-    FieldType arrayType = TypeName.ARRAY.type()
-        .withCollectionElementType(TypeName.ROW.type()
-            .withRowSchema(nestedSchema));
+        Field.of("f1_str", FieldType.STRING));
+    FieldType arrayType = FieldType.array(FieldType.row(nestedSchema));
     Schema schema = Schema.of(Field.of("f_array", arrayType));
     Field field = schema.getField("f_array");
     assertEquals("f_array", field.getName());
@@ -130,9 +124,7 @@ public void testArrayOfRowSchema() {
 
   @Test
   public void testNestedArraySchema() {
-    FieldType arrayType = TypeName.ARRAY.type()
-        .withCollectionElementType(TypeName.ARRAY.type()
-            .withCollectionElementType(TypeName.STRING.type()));
+    FieldType arrayType = FieldType.array(FieldType.array(FieldType.STRING));
     Schema schema = Schema.of(Field.of("f_array", arrayType));
     Field field = schema.getField("f_array");
     assertEquals("f_array", field.getName());
@@ -141,7 +133,7 @@ public void testNestedArraySchema() {
 
   @Test
   public void testWrongName() {
-    Schema schema = Schema.of(Field.of("f_byte", TypeName.BYTE.type()));
+    Schema schema = Schema.of(Field.of("f_byte", FieldType.BYTE));
     thrown.expect(IllegalArgumentException.class);
     schema.getField("f_string");
   }
@@ -149,7 +141,7 @@ public void testWrongName() {
   @Test
   public void testWrongIndex() {
     Schema schema = Schema.of(
-        Field.of("f_byte", TypeName.BYTE.type()));
+        Field.of("f_byte", FieldType.BYTE));
     thrown.expect(IndexOutOfBoundsException.class);
     schema.getField(1);
   }
@@ -161,15 +153,15 @@ public void testCollector() {
     Schema schema =
         Stream
             .of(
-                Schema.Field.of("f_int", TypeName.INT32.type()),
-                Schema.Field.of("f_string", TypeName.STRING.type()))
+                Schema.Field.of("f_int", FieldType.INT32),
+                Schema.Field.of("f_string", FieldType.STRING))
             .collect(toSchema());
 
     assertEquals(2, schema.getFieldCount());
 
     assertEquals("f_int", schema.getField(0).getName());
-    assertEquals(TypeName.INT32, schema.getField(0).getType().getTypeName());
+    assertEquals(FieldType.INT32, schema.getField(0).getType());
     assertEquals("f_string", schema.getField(1).getName());
-    assertEquals(TypeName.STRING, schema.getField(1).getType().getTypeName());
+    assertEquals(FieldType.STRING, schema.getField(1).getType());
   }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
index 021bf287426..fa4ad756804 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/JsonToRowTest.java
@@ -36,8 +36,6 @@
 @RunWith(JUnit4.class)
 public class JsonToRowTest implements Serializable {
 
-  private static final boolean NOT_NULLABLE = false;
-
   @Rule
   public transient TestPipeline pipeline = TestPipeline.create();
 
@@ -47,9 +45,9 @@ public void testParsesRows() throws Exception {
     Schema personSchema =
         Schema
             .builder()
-            .addStringField("name", NOT_NULLABLE)
-            .addInt32Field("height", NOT_NULLABLE)
-            .addBooleanField("knowsJavascript", NOT_NULLABLE)
+            .addStringField("name")
+            .addInt32Field("height")
+            .addBooleanField("knowsJavascript")
             .build();
 
     PCollection<String> jsonPersons = pipeline
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
index a93b3f81e32..ee3b238f6e7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowJsonDeserializerTest.java
@@ -17,16 +17,6 @@
  */
 package org.apache.beam.sdk.util;
 
-import static org.apache.beam.sdk.schemas.Schema.TypeName.ARRAY;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.BOOLEAN;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.BYTE;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.DATETIME;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.DOUBLE;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.FLOAT;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.INT16;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.INT32;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.INT64;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.STRING;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.stringContainsInOrder;
@@ -37,7 +27,6 @@
 import java.util.Arrays;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.util.RowJsonDeserializer.UnsupportedRowJsonException;
 import org.apache.beam.sdk.values.Row;
 import org.hamcrest.Matcher;
@@ -76,14 +65,14 @@ public void testParsesFlatRow() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addByteField("f_byte", NOT_NULLABLE)
-            .addInt16Field("f_int16", NOT_NULLABLE)
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addInt64Field("f_int64", NOT_NULLABLE)
-            .addFloatField("f_float", NOT_NULLABLE)
-            .addDoubleField("f_double", NOT_NULLABLE)
-            .addBooleanField("f_boolean", NOT_NULLABLE)
-            .addStringField("f_string", NOT_NULLABLE)
+            .addByteField("f_byte")
+            .addInt16Field("f_int16")
+            .addInt32Field("f_int32")
+            .addInt64Field("f_int64")
+            .addFloatField("f_float")
+            .addDoubleField("f_double")
+            .addBooleanField("f_boolean")
+            .addStringField("f_string")
             .build();
 
     String rowString = "{\n"
@@ -115,8 +104,8 @@ public void testParsesArrayField() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addArrayField("f_intArray", INT32.type())
+            .addInt32Field("f_int32")
+            .addArrayField("f_intArray", FieldType.INT32)
             .build();
 
     String rowString = "{\n"
@@ -143,8 +132,7 @@ public void testParsesArrayOfArrays() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addArrayField("f_arrayOfIntArrays",
-                           FieldType.of(ARRAY).withCollectionElementType(INT32.type()))
+            .addArrayField("f_arrayOfIntArrays", FieldType.array(FieldType.INT32))
             .build();
 
     String rowString = "{\n"
@@ -173,8 +161,7 @@ public void testThrowsForMismatchedArrayField() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addArrayField("f_arrayOfIntArrays",
-                           FieldType.of(ARRAY).withCollectionElementType(INT32.type()))
+            .addArrayField("f_arrayOfIntArrays", FieldType.array(FieldType.INT32))
             .build();
 
     String rowString = "{\n"
@@ -194,15 +181,15 @@ public void testParsesRowField() throws Exception {
     Schema nestedRowSchema =
         Schema
             .builder()
-            .addInt32Field("f_nestedInt32", NOT_NULLABLE)
-            .addStringField("f_nestedString", NOT_NULLABLE)
+            .addInt32Field("f_nestedInt32")
+            .addStringField("f_nestedString")
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addRowField("f_row", nestedRowSchema, NOT_NULLABLE)
+            .addInt32Field("f_int32")
+            .addRowField("f_row", nestedRowSchema)
             .build();
 
     String rowString = "{\n"
@@ -231,15 +218,15 @@ public void testThrowsForMismatchedRowField() throws Exception {
     Schema nestedRowSchema =
         Schema
             .builder()
-            .addInt32Field("f_nestedInt32", NOT_NULLABLE)
-            .addStringField("f_nestedString", NOT_NULLABLE)
+            .addInt32Field("f_nestedInt32")
+            .addStringField("f_nestedString")
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addInt32Field("f_int32", NOT_NULLABLE)
-            .addRowField("f_row", nestedRowSchema, NOT_NULLABLE)
+            .addInt32Field("f_int32")
+            .addRowField("f_row", nestedRowSchema)
             .build();
 
     String rowString = "{\n"
@@ -261,19 +248,19 @@ public void testParsesNestedRowField() throws Exception {
     Schema doubleNestedRowSchema =
         Schema
             .builder()
-            .addStringField("f_doubleNestedString", NOT_NULLABLE)
+            .addStringField("f_doubleNestedString")
             .build();
 
     Schema nestedRowSchema =
         Schema
             .builder()
-            .addRowField("f_nestedRow", doubleNestedRowSchema, NOT_NULLABLE)
+            .addRowField("f_nestedRow", doubleNestedRowSchema)
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addRowField("f_row", nestedRowSchema, NOT_NULLABLE)
+            .addRowField("f_row", nestedRowSchema)
             .build();
 
     String rowString = "{\n"
@@ -310,7 +297,7 @@ public void testThrowsForUnsupportedType() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addDateTimeField("f_dateTime", NOT_NULLABLE)
+            .addDateTimeField("f_dateTime")
             .build();
 
     thrown.expect(UnsupportedRowJsonException.class);
@@ -324,7 +311,7 @@ public void testThrowsForUnsupportedArrayElementType() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addArrayField("f_dateTimeArray", DATETIME.type())
+            .addArrayField("f_dateTimeArray", FieldType.DATETIME)
             .build();
 
     thrown.expect(UnsupportedRowJsonException.class);
@@ -338,13 +325,13 @@ public void testThrowsForUnsupportedNestedFieldType() throws Exception {
     Schema nestedSchema =
         Schema
             .builder()
-            .addArrayField("f_dateTimeArray", DATETIME.type())
+            .addArrayField("f_dateTimeArray", FieldType.DATETIME)
             .build();
 
     Schema schema =
         Schema
             .builder()
-            .addRowField("f_nestedRow", nestedSchema, NOT_NULLABLE)
+            .addRowField("f_nestedRow", nestedSchema)
             .build();
 
     thrown.expect(UnsupportedRowJsonException.class);
@@ -358,8 +345,8 @@ public void testParsesNulls() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addByteField("f_byte", NOT_NULLABLE)
-            .addStringField("f_string", NULLABLE)
+            .addByteField("f_byte")
+            .addField(Schema.Field.of("f_string", FieldType.STRING).withNullable(true))
             .build();
 
     String rowString = "{\n"
@@ -385,8 +372,8 @@ public void testThrowsForMissingNotNullableField() throws Exception {
     Schema schema =
         Schema
             .builder()
-            .addByteField("f_byte", NOT_NULLABLE)
-            .addStringField("f_string", NOT_NULLABLE)
+            .addByteField("f_byte")
+            .addStringField("f_string")
             .build();
 
     String rowString = "{\n"
@@ -403,59 +390,59 @@ public void testThrowsForMissingNotNullableField() throws Exception {
 
   @Test
   public void testSupportedBooleanConversions() throws Exception {
-    testSupportedConversion(BOOLEAN, BOOLEAN_TRUE_STRING, BOOLEAN_TRUE_VALUE);
+    testSupportedConversion(FieldType.BOOLEAN, BOOLEAN_TRUE_STRING, BOOLEAN_TRUE_VALUE);
   }
 
   @Test
   public void testSupportedStringConversions() throws Exception {
-    testSupportedConversion(STRING, quoted(FLOAT_STRING), FLOAT_STRING);
+    testSupportedConversion(FieldType.STRING, quoted(FLOAT_STRING), FLOAT_STRING);
   }
 
   @Test
   public void testSupportedByteConversions() throws Exception {
-    testSupportedConversion(BYTE, BYTE_STRING, BYTE_VALUE);
+    testSupportedConversion(FieldType.BYTE, BYTE_STRING, BYTE_VALUE);
   }
 
   @Test
   public void testSupportedShortConversions() throws Exception {
-    testSupportedConversion(INT16, BYTE_STRING, (short) BYTE_VALUE);
-    testSupportedConversion(INT16, SHORT_STRING, SHORT_VALUE);
+    testSupportedConversion(FieldType.INT16, BYTE_STRING, (short) BYTE_VALUE);
+    testSupportedConversion(FieldType.INT16, SHORT_STRING, SHORT_VALUE);
   }
 
   @Test
   public void testSupportedIntConversions() throws Exception {
-    testSupportedConversion(INT32, BYTE_STRING, (int) BYTE_VALUE);
-    testSupportedConversion(INT32, SHORT_STRING, (int) SHORT_VALUE);
-    testSupportedConversion(INT32, INT_STRING, INT_VALUE);
+    testSupportedConversion(FieldType.INT32, BYTE_STRING, (int) BYTE_VALUE);
+    testSupportedConversion(FieldType.INT32, SHORT_STRING, (int) SHORT_VALUE);
+    testSupportedConversion(FieldType.INT32, INT_STRING, INT_VALUE);
   }
 
   @Test
   public void testSupportedLongConversions() throws Exception {
-    testSupportedConversion(INT64, BYTE_STRING, (long) BYTE_VALUE);
-    testSupportedConversion(INT64, SHORT_STRING, (long) SHORT_VALUE);
-    testSupportedConversion(INT64, INT_STRING, (long) INT_VALUE);
-    testSupportedConversion(INT64, LONG_STRING, LONG_VALUE);
+    testSupportedConversion(FieldType.INT64, BYTE_STRING, (long) BYTE_VALUE);
+    testSupportedConversion(FieldType.INT64, SHORT_STRING, (long) SHORT_VALUE);
+    testSupportedConversion(FieldType.INT64, INT_STRING, (long) INT_VALUE);
+    testSupportedConversion(FieldType.INT64, LONG_STRING, LONG_VALUE);
   }
 
   @Test
   public void testSupportedFloatConversions() throws Exception {
-    testSupportedConversion(FLOAT, FLOAT_STRING, FLOAT_VALUE);
-    testSupportedConversion(FLOAT, SHORT_STRING, (float) SHORT_VALUE);
+    testSupportedConversion(FieldType.FLOAT, FLOAT_STRING, FLOAT_VALUE);
+    testSupportedConversion(FieldType.FLOAT, SHORT_STRING, (float) SHORT_VALUE);
   }
 
   @Test
   public void testSupportedDoubleConversions() throws Exception {
-    testSupportedConversion(DOUBLE, DOUBLE_STRING, DOUBLE_VALUE);
-    testSupportedConversion(DOUBLE, FLOAT_STRING, (double) FLOAT_VALUE);
-    testSupportedConversion(DOUBLE, INT_STRING, (double) INT_VALUE);
+    testSupportedConversion(FieldType.DOUBLE, DOUBLE_STRING, DOUBLE_VALUE);
+    testSupportedConversion(FieldType.DOUBLE, FLOAT_STRING, (double) FLOAT_VALUE);
+    testSupportedConversion(FieldType.DOUBLE, INT_STRING, (double) INT_VALUE);
   }
 
   private void testSupportedConversion(
-      TypeName fieldType,
+      FieldType fieldType,
       String jsonFieldValue,
       Object expectedRowFieldValue) throws Exception {
 
-    String fieldName = "f_" + fieldType.name().toLowerCase();
+    String fieldName = "f_" + fieldType.getTypeName().name().toLowerCase();
     Schema schema = schemaWithField(fieldName, fieldType);
     Row expectedRow = Row.withSchema(schema).addValues(expectedRowFieldValue).build();
     ObjectMapper jsonParser = newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
@@ -467,84 +454,84 @@ private void testSupportedConversion(
 
   @Test
   public void testUnsupportedBooleanConversions() throws Exception {
-    testUnsupportedConversion(BOOLEAN, quoted(BOOLEAN_TRUE_STRING));
-    testUnsupportedConversion(BOOLEAN, BYTE_STRING);
-    testUnsupportedConversion(BOOLEAN, SHORT_STRING);
-    testUnsupportedConversion(BOOLEAN, INT_STRING);
-    testUnsupportedConversion(BOOLEAN, LONG_STRING);
-    testUnsupportedConversion(BOOLEAN, FLOAT_STRING);
-    testUnsupportedConversion(BOOLEAN, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.BOOLEAN, quoted(BOOLEAN_TRUE_STRING));
+    testUnsupportedConversion(FieldType.BOOLEAN, BYTE_STRING);
+    testUnsupportedConversion(FieldType.BOOLEAN, SHORT_STRING);
+    testUnsupportedConversion(FieldType.BOOLEAN, INT_STRING);
+    testUnsupportedConversion(FieldType.BOOLEAN, LONG_STRING);
+    testUnsupportedConversion(FieldType.BOOLEAN, FLOAT_STRING);
+    testUnsupportedConversion(FieldType.BOOLEAN, DOUBLE_STRING);
   }
 
   @Test
   public void testUnsupportedStringConversions() throws Exception {
-    testUnsupportedConversion(STRING, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(STRING, BYTE_STRING);
-    testUnsupportedConversion(STRING, SHORT_STRING);
-    testUnsupportedConversion(STRING, INT_STRING);
-    testUnsupportedConversion(STRING, LONG_STRING);
-    testUnsupportedConversion(STRING, FLOAT_STRING);
-    testUnsupportedConversion(STRING, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.STRING, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.STRING, BYTE_STRING);
+    testUnsupportedConversion(FieldType.STRING, SHORT_STRING);
+    testUnsupportedConversion(FieldType.STRING, INT_STRING);
+    testUnsupportedConversion(FieldType.STRING, LONG_STRING);
+    testUnsupportedConversion(FieldType.STRING, FLOAT_STRING);
+    testUnsupportedConversion(FieldType.STRING, DOUBLE_STRING);
   }
 
   @Test
   public void testUnsupportedByteConversions() throws Exception {
-    testUnsupportedConversion(BYTE, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(BYTE, quoted(BYTE_STRING));
-    testUnsupportedConversion(BYTE, SHORT_STRING);
-    testUnsupportedConversion(BYTE, INT_STRING);
-    testUnsupportedConversion(BYTE, LONG_STRING);
-    testUnsupportedConversion(BYTE, FLOAT_STRING);
-    testUnsupportedConversion(BYTE, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.BYTE, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.BYTE, quoted(BYTE_STRING));
+    testUnsupportedConversion(FieldType.BYTE, SHORT_STRING);
+    testUnsupportedConversion(FieldType.BYTE, INT_STRING);
+    testUnsupportedConversion(FieldType.BYTE, LONG_STRING);
+    testUnsupportedConversion(FieldType.BYTE, FLOAT_STRING);
+    testUnsupportedConversion(FieldType.BYTE, DOUBLE_STRING);
   }
 
   @Test
   public void testUnsupportedShortConversions() throws Exception {
-    testUnsupportedConversion(INT16, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(INT16, quoted(SHORT_STRING));
-    testUnsupportedConversion(INT16, INT_STRING);
-    testUnsupportedConversion(INT16, LONG_STRING);
-    testUnsupportedConversion(INT16, FLOAT_STRING);
-    testUnsupportedConversion(INT16, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.INT16, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.INT16, quoted(SHORT_STRING));
+    testUnsupportedConversion(FieldType.INT16, INT_STRING);
+    testUnsupportedConversion(FieldType.INT16, LONG_STRING);
+    testUnsupportedConversion(FieldType.INT16, FLOAT_STRING);
+    testUnsupportedConversion(FieldType.INT16, DOUBLE_STRING);
   }
 
   @Test
   public void testUnsupportedIntConversions() throws Exception {
-    testUnsupportedConversion(INT32, quoted(INT_STRING));
-    testUnsupportedConversion(INT32, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(INT32, LONG_STRING);
-    testUnsupportedConversion(INT32, FLOAT_STRING);
-    testUnsupportedConversion(INT32, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.INT32, quoted(INT_STRING));
+    testUnsupportedConversion(FieldType.INT32, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.INT32, LONG_STRING);
+    testUnsupportedConversion(FieldType.INT32, FLOAT_STRING);
+    testUnsupportedConversion(FieldType.INT32, DOUBLE_STRING);
   }
 
   @Test
   public void testUnsupportedLongConversions() throws Exception {
-    testUnsupportedConversion(INT64, quoted(LONG_STRING));
-    testUnsupportedConversion(INT64, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(INT64, FLOAT_STRING);
-    testUnsupportedConversion(INT64, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.INT64, quoted(LONG_STRING));
+    testUnsupportedConversion(FieldType.INT64, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.INT64, FLOAT_STRING);
+    testUnsupportedConversion(FieldType.INT64, DOUBLE_STRING);
   }
 
   @Test
   public void testUnsupportedFloatConversions() throws Exception {
-    testUnsupportedConversion(FLOAT, quoted(FLOAT_STRING));
-    testUnsupportedConversion(FLOAT, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(FLOAT, DOUBLE_STRING);
-    testUnsupportedConversion(FLOAT, INT_STRING); // too large to fit
+    testUnsupportedConversion(FieldType.FLOAT, quoted(FLOAT_STRING));
+    testUnsupportedConversion(FieldType.FLOAT, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.FLOAT, DOUBLE_STRING);
+    testUnsupportedConversion(FieldType.FLOAT, INT_STRING); // too large to fit
   }
 
   @Test
   public void testUnsupportedDoubleConversions() throws Exception {
-    testUnsupportedConversion(DOUBLE, quoted(DOUBLE_STRING));
-    testUnsupportedConversion(DOUBLE, BOOLEAN_TRUE_STRING);
-    testUnsupportedConversion(DOUBLE, LONG_STRING); // too large to fit
+    testUnsupportedConversion(FieldType.DOUBLE, quoted(DOUBLE_STRING));
+    testUnsupportedConversion(FieldType.DOUBLE, BOOLEAN_TRUE_STRING);
+    testUnsupportedConversion(FieldType.DOUBLE, LONG_STRING); // too large to fit
   }
 
   private void testUnsupportedConversion(
-      TypeName fieldType,
+      FieldType fieldType,
       String jsonFieldValue) throws Exception {
 
-    String fieldName = "f_" + fieldType.name().toLowerCase();
+    String fieldName = "f_" + fieldType.getTypeName().name().toLowerCase();
 
     ObjectMapper jsonParser =
         newObjectMapperWith(RowJsonDeserializer
@@ -561,11 +548,11 @@ private String quoted(String string) {
     return "\"" + string + "\"";
   }
 
-  private Schema schemaWithField(String fieldName, TypeName fieldType) {
+  private Schema schemaWithField(String fieldName, FieldType fieldType) {
     return
         Schema
             .builder()
-            .addField(Schema.Field.of(fieldName, fieldType.type()))
+            .addField(Schema.Field.of(fieldName, fieldType))
             .build();
   }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index b29f8c37104..b411cbeaa77 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -32,7 +32,7 @@
 import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.Rule;
@@ -52,11 +52,11 @@ public void testCreatesNullRecord() {
     Schema type =
         Stream
             .of(
-                Schema.Field.of("f_int", TypeName.INT32.type())
+                Schema.Field.of("f_int", FieldType.INT32)
                     .withNullable(true),
-                Schema.Field.of("f_str", TypeName.STRING.type())
+                Schema.Field.of("f_str", FieldType.STRING)
                     .withNullable(true),
-                Schema.Field.of("f_double", TypeName.DOUBLE.type())
+                Schema.Field.of("f_double", FieldType.DOUBLE)
                     .withNullable(true))
             .collect(toSchema());
 
@@ -69,7 +69,7 @@ public void testCreatesNullRecord() {
 
   @Test
   public void testRejectsNullRecord() {
-    Schema type = Stream.of(Schema.Field.of("f_int", TypeName.INT32.type()))
+    Schema type = Stream.of(Schema.Field.of("f_int", Schema.FieldType.INT32))
         .collect(toSchema());
     thrown.expect(IllegalArgumentException.class);
     Row.nullRow(type);
@@ -78,16 +78,16 @@ public void testRejectsNullRecord() {
   @Test
   public void testCreatesRecord() {
     Schema schema = Schema.builder()
-        .addByteField("f_byte", false)
-        .addInt16Field("f_int16", false)
-        .addInt32Field("f_int32", false)
-        .addInt64Field("f_int64", false)
-        .addDecimalField("f_decimal", false)
-        .addFloatField("f_float", false)
-        .addDoubleField("f_double", false)
-        .addStringField("f_string", false)
-        .addDateTimeField("f_datetime", false)
-        .addBooleanField("f_boolean", false).build();
+        .addByteField("f_byte")
+        .addInt16Field("f_int16")
+        .addInt32Field("f_int32")
+        .addInt64Field("f_int64")
+        .addDecimalField("f_decimal")
+        .addFloatField("f_float")
+        .addDoubleField("f_double")
+        .addStringField("f_string")
+        .addDateTimeField("f_datetime")
+        .addBooleanField("f_boolean").build();
 
     DateTime dateTime = new DateTime().withDate(1979, 03, 14)
         .withTime(1, 2, 3, 4)
@@ -116,15 +116,15 @@ public void testCreatesRecord() {
   @Test
   public void testCreatesNestedRow() {
     Schema nestedType = Stream.of(
-        Schema.Field.of("f1_str", TypeName.STRING.type()))
+        Schema.Field.of("f1_str", Schema.FieldType.STRING))
         .collect(toSchema());
 
     Schema type =
         Stream
-            .of(Schema.Field.of("f_int", TypeName.INT32.type()),
+            .of(Schema.Field.of("f_int", Schema.FieldType.INT32),
                 Schema.Field.of("nested",
-                    TypeName.ROW.type()
-                    .withRowSchema(nestedType)))
+                    Schema.FieldType.row(
+                    nestedType)))
         .collect(toSchema());
     Row nestedRow = Row.withSchema(nestedType).addValues("foobar").build();
     Row row = Row.withSchema(type).addValues(42, nestedRow).build();
@@ -137,8 +137,7 @@ public void testCreatesArray() {
     List<Integer> data = Lists.newArrayList(2, 3, 5, 7);
     Schema type = Stream
         .of(Schema.Field.of("array",
-            TypeName.ARRAY.type()
-                .withCollectionElementType(TypeName.INT32.type())))
+            Schema.FieldType.array(Schema.FieldType.INT32)))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
@@ -147,7 +146,7 @@ public void testCreatesArray() {
   @Test
   public void testCreatesRowArray() {
     Schema nestedType = Stream.of(
-        Schema.Field.of("f1_str", TypeName.STRING.type()))
+        Schema.Field.of("f1_str", FieldType.STRING))
         .collect(toSchema());
     List<Row> data = Lists.newArrayList(
         Row.withSchema(nestedType).addValues("one").build(),
@@ -156,9 +155,7 @@ public void testCreatesRowArray() {
 
     Schema type = Stream
         .of(Schema.Field.of("array",
-            TypeName.ARRAY.type()
-                .withCollectionElementType(TypeName.ROW.type()
-                    .withRowSchema(nestedType))))
+            FieldType.array(FieldType.row(nestedType))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
@@ -170,9 +167,7 @@ public void testCreatesArrayArray() {
         Lists.newArrayList(1, 2, 3, 4));
     Schema type = Stream
         .of(Schema.Field.of("array",
-            TypeName.ARRAY.type()
-                .withCollectionElementType(TypeName.ARRAY.type()
-                    .withCollectionElementType(TypeName.INT32.type()))))
+            FieldType.array(FieldType.array(FieldType.INT32))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
@@ -186,8 +181,7 @@ public void testCreatesArrayOfMap() {
         .build();
     Schema type = Stream
         .of(Schema.Field.of("array",
-            TypeName.ARRAY.type().withCollectionElementType(
-                TypeName.MAP.type().withMapType(TypeName.INT32.type(), TypeName.STRING.type()))))
+            FieldType.array(FieldType.map(FieldType.INT32, FieldType.STRING))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addArray(data).build();
     assertEquals(data, row.getArray("array"));
@@ -203,7 +197,7 @@ public void testCreateMapWithPrimitiveValue() {
         .build();
     Schema type = Stream
         .of(Schema.Field.of("map",
-            TypeName.MAP.type().withMapType(TypeName.INT32.type(), TypeName.STRING.type())))
+            FieldType.map(FieldType.INT32, FieldType.STRING)))
         .collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
@@ -217,8 +211,8 @@ public void testCreateMapWithArrayValue() {
         .build();
     Schema type = Stream
         .of(Schema.Field.of("map",
-            TypeName.MAP.type().withMapType(TypeName.INT32.type(),
-                TypeName.ARRAY.type().withCollectionElementType(TypeName.STRING.type()))))
+            FieldType.map(FieldType.INT32,
+                FieldType.array(FieldType.STRING))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
@@ -232,8 +226,7 @@ public void testCreateMapWithMapValue() {
         .build();
     Schema type = Stream
         .of(Schema.Field.of("map",
-            TypeName.MAP.type().withMapType(TypeName.INT32.type(),
-                TypeName.MAP.type().withMapType(TypeName.INT32.type(), TypeName.STRING.type()))))
+            FieldType.map(FieldType.INT32, FieldType.map(FieldType.INT32, FieldType.STRING))))
         .collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
@@ -241,15 +234,15 @@ public void testCreateMapWithMapValue() {
 
   @Test
   public void testCreateMapWithRowValue() {
-    Schema nestedType = Stream.of(Schema.Field.of("f1_str", TypeName.STRING.type()))
+    Schema nestedType = Stream.of(Schema.Field.of("f1_str", FieldType.STRING))
         .collect(toSchema());
     Map<Integer, Row> data = ImmutableMap.<Integer, Row>builder()
         .put(1, Row.withSchema(nestedType).addValues("one").build())
         .put(2, Row.withSchema(nestedType).addValues("two").build())
         .build();
-    Schema type = Stream.of(Schema.Field.of("map", TypeName.MAP.type()
-        .withMapType(TypeName.INT32.type(),
-        TypeName.ROW.type().withRowSchema(nestedType)))).collect(toSchema());
+    Schema type = Stream.of(Schema.Field.of("map", FieldType.map(
+        FieldType.INT32,
+        FieldType.row(nestedType)))).collect(toSchema());
     Row row = Row.withSchema(type).addValue(data).build();
     assertEquals(data, row.getMap("map"));
   }
@@ -259,9 +252,9 @@ public void testCollector() {
     Schema type =
         Stream
             .of(
-                Schema.Field.of("f_int", TypeName.INT32.type()),
-                Schema.Field.of("f_str", TypeName.STRING.type()),
-                Schema.Field.of("f_double", TypeName.DOUBLE.type()))
+                Schema.Field.of("f_int", FieldType.INT32),
+                Schema.Field.of("f_str", FieldType.STRING),
+                Schema.Field.of("f_double", FieldType.DOUBLE))
             .collect(toSchema());
 
     Row row =
@@ -279,9 +272,9 @@ public void testThrowsForIncorrectNumberOfFields() {
     Schema type =
         Stream
             .of(
-                Schema.Field.of("f_int", TypeName.INT32.type()),
-                Schema.Field.of("f_str", TypeName.STRING.type()),
-                Schema.Field.of("f_double", TypeName.DOUBLE.type()))
+                Schema.Field.of("f_int", FieldType.INT32),
+                Schema.Field.of("f_str", FieldType.STRING),
+                Schema.Field.of("f_double", FieldType.DOUBLE))
             .collect(toSchema());
 
     thrown.expect(IllegalArgumentException.class);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
index e0ca6dfc988..0f7650d5927 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
@@ -31,13 +31,12 @@
  * Unit tests for {@link InferredRowCoder}.
  */
 public class InferredRowCoderTest {
-  private static final boolean NOT_NULLABLE = false;
 
   private static final Schema PERSON_ROW_TYPE =
       Schema
           .builder()
-          .addInt32Field("ageYears", NOT_NULLABLE)
-          .addStringField("name", NOT_NULLABLE)
+          .addInt32Field("ageYears")
+          .addStringField("name")
           .build();
 
   private static final PersonPojo PERSON_FOO = new PersonPojo("Foo", 13);

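For readers skimming the diff above, here is a minimal sketch of how the new API composes after this change, using only the builder methods and FieldType constructors shown in this PR (the class name and field names below are illustrative, not part of the change):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.Field;
    import org.apache.beam.sdk.schemas.Schema.FieldType;

    class SchemaSketch {
      static Schema example() {
        // Atomic types are ready-made constants; ARRAY, MAP and ROW are built
        // with the new FieldType.array/map/row constructors.
        Schema nested = Schema.of(Field.of("f1_str", FieldType.STRING));
        return Schema.builder()
            .addInt32Field("f_int32")                        // non-null by default
            .addArrayField("f_tags", FieldType.STRING)       // ARRAY of STRING
            .addField(Field.of("f_counts",
                FieldType.map(FieldType.STRING, FieldType.INT64)))
            .addRowField("f_nested", nested)                 // nested ROW
            .addField(Field.of("f_optional", FieldType.STRING)
                .withNullable(true))                         // opt back into nullability
            .build();
      }
    }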

 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 106012)
    Time Spent: 0.5h  (was: 20m)

> Refactor builder field nullability
> ----------------------------------
>
>                 Key: BEAM-4077
>                 URL: https://issues.apache.org/jira/browse/BEAM-4077
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently the Schema builder methods take a boolean for nullability. It would 
> be more standard to have separate builder methods. At this point the builder 
> might as well just take the Field spec since it does not add concision.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
