[ 
https://issues.apache.org/jira/browse/BEAM-4084?focusedWorklogId=107214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107214
 ]

ASF GitHub Bot logged work on BEAM-4084:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/May/18 15:46
            Start Date: 30/May/18 15:46
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5487: [BEAM-4084] 
Finish RowType -> Schema rename
URL: https://github.com/apache/beam/pull/5487
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactory.java
similarity index 92%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactory.java
index 7ce80f69153..f4069f646b3 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactory.java
@@ -29,7 +29,7 @@
 import org.joda.time.DateTime;
 
 /**
- * A default implementation of the {@link RowTypeFactory} interface. The 
purpose of
+ * A default implementation of the {@link SchemaFactory} interface. The 
purpose of
  * the factory is to create row types given a list of getters.
  *
  * <p>Row type is represented by {@link Schema} which essentially is a
@@ -42,10 +42,10 @@
  * <p>This is the default factory implementation used in {@link RowFactory}.
  *
  * <p>In other cases, when mapping requires extra logic, another implementation 
of the
- * {@link RowTypeFactory} should be used instead of this class.
+ * {@link SchemaFactory} should be used instead of this class.
  *
  */
-public class DefaultRowTypeFactory implements RowTypeFactory {
+public class DefaultSchemaFactory implements SchemaFactory {
   private static final ImmutableMap<Class, TypeName> SUPPORTED_TYPES =
       ImmutableMap.<Class, TypeName>builder()
           .put(Boolean.class, TypeName.BOOLEAN)
@@ -74,7 +74,7 @@
    * Uses {@link CoderRegistry#createDefault()} to get coders for {@link 
FieldValueGetter#type()}.
    */
   @Override
-  public Schema createRowType(Iterable<FieldValueGetter> fieldValueGetters) {
+  public Schema createSchema(Iterable<FieldValueGetter> fieldValueGetters) {
     List<Schema.Field> fields = Lists.newArrayList();
     for (FieldValueGetter getter : fieldValueGetters) {
       fields.add(Schema.Field.of(getter.name(), 
getTypeDescriptor(getter.type())));
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java
index 1dc91aa6a9e..4ec3fafce62 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java
@@ -67,10 +67,10 @@
   }
 
   /**
-   * Returns a {@link InferredRowCoder} with row type factory overridden by 
{@code rowTypeFactory}.
+   * Returns a {@link InferredRowCoder} with the schema factory overridden by 
{@code schemaFactory}.
    */
-  public InferredRowCoder<T> withRowTypeFactory(RowTypeFactory rowTypeFactory) 
{
-    return 
toBuilder().setRowFactory(RowFactory.withRowTypeFactory(rowTypeFactory)).build();
+  public InferredRowCoder<T> withSchemaFactory(SchemaFactory schemaFactory) {
+    return 
toBuilder().setRowFactory(RowFactory.withSchemaFactory(schemaFactory)).build();
   }
 
   static <W> Builder<W> builder() {
@@ -79,12 +79,12 @@
 
   abstract Builder<T> toBuilder();
 
-  public Schema rowType() {
-    return rowFactory().getRowType(elementType());
+  public Schema schema() {
+    return rowFactory().getSchema(elementType());
   }
 
   public RowCoder rowCoder() {
-    return rowType().getRowCoder();
+    return schema().getRowCoder();
   }
 
   public Row createRow(T element) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java
index 3e504e71614..e3bfd3cd009 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java
@@ -37,39 +37,39 @@
  *
  * <p>At the moment single pojo class corresponds to single {@link Schema}.
  *
- * <p>Supported pojo getter types depend on types supported by the {@link 
RowTypeFactory}.
- * See {@link DefaultRowTypeFactory} for default implementation.
+ * <p>Supported pojo getter types depend on types supported by the {@link 
SchemaFactory}.
+ * See {@link DefaultSchemaFactory} for default implementation.
  */
 @Internal
 public class RowFactory implements Serializable {
 
-  private RowTypeFactory rowTypeFactory;
+  private SchemaFactory schemaFactory;
   private final List<GetterFactory> getterFactories;
-  private transient Map<Class, RowTypeGetters> rowTypesCache;
+  private transient Map<Class, SchemaGetters> schemaCache;
 
   /**
-   * Creates an instance of {@link RowFactory} using {@link 
DefaultRowTypeFactory}
+   * Creates an instance of {@link RowFactory} using {@link 
DefaultSchemaFactory}
    * and {@link GeneratedGetterFactory}.
    */
   public static RowFactory createDefault() {
-    return withRowTypeFactory(new DefaultRowTypeFactory());
+    return withSchemaFactory(new DefaultSchemaFactory());
   }
 
 
   /**
-   * Creates an instance of {@link RowFactory} using provided {@link 
RowTypeFactory}
+   * Creates an instance of {@link RowFactory} using provided {@link 
SchemaFactory}
    * and {@link GeneratedGetterFactory}.
    */
-  public static RowFactory withRowTypeFactory(RowTypeFactory rowTypeFactory) {
-    return of(rowTypeFactory, new GeneratedGetterFactory());
+  public static RowFactory withSchemaFactory(SchemaFactory schemaFactory) {
+    return of(schemaFactory, new GeneratedGetterFactory());
   }
 
   /**
-   * Creates an instance of {@link RowFactory} using provided {@link 
RowTypeFactory}
+   * Creates an instance of {@link RowFactory} using provided {@link 
SchemaFactory}
    * and {@link GetterFactory}.
    */
-  public static RowFactory of(RowTypeFactory rowTypeFactory, GetterFactory 
getterFactory) {
-    return new RowFactory(rowTypeFactory, getterFactory);
+  public static RowFactory of(SchemaFactory schemaFactory, GetterFactory 
getterFactory) {
+    return new RowFactory(schemaFactory, getterFactory);
   }
 
   /**
@@ -77,13 +77,13 @@ public static RowFactory of(RowTypeFactory rowTypeFactory, 
GetterFactory getterF
    *
    * <p>For example this can be used to create BeamRecordSqlTypes instead of 
{@link Schema}.
    */
-  RowFactory(RowTypeFactory rowTypeFactory, GetterFactory ... getterFactories) 
{
-    this.rowTypeFactory = rowTypeFactory;
+  RowFactory(SchemaFactory schemaFactory, GetterFactory ... getterFactories) {
+    this.schemaFactory = schemaFactory;
     this.getterFactories = Arrays.asList(getterFactories);
   }
 
-  public <T> Schema getRowType(Class<T> elementType) {
-    return getRecordType(elementType).rowType();
+  public <T> Schema getSchema(Class<T> elementType) {
+    return getRecordType(elementType).schema();
   }
 
   /**
@@ -98,25 +98,25 @@ public static RowFactory of(RowTypeFactory rowTypeFactory, 
GetterFactory getterF
    * For example record field 'name' will be generated for 'getName()' pojo 
method.
    */
   public <T> Row create(T pojo) {
-    RowTypeGetters getters = getRecordType(pojo.getClass());
+    SchemaGetters getters = getRecordType(pojo.getClass());
     List<Object> fieldValues = getFieldValues(getters.valueGetters(), pojo);
-    return Row.withSchema(getters.rowType()).addValues(fieldValues).build();
+    return Row.withSchema(getters.schema()).addValues(fieldValues).build();
   }
 
-  private synchronized RowTypeGetters getRecordType(Class pojoClass) {
-    if (rowTypesCache == null) {
-      rowTypesCache = new HashMap<>();
+  private synchronized SchemaGetters getRecordType(Class pojoClass) {
+    if (schemaCache == null) {
+      schemaCache = new HashMap<>();
     }
 
-    if (rowTypesCache.containsKey(pojoClass)) {
-      return rowTypesCache.get(pojoClass);
+    if (schemaCache.containsKey(pojoClass)) {
+      return schemaCache.get(pojoClass);
     }
 
     List<FieldValueGetter> fieldValueGetters = createGetters(pojoClass);
-    Schema schema = rowTypeFactory.createRowType(fieldValueGetters);
-    rowTypesCache.put(pojoClass, new RowTypeGetters(schema, 
fieldValueGetters));
+    Schema schema = schemaFactory.createSchema(fieldValueGetters);
+    schemaCache.put(pojoClass, new SchemaGetters(schema, fieldValueGetters));
 
-    return rowTypesCache.get(pojoClass);
+    return schemaCache.get(pojoClass);
   }
 
   private List<FieldValueGetter> createGetters(Class pojoClass) {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaFactory.java
similarity index 89%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeFactory.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaFactory.java
index dfb03305f47..efb02bafed0 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaFactory.java
@@ -30,15 +30,15 @@
  * <p>Different implementations can have different ways of mapping getter 
types to coders.
  * For example Beam SQL uses custom mapping via java.sql.Types.
  *
- * <p>Default implementation is {@link DefaultRowTypeFactory}.
+ * <p>Default implementation is {@link DefaultSchemaFactory}.
  * It returns instances of {@link Schema}, mapping {@link 
FieldValueGetter#type()}
  * to known coders.
  */
 @Internal
-public interface RowTypeFactory extends Serializable {
+public interface SchemaFactory extends Serializable {
 
   /**
    * Create a {@link Schema} for the list of the pojo field getters.
    */
-  Schema createRowType(Iterable<FieldValueGetter> getters);
+  Schema createSchema(Iterable<FieldValueGetter> getters);
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaGetters.java
similarity index 89%
rename from 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java
rename to 
sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaGetters.java
index ccb71b44319..e70ce5f4971 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaGetters.java
@@ -28,11 +28,11 @@
  *
  * <p>This is used in {@link RowFactory} to create instances of {@link Row}s.
  */
-class RowTypeGetters {
+class SchemaGetters {
   private Schema schema;
   private List<FieldValueGetter> fieldValueGetters;
 
-  RowTypeGetters(Schema schema, List<FieldValueGetter> fieldValueGetters) {
+  SchemaGetters(Schema schema, List<FieldValueGetter> fieldValueGetters) {
     this.schema = schema;
     this.fieldValueGetters = fieldValueGetters;
   }
@@ -40,13 +40,13 @@
   /**
    * Returns a {@link Schema}.
    */
-  Schema rowType() {
+  Schema schema() {
     return schema;
   }
 
   /**
    * Returns the list of {@link FieldValueGetter}s which
-   * were used to create {@link RowTypeGetters#rowType()}.
+   * were used to create {@link SchemaGetters#schema()}.
    */
   List<FieldValueGetter> valueGetters() {
     return fieldValueGetters;
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
index 69d655416a8..3bdb7e4c460 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java
@@ -28,7 +28,7 @@
 import org.junit.rules.ExpectedException;
 
 /**
- * Unit tests for {@link DefaultRowTypeFactory}.
+ * Unit tests for {@link DefaultSchemaFactory}.
  */
 public class DefaultSchemaFactoryTest {
 
@@ -53,9 +53,9 @@
 
   @Test
   public void testContainsCorrectFields() throws Exception {
-    DefaultRowTypeFactory factory = new DefaultRowTypeFactory();
+    DefaultSchemaFactory factory = new DefaultSchemaFactory();
 
-    Schema schema = factory.createRowType(GETTERS);
+    Schema schema = factory.createSchema(GETTERS);
 
     assertEquals(GETTERS.size(), schema.getFieldCount());
     assertEquals(
@@ -73,9 +73,9 @@ public void testContainsCorrectFields() throws Exception {
   public void testThrowsForUnsupportedTypes() throws Exception {
     thrown.expect(UnsupportedOperationException.class);
 
-    DefaultRowTypeFactory factory = new DefaultRowTypeFactory();
+    DefaultSchemaFactory factory = new DefaultSchemaFactory();
 
-    factory.createRowType(
+    factory.createSchema(
         Arrays.<FieldValueGetter>asList(getter("unsupportedGetter", 
UnsupportedClass.class)));
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
index e0ca6dfc988..e590735689e 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java
@@ -73,13 +73,13 @@ public String getName() {
   }
 
   @Test
-  public void testCreatesRowType() {
+  public void testCreatesSchema() {
     InferredRowCoder<PersonPojo> inferredCoder = 
InferredRowCoder.ofSerializable(PersonPojo.class);
-    Schema rowType = inferredCoder.rowType();
+    Schema schema = inferredCoder.schema();
 
-    assertEquals(2, rowType.getFieldCount());
+    assertEquals(2, schema.getFieldCount());
     assertThat(
-        rowType.getFields(),
+        schema.getFields(),
         containsInAnyOrder(PERSON_ROW_TYPE.getField(0), 
PERSON_ROW_TYPE.getField(1)));
   }
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java
index 95e8fc68c1f..b9ca89380aa 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java
@@ -142,6 +142,6 @@ public void testCopiesValues() throws Exception {
   }
 
   private RowFactory newFactory() {
-    return new RowFactory(new DefaultRowTypeFactory(), getterFactory);
+    return new RowFactory(new DefaultSchemaFactory(), getterFactory);
   }
 }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java
index aa64be200c2..f4782363cac 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java
@@ -26,7 +26,7 @@
 import org.junit.Test;
 
 /**
- * Unit tests for {@link RowTypeGetters}.
+ * Unit tests for {@link SchemaGetters}.
  */
 public class SchemaGettersTest {
 
@@ -35,9 +35,9 @@ public void testGetters() {
     Schema schema = Schema.builder().build();
     List<FieldValueGetter> fieldValueGetters = emptyList();
 
-    RowTypeGetters getters = new RowTypeGetters(schema, fieldValueGetters);
+    SchemaGetters getters = new SchemaGetters(schema, fieldValueGetters);
 
-    assertSame(schema, getters.rowType());
+    assertSame(schema, getters.schema());
     assertSame(fieldValueGetters, getters.valueGetters());
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
index 902ef935fc3..7ae03a6e102 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java
@@ -153,7 +153,7 @@ public Builder withMapField(
       return this;
     }
 
-    /** Adds an ARRAY field with elements of {@code rowType}. */
+    /** Adds an ARRAY field with elements of {@code schema}. */
     public Builder withArrayField(String fieldName, Schema schema) {
       FieldType collectionElementType = 
FieldType.of(TypeName.ROW).withRowSchema(schema);
       builder.addField(
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index f53f0cbfaea..4032592d067 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -76,7 +76,7 @@
           TypeName.DATETIME.type(), SqlTypeName.TIMESTAMP,
           TypeName.STRING.type(), SqlTypeName.VARCHAR);
 
-  /** Generate {@code BeamSqlRowType} from {@code RelDataType} which is used 
to create table. */
+  /** Generate {@link Schema} from {@code RelDataType} which is used to create 
table. */
   public static Schema toBeamSchema(RelDataType tableInfo) {
     return tableInfo
         .getFieldList()
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
index 2160e8e5fd8..694d4d32c17 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java
@@ -35,7 +35,7 @@
 /** Tests for SQL arrays. */
 public class BeamSqlDslArrayTest {
 
-  private static final Schema INPUT_ROW_TYPE =
+  private static final Schema INPUT_SCHEMA =
       RowSqlTypes.builder()
           .withIntegerField("f_int")
           .withArrayField("f_stringArr", SqlTypeName.VARCHAR)
@@ -109,12 +109,11 @@ public void testAccessArrayElement() {
 
   @Test
   public void testSingleElement() throws Exception {
-    Row inputRow =
-        
Row.withSchema(INPUT_ROW_TYPE).addValues(1).addArray(Arrays.asList("111")).build();
+    Row inputRow = 
Row.withSchema(INPUT_SCHEMA).addValues(1).addArray(Arrays.asList("111")).build();
 
     PCollection<Row> input =
         PBegin.in(pipeline)
-            .apply("boundedInput1", 
Create.of(inputRow).withCoder(INPUT_ROW_TYPE.getRowCoder()));
+            .apply("boundedInput1", 
Create.of(inputRow).withCoder(INPUT_SCHEMA.getRowCoder()));
 
     Schema resultType = 
RowSqlTypes.builder().withVarcharField("f_arrElem").build();
 
@@ -146,7 +145,7 @@ public void testCardinality() {
   @Test
   public void testUnnestLiteral() {
     PCollection<Row> input =
-        PBegin.in(pipeline).apply("boundedInput1", 
Create.empty(INPUT_ROW_TYPE.getRowCoder()));
+        PBegin.in(pipeline).apply("boundedInput1", 
Create.empty(INPUT_SCHEMA.getRowCoder()));
 
     // Because we have a multi-part FROM the DSL considers it multi-input
     TupleTag<Row> mainTag = new TupleTag<Row>("main") {};
@@ -169,7 +168,7 @@ public void testUnnestLiteral() {
   @Test
   public void testUnnestNamedLiteral() {
     PCollection<Row> input =
-        PBegin.in(pipeline).apply("boundedInput1", 
Create.empty(INPUT_ROW_TYPE.getRowCoder()));
+        PBegin.in(pipeline).apply("boundedInput1", 
Create.empty(INPUT_SCHEMA.getRowCoder()));
 
     // Because we have a multi-part FROM the DSL considers it multi-input
     TupleTag<Row> mainTag = new TupleTag<Row>("main") {};
@@ -194,17 +193,17 @@ public void testUnnestNamedLiteral() {
   @Test
   public void testUnnestCrossJoin() {
     Row row1 =
-        Row.withSchema(INPUT_ROW_TYPE)
+        Row.withSchema(INPUT_SCHEMA)
             .addValues(42)
             .addArray(Arrays.asList("111", "222", "333"))
             .build();
 
     Row row2 =
-        
Row.withSchema(INPUT_ROW_TYPE).addValues(13).addArray(Arrays.asList("444", 
"555")).build();
+        
Row.withSchema(INPUT_SCHEMA).addValues(13).addArray(Arrays.asList("444", 
"555")).build();
 
     PCollection<Row> input =
         PBegin.in(pipeline)
-            .apply("boundedInput1", Create.of(row1, 
row2).withCoder(INPUT_ROW_TYPE.getRowCoder()));
+            .apply("boundedInput1", Create.of(row1, 
row2).withCoder(INPUT_SCHEMA.getRowCoder()));
 
     // Because we have a multi-part FROM the DSL considers it multi-input
     TupleTag<Row> mainTag = new TupleTag<Row>("main") {};
@@ -233,16 +232,16 @@ public void testUnnestCrossJoin() {
 
   @Test
   public void testSelectRowsFromArrayOfRows() {
-    Schema elementRowType =
+    Schema elementSchema =
         
RowSqlTypes.builder().withVarcharField("f_rowString").withIntegerField("f_rowInt").build();
 
-    Schema resultRowType =
-        RowSqlTypes.builder().withArrayField("f_resultArray", 
elementRowType).build();
+    Schema resultSchema =
+        RowSqlTypes.builder().withArrayField("f_resultArray", 
elementSchema).build();
 
     Schema inputType =
         RowSqlTypes.builder()
             .withIntegerField("f_int")
-            .withArrayField("f_arrayOfRows", elementRowType)
+            .withArrayField("f_arrayOfRows", elementSchema)
             .build();
 
     PCollection<Row> input =
@@ -253,36 +252,36 @@ public void testSelectRowsFromArrayOfRows() {
                             .addValues(
                                 1,
                                 Arrays.asList(
-                                    
Row.withSchema(elementRowType).addValues("AA", 11).build(),
-                                    
Row.withSchema(elementRowType).addValues("BB", 22).build()))
+                                    
Row.withSchema(elementSchema).addValues("AA", 11).build(),
+                                    
Row.withSchema(elementSchema).addValues("BB", 22).build()))
                             .build(),
                         Row.withSchema(inputType)
                             .addValues(
                                 2,
                                 Arrays.asList(
-                                    
Row.withSchema(elementRowType).addValues("CC", 33).build(),
-                                    
Row.withSchema(elementRowType).addValues("DD", 44).build()))
+                                    
Row.withSchema(elementSchema).addValues("CC", 33).build(),
+                                    
Row.withSchema(elementSchema).addValues("DD", 44).build()))
                             .build())
                     .withCoder(inputType.getRowCoder()));
 
     PCollection<Row> result =
         input
             .apply(BeamSql.query("SELECT f_arrayOfRows FROM PCOLLECTION"))
-            .setCoder(resultRowType.getRowCoder());
+            .setCoder(resultSchema.getRowCoder());
 
     PAssert.that(result)
         .containsInAnyOrder(
-            Row.withSchema(resultRowType)
+            Row.withSchema(resultSchema)
                 .addArray(
                     Arrays.asList(
-                        Row.withSchema(elementRowType).addValues("AA", 
11).build(),
-                        Row.withSchema(elementRowType).addValues("BB", 
22).build()))
+                        Row.withSchema(elementSchema).addValues("AA", 
11).build(),
+                        Row.withSchema(elementSchema).addValues("BB", 
22).build()))
                 .build(),
-            Row.withSchema(resultRowType)
+            Row.withSchema(resultSchema)
                 .addArray(
                     Arrays.asList(
-                        Row.withSchema(elementRowType).addValues("CC", 
33).build(),
-                        Row.withSchema(elementRowType).addValues("DD", 
44).build()))
+                        Row.withSchema(elementSchema).addValues("CC", 
33).build(),
+                        Row.withSchema(elementSchema).addValues("DD", 
44).build()))
                 .build());
 
     pipeline.run();
@@ -290,15 +289,15 @@ public void testSelectRowsFromArrayOfRows() {
 
   @Test
   public void testSelectSingleRowFromArrayOfRows() {
-    Schema elementRowType =
+    Schema elementSchema =
         
RowSqlTypes.builder().withVarcharField("f_rowString").withIntegerField("f_rowInt").build();
 
-    Schema resultRowType = elementRowType;
+    Schema resultSchema = elementSchema;
 
     Schema inputType =
         RowSqlTypes.builder()
             .withIntegerField("f_int")
-            .withArrayField("f_arrayOfRows", elementRowType)
+            .withArrayField("f_arrayOfRows", elementSchema)
             .build();
 
     PCollection<Row> input =
@@ -309,42 +308,42 @@ public void testSelectSingleRowFromArrayOfRows() {
                             .addValues(
                                 1,
                                 Arrays.asList(
-                                    
Row.withSchema(elementRowType).addValues("AA", 11).build(),
-                                    
Row.withSchema(elementRowType).addValues("BB", 22).build()))
+                                    
Row.withSchema(elementSchema).addValues("AA", 11).build(),
+                                    
Row.withSchema(elementSchema).addValues("BB", 22).build()))
                             .build(),
                         Row.withSchema(inputType)
                             .addValues(
                                 2,
                                 Arrays.asList(
-                                    
Row.withSchema(elementRowType).addValues("CC", 33).build(),
-                                    
Row.withSchema(elementRowType).addValues("DD", 44).build()))
+                                    
Row.withSchema(elementSchema).addValues("CC", 33).build(),
+                                    
Row.withSchema(elementSchema).addValues("DD", 44).build()))
                             .build())
                     .withCoder(inputType.getRowCoder()));
 
     PCollection<Row> result =
         input
             .apply(BeamSql.query("SELECT f_arrayOfRows[1] FROM PCOLLECTION"))
-            .setCoder(resultRowType.getRowCoder());
+            .setCoder(resultSchema.getRowCoder());
 
     PAssert.that(result)
         .containsInAnyOrder(
-            Row.withSchema(elementRowType).addValues("BB", 22).build(),
-            Row.withSchema(elementRowType).addValues("DD", 44).build());
+            Row.withSchema(elementSchema).addValues("BB", 22).build(),
+            Row.withSchema(elementSchema).addValues("DD", 44).build());
 
     pipeline.run();
   }
 
   @Test
   public void testSelectRowFieldFromArrayOfRows() {
-    Schema elementRowType =
+    Schema elementSchema =
         
RowSqlTypes.builder().withVarcharField("f_rowString").withIntegerField("f_rowInt").build();
 
-    Schema resultRowType = 
RowSqlTypes.builder().withVarcharField("f_stringField").build();
+    Schema resultSchema = 
RowSqlTypes.builder().withVarcharField("f_stringField").build();
 
     Schema inputType =
         RowSqlTypes.builder()
             .withIntegerField("f_int")
-            .withArrayField("f_arrayOfRows", elementRowType)
+            .withArrayField("f_arrayOfRows", elementSchema)
             .build();
 
     PCollection<Row> input =
@@ -355,27 +354,27 @@ public void testSelectRowFieldFromArrayOfRows() {
                             .addValues(
                                 1,
                                 Arrays.asList(
-                                    
Row.withSchema(elementRowType).addValues("AA", 11).build(),
-                                    
Row.withSchema(elementRowType).addValues("BB", 22).build()))
+                                    
Row.withSchema(elementSchema).addValues("AA", 11).build(),
+                                    
Row.withSchema(elementSchema).addValues("BB", 22).build()))
                             .build(),
                         Row.withSchema(inputType)
                             .addValues(
                                 2,
                                 Arrays.asList(
-                                    
Row.withSchema(elementRowType).addValues("CC", 33).build(),
-                                    
Row.withSchema(elementRowType).addValues("DD", 44).build()))
+                                    
Row.withSchema(elementSchema).addValues("CC", 33).build(),
+                                    
Row.withSchema(elementSchema).addValues("DD", 44).build()))
                             .build())
                     .withCoder(inputType.getRowCoder()));
 
     PCollection<Row> result =
         input
             .apply(BeamSql.query("SELECT f_arrayOfRows[1].f_rowString FROM 
PCOLLECTION"))
-            .setCoder(resultRowType.getRowCoder());
+            .setCoder(resultSchema.getRowCoder());
 
     PAssert.that(result)
         .containsInAnyOrder(
-            Row.withSchema(resultRowType).addValues("BB").build(),
-            Row.withSchema(resultRowType).addValues("DD").build());
+            Row.withSchema(resultSchema).addValues("BB").build(),
+            Row.withSchema(resultSchema).addValues("DD").build());
 
     pipeline.run();
   }
@@ -385,14 +384,14 @@ public void testSelectRowFieldFromArrayOfRows() {
         .apply(
             "boundedInput1",
             Create.of(
-                    Row.withSchema(INPUT_ROW_TYPE)
+                    Row.withSchema(INPUT_SCHEMA)
                         .addValues(1)
                         .addArray(Arrays.asList("111", "222"))
                         .build(),
-                    Row.withSchema(INPUT_ROW_TYPE)
+                    Row.withSchema(INPUT_SCHEMA)
                         .addValues(2)
                         .addArray(Arrays.asList("33", "44", "55"))
                         .build())
-                .withCoder(INPUT_ROW_TYPE.getRowCoder()));
+                .withCoder(INPUT_SCHEMA.getRowCoder()));
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
index a8055a4d339..8cc54b33331 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java
@@ -26,9 +26,9 @@
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.values.reflect.DefaultRowTypeFactory;
+import org.apache.beam.sdk.values.reflect.DefaultSchemaFactory;
 import org.apache.beam.sdk.values.reflect.FieldValueGetter;
-import org.apache.beam.sdk.values.reflect.RowTypeFactory;
+import org.apache.beam.sdk.values.reflect.SchemaFactory;
 import org.joda.time.DateTime;
 import org.junit.Rule;
 import org.junit.Test;
@@ -56,9 +56,9 @@
 
   @Test
   public void testContainsCorrectFields() throws Exception {
-    RowTypeFactory factory = new DefaultRowTypeFactory();
+    SchemaFactory factory = new DefaultSchemaFactory();
 
-    Schema schema = factory.createRowType(GETTERS_FOR_KNOWN_TYPES);
+    Schema schema = factory.createSchema(GETTERS_FOR_KNOWN_TYPES);
 
     assertEquals(GETTERS_FOR_KNOWN_TYPES.size(), schema.getFieldCount());
     assertEquals(
@@ -81,9 +81,9 @@ public void testContainsCorrectFields() throws Exception {
   public void testThrowsForUnsupportedTypes() throws Exception {
     thrown.expect(UnsupportedOperationException.class);
 
-    RowTypeFactory factory = new DefaultRowTypeFactory();
+    SchemaFactory factory = new DefaultSchemaFactory();
 
-    factory.createRowType(
+    factory.createSchema(
         Arrays.<FieldValueGetter>asList(getter("arrayListGetter", 
ArrayList.class)));
   }
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
index 1b14f8a2028..b92615733d0 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java
@@ -103,7 +103,7 @@ public static RowsBuilder rowsBuilderOf(Schema type) {
      * @args pairs of column type and column names.
      */
     public static RowsBuilder of(final Object... args) {
-      Schema beamSQLSchema = buildBeamSqlRowType(args);
+      Schema beamSQLSchema = buildBeamSqlSchema(args);
       RowsBuilder builder = new RowsBuilder();
       builder.type = beamSQLSchema;
 
@@ -120,8 +120,6 @@ public static RowsBuilder of(final Object... args) {
      *   schema
      * )
      * }</pre>
-     *
-     * @beamSQLRowType the row type.
      */
     public static RowsBuilder of(final Schema schema) {
       RowsBuilder builder = new RowsBuilder();
@@ -159,7 +157,7 @@ public RowsBuilder addRows(final List args) {
     }
 
     public PCollectionBuilder getPCollectionBuilder() {
-      return pCollectionBuilder().withRowType(type).withRows(rows);
+      return pCollectionBuilder().withSchema(type).withRows(rows);
     }
   }
 
@@ -173,7 +171,7 @@ public static PCollectionBuilder pCollectionBuilder() {
     private String timestampField;
     private Pipeline pipeline;
 
-    public PCollectionBuilder withRowType(Schema type) {
+    public PCollectionBuilder withSchema(Schema type) {
       this.type = type;
       return this;
     }
@@ -224,12 +222,12 @@ public PCollectionBuilder inPipeline(Pipeline pipeline) {
   }
 
   /**
-   * Convenient way to build a {@code BeamSqlRowType}.
+   * Convenient way to build a {@link Schema}.
    *
    * <p>e.g.
    *
    * <pre>{@code
-   * buildBeamSqlRowType(
+   * buildBeamSqlSchema(
    *     SqlCoders.BIGINT, "order_id",
    *     SqlCoders.INTEGER, "site_id",
    *     SqlCoders.DOUBLE, "price",
@@ -237,7 +235,7 @@ public PCollectionBuilder inPipeline(Pipeline pipeline) {
    * )
    * }</pre>
    */
-  public static Schema buildBeamSqlRowType(Object... args) {
+  public static Schema buildBeamSqlSchema(Object... args) {
     return Stream.iterate(0, i -> i + 2)
         .limit(args.length / 2)
         .map(i -> toRecordField(args, i))
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index dbbd91805fb..bd62b3f8a8c 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -101,7 +101,7 @@ public static void prepare() {
     registerTable(
         "SITE_LKP",
         new SiteLookupTable(
-            TestUtils.buildBeamSqlRowType(
+            TestUtils.buildBeamSqlSchema(
                 TypeName.INT32, "site_id",
                 TypeName.STRING, "site_name")));
   }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
index 2178b1c1f8e..11aee8a6e55 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java
@@ -124,7 +124,7 @@ public void testCountPerElementBasic() throws 
ParseException {
                     new BeamAggregationTransforms.AggregationAdaptor(aggCalls, 
inputSchema)))
             .setCoder(KvCoder.of(keyCoder, aggCoder));
 
-    //4. flat KV to a single record
+    // 4. flat KV to a single record
     PCollection<Row> mergedStream =
         aggregatedStream.apply(
             "mergeRecord",
@@ -132,13 +132,13 @@ public void testCountPerElementBasic() throws 
ParseException {
                 new 
BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1)));
     mergedStream.setCoder(outRecordCoder);
 
-    //assert function BeamAggregationTransform.AggregationGroupByKeyFn
+    // assert function BeamAggregationTransform.AggregationGroupByKeyFn
     
PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
 
-    //assert BeamAggregationTransform.AggregationCombineFn
+    // assert BeamAggregationTransform.AggregationCombineFn
     
PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn());
 
-    //assert BeamAggregationTransform.MergeAggregationRecord
+    // assert BeamAggregationTransform.MergeAggregationRecord
     
PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRow());
 
     p.run();
@@ -152,7 +152,7 @@ private void setupEnvironment() {
   /** create list of all {@link AggregateCall}. */
   @SuppressWarnings("deprecation")
   private void prepareAggregationCalls() {
-    //aggregations for all data type
+    // aggregations for all data type
     aggCalls = new ArrayList<>();
     aggCalls.add(
         new AggregateCall(
@@ -393,7 +393,7 @@ private void prepareTypeAndCoder() {
 
     aggCoder = aggPartType.getRowCoder();
 
-    outputType = prepareFinalRowType();
+    outputType = prepareFinalSchema();
     outRecordCoder = outputType.getRowCoder();
   }
 
@@ -446,7 +446,7 @@ private void prepareTypeAndCoder() {
   }
 
   /** Row type of final output row. */
-  private Schema prepareFinalRowType() {
+  private Schema prepareFinalSchema() {
     return RowSqlTypes.builder()
         .withIntegerField("f_int")
         .withBigIntField("count")
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
index 928a05cb398..040ffdacb04 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java
@@ -41,9 +41,9 @@
 public class BeamKafkaCSVTableTest {
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
-  private static final Row ROW1 = Row.withSchema(genRowType()).addValues(1L, 
1, 1.0).build();
+  private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 
1.0).build();
 
-  private static final Row ROW2 = Row.withSchema(genRowType()).addValues(2L, 
2, 2.0).build();
+  private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 
2.0).build();
 
   @Test
   public void testCsvRecorderDecoder() throws Exception {
@@ -51,7 +51,7 @@ public void testCsvRecorderDecoder() throws Exception {
         pipeline
             .apply(Create.of("1,\"1\",1.0", "2,2,2.0"))
             .apply(ParDo.of(new String2KvBytes()))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), 
CSVFormat.DEFAULT));
+            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), 
CSVFormat.DEFAULT));
 
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
@@ -63,15 +63,15 @@ public void testCsvRecorderEncoder() throws Exception {
     PCollection<Row> result =
         pipeline
             .apply(Create.of(ROW1, ROW2))
-            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), 
CSVFormat.DEFAULT))
-            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), 
CSVFormat.DEFAULT));
+            .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), 
CSVFormat.DEFAULT))
+            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), 
CSVFormat.DEFAULT));
 
     PAssert.that(result).containsInAnyOrder(ROW1, ROW2);
 
     pipeline.run();
   }
 
-  private static Schema genRowType() {
+  private static Schema genSchema() {
     JavaTypeFactory typeFactory = new 
JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
     return CalciteUtils.toBeamSchema(
         typeFactory
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
index 4f94177e9d1..6a7384a9460 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.mock;
 
-import static org.apache.beam.sdk.extensions.sql.TestUtils.buildBeamSqlRowType;
+import static org.apache.beam.sdk.extensions.sql.TestUtils.buildBeamSqlSchema;
 import static org.apache.beam.sdk.extensions.sql.TestUtils.buildRows;
 
 import java.util.ArrayList;
@@ -62,7 +62,7 @@ public MockedBoundedTable(Schema beamSchema) {
    * }</pre>
    */
   public static MockedBoundedTable of(final Object... args) {
-    return new MockedBoundedTable(buildBeamSqlRowType(args));
+    return new MockedBoundedTable(buildBeamSqlSchema(args));
   }
 
   /** Build a mocked bounded table with the specified type. */
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
index 295accbbdc9..2bf81a184a6 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java
@@ -58,7 +58,7 @@ private MockedUnboundedTable(Schema beamSchema) {
    * }</pre>
    */
   public static MockedUnboundedTable of(final Object... args) {
-    return new MockedUnboundedTable(TestUtils.buildBeamSqlRowType(args));
+    return new MockedUnboundedTable(TestUtils.buildBeamSqlSchema(args));
   }
 
   public MockedUnboundedTable timestampColumnIndex(int idx) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 107214)
    Time Spent: 50m  (was: 40m)

> Find remaining uses of rowType and RowType, etc, and make them Schema as 
> appropriate
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-4084
>                 URL: https://issues.apache.org/jira/browse/BEAM-4084
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to