[ https://issues.apache.org/jira/browse/BEAM-4084?focusedWorklogId=107214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107214 ]
ASF GitHub Bot logged work on BEAM-4084: ---------------------------------------- Author: ASF GitHub Bot Created on: 30/May/18 15:46 Start Date: 30/May/18 15:46 Worklog Time Spent: 10m Work Description: kennknowles closed pull request #5487: [BEAM-4084] Finish RowType -> Schema rename URL: https://github.com/apache/beam/pull/5487 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactory.java similarity index 92% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactory.java index 7ce80f69153..f4069f646b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultRowTypeFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactory.java @@ -29,7 +29,7 @@ import org.joda.time.DateTime; /** - * A default implementation of the {@link RowTypeFactory} interface. The purpose of + * A default implementation of the {@link SchemaFactory} interface. The purpose of * the factory is to create a row types given a list of getters. * * <p>Row type is represented by {@link Schema} which essentially is a @@ -42,10 +42,10 @@ * <p>This is the default factory implementation used in {@link RowFactory}. * * <p>In other cases, when mapping requires extra logic, another implentation of the - * {@link RowTypeFactory} should be used instead of this class. + * {@link SchemaFactory} should be used instead of this class. * */ -public class DefaultRowTypeFactory implements RowTypeFactory { +public class DefaultSchemaFactory implements SchemaFactory { private static final ImmutableMap<Class, TypeName> SUPPORTED_TYPES = ImmutableMap.<Class, TypeName>builder() .put(Boolean.class, TypeName.BOOLEAN) @@ -74,7 +74,7 @@ * Uses {@link CoderRegistry#createDefault()} to get coders for {@link FieldValueGetter#type()}. */ @Override - public Schema createRowType(Iterable<FieldValueGetter> fieldValueGetters) { + public Schema createSchema(Iterable<FieldValueGetter> fieldValueGetters) { List<Schema.Field> fields = Lists.newArrayList(); for (FieldValueGetter getter : fieldValueGetters) { fields.add(Schema.Field.of(getter.name(), getTypeDescriptor(getter.type()))); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java index 1dc91aa6a9e..4ec3fafce62 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/InferredRowCoder.java @@ -67,10 +67,10 @@ } /** - * Returns a {@link InferredRowCoder} with row type factory overridden by {@code rowTypeFactory}. + * Returns a {@link InferredRowCoder} with row type factory overridden by {@code schemaFactory}. */ - public InferredRowCoder<T> withRowTypeFactory(RowTypeFactory rowTypeFactory) { - return toBuilder().setRowFactory(RowFactory.withRowTypeFactory(rowTypeFactory)).build(); + public InferredRowCoder<T> withSchemaFactory(SchemaFactory schemaFactory) { + return toBuilder().setRowFactory(RowFactory.withSchemaFactory(schemaFactory)).build(); } static <W> Builder<W> builder() { @@ -79,12 +79,12 @@ abstract Builder<T> toBuilder(); - public Schema rowType() { - return rowFactory().getRowType(elementType()); + public Schema schema() { + return rowFactory().getSchema(elementType()); } public RowCoder rowCoder() { - return rowType().getRowCoder(); + return schema().getRowCoder(); } public Row createRow(T element) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java index 3e504e71614..e3bfd3cd009 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowFactory.java @@ -37,39 +37,39 @@ * * <p>At the moment single pojo class corresponds to single {@link Schema}. * - * <p>Supported pojo getter types depend on types supported by the {@link RowTypeFactory}. - * See {@link DefaultRowTypeFactory} for default implementation. + * <p>Supported pojo getter types depend on types supported by the {@link SchemaFactory}. + * See {@link DefaultSchemaFactory} for default implementation. */ @Internal public class RowFactory implements Serializable { - private RowTypeFactory rowTypeFactory; + private SchemaFactory schemaFactory; private final List<GetterFactory> getterFactories; - private transient Map<Class, RowTypeGetters> rowTypesCache; + private transient Map<Class, SchemaGetters> schemaCache; /** - * Creates an instance of {@link RowFactory} using {@link DefaultRowTypeFactory} + * Creates an instance of {@link RowFactory} using {@link DefaultSchemaFactory} * and {@link GeneratedGetterFactory}. */ public static RowFactory createDefault() { - return withRowTypeFactory(new DefaultRowTypeFactory()); + return withSchemaFactory(new DefaultSchemaFactory()); } /** - * Creates an instance of {@link RowFactory} using provided {@link RowTypeFactory} + * Creates an instance of {@link RowFactory} using provided {@link SchemaFactory} * and {@link GeneratedGetterFactory}. */ - public static RowFactory withRowTypeFactory(RowTypeFactory rowTypeFactory) { - return of(rowTypeFactory, new GeneratedGetterFactory()); + public static RowFactory withSchemaFactory(SchemaFactory schemaFactory) { + return of(schemaFactory, new GeneratedGetterFactory()); } /** - * Creates an instance of {@link RowFactory} using provided {@link RowTypeFactory} + * Creates an instance of {@link RowFactory} using provided {@link SchemaFactory} * and {@link GetterFactory}. */ - public static RowFactory of(RowTypeFactory rowTypeFactory, GetterFactory getterFactory) { - return new RowFactory(rowTypeFactory, getterFactory); + public static RowFactory of(SchemaFactory schemaFactory, GetterFactory getterFactory) { + return new RowFactory(schemaFactory, getterFactory); } /** @@ -77,13 +77,13 @@ public static RowFactory of(RowTypeFactory rowTypeFactory, GetterFactory getterF * * <p>For example this can be used to create BeamRecordSqlTypes instead of {@link Schema}. */ - RowFactory(RowTypeFactory rowTypeFactory, GetterFactory ... getterFactories) { - this.rowTypeFactory = rowTypeFactory; + RowFactory(SchemaFactory schemaFactory, GetterFactory ... getterFactories) { + this.schemaFactory = schemaFactory; this.getterFactories = Arrays.asList(getterFactories); } - public <T> Schema getRowType(Class<T> elementType) { - return getRecordType(elementType).rowType(); + public <T> Schema getSchema(Class<T> elementType) { + return getRecordType(elementType).schema(); } /** @@ -98,25 +98,25 @@ public static RowFactory of(RowTypeFactory rowTypeFactory, GetterFactory getterF * For example record field 'name' will be generated for 'getName()' pojo method. */ public <T> Row create(T pojo) { - RowTypeGetters getters = getRecordType(pojo.getClass()); + SchemaGetters getters = getRecordType(pojo.getClass()); List<Object> fieldValues = getFieldValues(getters.valueGetters(), pojo); - return Row.withSchema(getters.rowType()).addValues(fieldValues).build(); + return Row.withSchema(getters.schema()).addValues(fieldValues).build(); } - private synchronized RowTypeGetters getRecordType(Class pojoClass) { - if (rowTypesCache == null) { - rowTypesCache = new HashMap<>(); + private synchronized SchemaGetters getRecordType(Class pojoClass) { + if (schemaCache == null) { + schemaCache = new HashMap<>(); } - if (rowTypesCache.containsKey(pojoClass)) { - return rowTypesCache.get(pojoClass); + if (schemaCache.containsKey(pojoClass)) { + return schemaCache.get(pojoClass); } List<FieldValueGetter> fieldValueGetters = createGetters(pojoClass); - Schema schema = rowTypeFactory.createRowType(fieldValueGetters); - rowTypesCache.put(pojoClass, new RowTypeGetters(schema, fieldValueGetters)); + Schema schema = schemaFactory.createSchema(fieldValueGetters); + schemaCache.put(pojoClass, new SchemaGetters(schema, fieldValueGetters)); - return rowTypesCache.get(pojoClass); + return schemaCache.get(pojoClass); } private List<FieldValueGetter> createGetters(Class pojoClass) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaFactory.java similarity index 89% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeFactory.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaFactory.java index dfb03305f47..efb02bafed0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaFactory.java @@ -30,15 +30,15 @@ * <p>Different implementations can have different ways of mapping getter types to coders. * For example Beam SQL uses custom mapping via java.sql.Types. * - * <p>Default implementation is {@link DefaultRowTypeFactory}. + * <p>Default implementation is {@link DefaultSchemaFactory}. * It returns instances of {@link Schema}, mapping {@link FieldValueGetter#type()} * to known coders. */ @Internal -public interface RowTypeFactory extends Serializable { +public interface SchemaFactory extends Serializable { /** * Create a {@link Schema} for the list of the pojo field getters. */ - Schema createRowType(Iterable<FieldValueGetter> getters); + Schema createSchema(Iterable<FieldValueGetter> getters); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaGetters.java similarity index 89% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaGetters.java index ccb71b44319..e70ce5f4971 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/RowTypeGetters.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/reflect/SchemaGetters.java @@ -28,11 +28,11 @@ * * <p>This is used in {@link RowFactory} to create instances of {@link Row}s. */ -class RowTypeGetters { +class SchemaGetters { private Schema schema; private List<FieldValueGetter> fieldValueGetters; - RowTypeGetters(Schema schema, List<FieldValueGetter> fieldValueGetters) { + SchemaGetters(Schema schema, List<FieldValueGetter> fieldValueGetters) { this.schema = schema; this.fieldValueGetters = fieldValueGetters; } @@ -40,13 +40,13 @@ /** * Returns a {@link Schema}. */ - Schema rowType() { + Schema schema() { return schema; } /** * Returns the list of {@link FieldValueGetter}s which - * were used to create {@link RowTypeGetters#rowType()}. + * were used to create {@link SchemaGetters#schema()}. */ List<FieldValueGetter> valueGetters() { return fieldValueGetters; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java index 69d655416a8..3bdb7e4c460 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/DefaultSchemaFactoryTest.java @@ -28,7 +28,7 @@ import org.junit.rules.ExpectedException; /** - * Unit tests for {@link DefaultRowTypeFactory}. + * Unit tests for {@link DefaultSchemaFactory}. */ public class DefaultSchemaFactoryTest { @@ -53,9 +53,9 @@ @Test public void testContainsCorrectFields() throws Exception { - DefaultRowTypeFactory factory = new DefaultRowTypeFactory(); + DefaultSchemaFactory factory = new DefaultSchemaFactory(); - Schema schema = factory.createRowType(GETTERS); + Schema schema = factory.createSchema(GETTERS); assertEquals(GETTERS.size(), schema.getFieldCount()); assertEquals( @@ -73,9 +73,9 @@ public void testContainsCorrectFields() throws Exception { public void testThrowsForUnsupportedTypes() throws Exception { thrown.expect(UnsupportedOperationException.class); - DefaultRowTypeFactory factory = new DefaultRowTypeFactory(); + DefaultSchemaFactory factory = new DefaultSchemaFactory(); - factory.createRowType( + factory.createSchema( Arrays.<FieldValueGetter>asList(getter("unsupportedGetter", UnsupportedClass.class))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java index e0ca6dfc988..e590735689e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/InferredRowCoderTest.java @@ -73,13 +73,13 @@ public String getName() { } @Test - public void testCreatesRowType() { + public void testCreatesSchema() { InferredRowCoder<PersonPojo> inferredCoder = InferredRowCoder.ofSerializable(PersonPojo.class); - Schema rowType = inferredCoder.rowType(); + Schema schema = inferredCoder.schema(); - assertEquals(2, rowType.getFieldCount()); + assertEquals(2, schema.getFieldCount()); assertThat( - rowType.getFields(), + schema.getFields(), containsInAnyOrder(PERSON_ROW_TYPE.getField(0), PERSON_ROW_TYPE.getField(1))); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java index 95e8fc68c1f..b9ca89380aa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/RowFactoryTest.java @@ -142,6 +142,6 @@ public void testCopiesValues() throws Exception { } private RowFactory newFactory() { - return new RowFactory(new DefaultRowTypeFactory(), getterFactory); + return new RowFactory(new DefaultSchemaFactory(), getterFactory); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java index aa64be200c2..f4782363cac 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/reflect/SchemaGettersTest.java @@ -26,7 +26,7 @@ import org.junit.Test; /** - * Unit tests for {@link RowTypeGetters}. + * Unit tests for {@link SchemaGetters}. */ public class SchemaGettersTest { @@ -35,9 +35,9 @@ public void testGetters() { Schema schema = Schema.builder().build(); List<FieldValueGetter> fieldValueGetters = emptyList(); - RowTypeGetters getters = new RowTypeGetters(schema, fieldValueGetters); + SchemaGetters getters = new SchemaGetters(schema, fieldValueGetters); - assertSame(schema, getters.rowType()); + assertSame(schema, getters.schema()); assertSame(fieldValueGetters, getters.valueGetters()); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java index 902ef935fc3..7ae03a6e102 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/RowSqlTypes.java @@ -153,7 +153,7 @@ public Builder withMapField( return this; } - /** Adds an ARRAY field with elements of {@code rowType}. */ + /** Adds an ARRAY field with elements of {@code schema}. */ public Builder withArrayField(String fieldName, Schema schema) { FieldType collectionElementType = FieldType.of(TypeName.ROW).withRowSchema(schema); builder.addField( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java index f53f0cbfaea..4032592d067 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java @@ -76,7 +76,7 @@ TypeName.DATETIME.type(), SqlTypeName.TIMESTAMP, TypeName.STRING.type(), SqlTypeName.VARCHAR); - /** Generate {@code BeamSqlRowType} from {@code RelDataType} which is used to create table. */ + /** Generate {@link Schema} from {@code RelDataType} which is used to create table. */ public static Schema toBeamSchema(RelDataType tableInfo) { return tableInfo .getFieldList() diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java index 2160e8e5fd8..694d4d32c17 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslArrayTest.java @@ -35,7 +35,7 @@ /** Tests for SQL arrays. */ public class BeamSqlDslArrayTest { - private static final Schema INPUT_ROW_TYPE = + private static final Schema INPUT_SCHEMA = RowSqlTypes.builder() .withIntegerField("f_int") .withArrayField("f_stringArr", SqlTypeName.VARCHAR) @@ -109,12 +109,11 @@ public void testAccessArrayElement() { @Test public void testSingleElement() throws Exception { - Row inputRow = - Row.withSchema(INPUT_ROW_TYPE).addValues(1).addArray(Arrays.asList("111")).build(); + Row inputRow = Row.withSchema(INPUT_SCHEMA).addValues(1).addArray(Arrays.asList("111")).build(); PCollection<Row> input = PBegin.in(pipeline) - .apply("boundedInput1", Create.of(inputRow).withCoder(INPUT_ROW_TYPE.getRowCoder())); + .apply("boundedInput1", Create.of(inputRow).withCoder(INPUT_SCHEMA.getRowCoder())); Schema resultType = RowSqlTypes.builder().withVarcharField("f_arrElem").build(); @@ -146,7 +145,7 @@ public void testCardinality() { @Test public void testUnnestLiteral() { PCollection<Row> input = - PBegin.in(pipeline).apply("boundedInput1", Create.empty(INPUT_ROW_TYPE.getRowCoder())); + PBegin.in(pipeline).apply("boundedInput1", Create.empty(INPUT_SCHEMA.getRowCoder())); // Because we have a multi-part FROM the DSL considers it multi-input TupleTag<Row> mainTag = new TupleTag<Row>("main") {}; @@ -169,7 +168,7 @@ public void testUnnestLiteral() { @Test public void testUnnestNamedLiteral() { PCollection<Row> input = - PBegin.in(pipeline).apply("boundedInput1", Create.empty(INPUT_ROW_TYPE.getRowCoder())); + PBegin.in(pipeline).apply("boundedInput1", Create.empty(INPUT_SCHEMA.getRowCoder())); // Because we have a multi-part FROM the DSL considers it multi-input TupleTag<Row> mainTag = new TupleTag<Row>("main") {}; @@ -194,17 +193,17 @@ public void testUnnestNamedLiteral() { @Test public void testUnnestCrossJoin() { Row row1 = - Row.withSchema(INPUT_ROW_TYPE) + Row.withSchema(INPUT_SCHEMA) .addValues(42) .addArray(Arrays.asList("111", "222", "333")) .build(); Row row2 = - Row.withSchema(INPUT_ROW_TYPE).addValues(13).addArray(Arrays.asList("444", "555")).build(); + Row.withSchema(INPUT_SCHEMA).addValues(13).addArray(Arrays.asList("444", "555")).build(); PCollection<Row> input = PBegin.in(pipeline) - .apply("boundedInput1", Create.of(row1, row2).withCoder(INPUT_ROW_TYPE.getRowCoder())); + .apply("boundedInput1", Create.of(row1, row2).withCoder(INPUT_SCHEMA.getRowCoder())); // Because we have a multi-part FROM the DSL considers it multi-input TupleTag<Row> mainTag = new TupleTag<Row>("main") {}; @@ -233,16 +232,16 @@ public void testUnnestCrossJoin() { @Test public void testSelectRowsFromArrayOfRows() { - Schema elementRowType = + Schema elementSchema = RowSqlTypes.builder().withVarcharField("f_rowString").withIntegerField("f_rowInt").build(); - Schema resultRowType = - RowSqlTypes.builder().withArrayField("f_resultArray", elementRowType).build(); + Schema resultSchema = + RowSqlTypes.builder().withArrayField("f_resultArray", elementSchema).build(); Schema inputType = RowSqlTypes.builder() .withIntegerField("f_int") - .withArrayField("f_arrayOfRows", elementRowType) + .withArrayField("f_arrayOfRows", elementSchema) .build(); PCollection<Row> input = @@ -253,36 +252,36 @@ public void testSelectRowsFromArrayOfRows() { .addValues( 1, Arrays.asList( - Row.withSchema(elementRowType).addValues("AA", 11).build(), - Row.withSchema(elementRowType).addValues("BB", 22).build())) + Row.withSchema(elementSchema).addValues("AA", 11).build(), + Row.withSchema(elementSchema).addValues("BB", 22).build())) .build(), Row.withSchema(inputType) .addValues( 2, Arrays.asList( - Row.withSchema(elementRowType).addValues("CC", 33).build(), - Row.withSchema(elementRowType).addValues("DD", 44).build())) + Row.withSchema(elementSchema).addValues("CC", 33).build(), + Row.withSchema(elementSchema).addValues("DD", 44).build())) .build()) .withCoder(inputType.getRowCoder())); PCollection<Row> result = input .apply(BeamSql.query("SELECT f_arrayOfRows FROM PCOLLECTION")) - .setCoder(resultRowType.getRowCoder()); + .setCoder(resultSchema.getRowCoder()); PAssert.that(result) .containsInAnyOrder( - Row.withSchema(resultRowType) + Row.withSchema(resultSchema) .addArray( Arrays.asList( - Row.withSchema(elementRowType).addValues("AA", 11).build(), - Row.withSchema(elementRowType).addValues("BB", 22).build())) + Row.withSchema(elementSchema).addValues("AA", 11).build(), + Row.withSchema(elementSchema).addValues("BB", 22).build())) .build(), - Row.withSchema(resultRowType) + Row.withSchema(resultSchema) .addArray( Arrays.asList( - Row.withSchema(elementRowType).addValues("CC", 33).build(), - Row.withSchema(elementRowType).addValues("DD", 44).build())) + Row.withSchema(elementSchema).addValues("CC", 33).build(), + Row.withSchema(elementSchema).addValues("DD", 44).build())) .build()); pipeline.run(); @@ -290,15 +289,15 @@ public void testSelectRowsFromArrayOfRows() { @Test public void testSelectSingleRowFromArrayOfRows() { - Schema elementRowType = + Schema elementSchema = RowSqlTypes.builder().withVarcharField("f_rowString").withIntegerField("f_rowInt").build(); - Schema resultRowType = elementRowType; + Schema resultSchema = elementSchema; Schema inputType = RowSqlTypes.builder() .withIntegerField("f_int") - .withArrayField("f_arrayOfRows", elementRowType) + .withArrayField("f_arrayOfRows", elementSchema) .build(); PCollection<Row> input = @@ -309,42 +308,42 @@ public void testSelectSingleRowFromArrayOfRows() { .addValues( 1, Arrays.asList( - Row.withSchema(elementRowType).addValues("AA", 11).build(), - Row.withSchema(elementRowType).addValues("BB", 22).build())) + Row.withSchema(elementSchema).addValues("AA", 11).build(), + Row.withSchema(elementSchema).addValues("BB", 22).build())) .build(), Row.withSchema(inputType) .addValues( 2, Arrays.asList( - Row.withSchema(elementRowType).addValues("CC", 33).build(), - Row.withSchema(elementRowType).addValues("DD", 44).build())) + Row.withSchema(elementSchema).addValues("CC", 33).build(), + Row.withSchema(elementSchema).addValues("DD", 44).build())) .build()) .withCoder(inputType.getRowCoder())); PCollection<Row> result = input .apply(BeamSql.query("SELECT f_arrayOfRows[1] FROM PCOLLECTION")) - .setCoder(resultRowType.getRowCoder()); + .setCoder(resultSchema.getRowCoder()); PAssert.that(result) .containsInAnyOrder( - Row.withSchema(elementRowType).addValues("BB", 22).build(), - Row.withSchema(elementRowType).addValues("DD", 44).build()); + Row.withSchema(elementSchema).addValues("BB", 22).build(), + Row.withSchema(elementSchema).addValues("DD", 44).build()); pipeline.run(); } @Test public void testSelectRowFieldFromArrayOfRows() { - Schema elementRowType = + Schema elementSchema = RowSqlTypes.builder().withVarcharField("f_rowString").withIntegerField("f_rowInt").build(); - Schema resultRowType = RowSqlTypes.builder().withVarcharField("f_stringField").build(); + Schema resultSchema = RowSqlTypes.builder().withVarcharField("f_stringField").build(); Schema inputType = RowSqlTypes.builder() .withIntegerField("f_int") - .withArrayField("f_arrayOfRows", elementRowType) + .withArrayField("f_arrayOfRows", elementSchema) .build(); PCollection<Row> input = @@ -355,27 +354,27 @@ public void testSelectRowFieldFromArrayOfRows() { .addValues( 1, Arrays.asList( - Row.withSchema(elementRowType).addValues("AA", 11).build(), - Row.withSchema(elementRowType).addValues("BB", 22).build())) + Row.withSchema(elementSchema).addValues("AA", 11).build(), + Row.withSchema(elementSchema).addValues("BB", 22).build())) .build(), Row.withSchema(inputType) .addValues( 2, Arrays.asList( - Row.withSchema(elementRowType).addValues("CC", 33).build(), - Row.withSchema(elementRowType).addValues("DD", 44).build())) + Row.withSchema(elementSchema).addValues("CC", 33).build(), + Row.withSchema(elementSchema).addValues("DD", 44).build())) .build()) .withCoder(inputType.getRowCoder())); PCollection<Row> result = input .apply(BeamSql.query("SELECT f_arrayOfRows[1].f_rowString FROM PCOLLECTION")) - .setCoder(resultRowType.getRowCoder()); + .setCoder(resultSchema.getRowCoder()); PAssert.that(result) .containsInAnyOrder( - Row.withSchema(resultRowType).addValues("BB").build(), - Row.withSchema(resultRowType).addValues("DD").build()); + Row.withSchema(resultSchema).addValues("BB").build(), + Row.withSchema(resultSchema).addValues("DD").build()); pipeline.run(); } @@ -385,14 +384,14 @@ public void testSelectRowFieldFromArrayOfRows() { .apply( "boundedInput1", Create.of( - Row.withSchema(INPUT_ROW_TYPE) + Row.withSchema(INPUT_SCHEMA) .addValues(1) .addArray(Arrays.asList("111", "222")) .build(), - Row.withSchema(INPUT_ROW_TYPE) + Row.withSchema(INPUT_SCHEMA) .addValues(2) .addArray(Arrays.asList("33", "44", "55")) .build()) - .withCoder(INPUT_ROW_TYPE.getRowCoder())); + .withCoder(INPUT_SCHEMA.getRowCoder())); } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java index a8055a4d339..8cc54b33331 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/SqlSchemaFactoryTest.java @@ -26,9 +26,9 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.values.reflect.DefaultRowTypeFactory; +import org.apache.beam.sdk.values.reflect.DefaultSchemaFactory; import org.apache.beam.sdk.values.reflect.FieldValueGetter; -import org.apache.beam.sdk.values.reflect.RowTypeFactory; +import org.apache.beam.sdk.values.reflect.SchemaFactory; import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; @@ -56,9 +56,9 @@ @Test public void testContainsCorrectFields() throws Exception { - RowTypeFactory factory = new DefaultRowTypeFactory(); + SchemaFactory factory = new DefaultSchemaFactory(); - Schema schema = factory.createRowType(GETTERS_FOR_KNOWN_TYPES); + Schema schema = factory.createSchema(GETTERS_FOR_KNOWN_TYPES); assertEquals(GETTERS_FOR_KNOWN_TYPES.size(), schema.getFieldCount()); assertEquals( @@ -81,9 +81,9 @@ public void testContainsCorrectFields() throws Exception { public void testThrowsForUnsupportedTypes() throws Exception { thrown.expect(UnsupportedOperationException.class); - RowTypeFactory factory = new DefaultRowTypeFactory(); + SchemaFactory factory = new DefaultSchemaFactory(); - factory.createRowType( + factory.createSchema( Arrays.<FieldValueGetter>asList(getter("arrayListGetter", ArrayList.class))); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java index 1b14f8a2028..b92615733d0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/TestUtils.java @@ -103,7 +103,7 @@ public static RowsBuilder rowsBuilderOf(Schema type) { * @args pairs of column type and column names. */ public static RowsBuilder of(final Object... args) { - Schema beamSQLSchema = buildBeamSqlRowType(args); + Schema beamSQLSchema = buildBeamSqlSchema(args); RowsBuilder builder = new RowsBuilder(); builder.type = beamSQLSchema; @@ -120,8 +120,6 @@ public static RowsBuilder of(final Object... args) { * schema * ) * }</pre> - * - * @beamSQLRowType the row type. */ public static RowsBuilder of(final Schema schema) { RowsBuilder builder = new RowsBuilder(); @@ -159,7 +157,7 @@ public RowsBuilder addRows(final List args) { } public PCollectionBuilder getPCollectionBuilder() { - return pCollectionBuilder().withRowType(type).withRows(rows); + return pCollectionBuilder().withSchema(type).withRows(rows); } } @@ -173,7 +171,7 @@ public static PCollectionBuilder pCollectionBuilder() { private String timestampField; private Pipeline pipeline; - public PCollectionBuilder withRowType(Schema type) { + public PCollectionBuilder withSchema(Schema type) { this.type = type; return this; } @@ -224,12 +222,12 @@ public PCollectionBuilder inPipeline(Pipeline pipeline) { } /** - * Convenient way to build a {@code BeamSqlRowType}. + * Convenient way to build a {@link Schema}. * * <p>e.g. * * <pre>{@code - * buildBeamSqlRowType( + * buildBeamSqlSchema( * SqlCoders.BIGINT, "order_id", * SqlCoders.INTEGER, "site_id", * SqlCoders.DOUBLE, "price", @@ -237,7 +235,7 @@ public PCollectionBuilder inPipeline(Pipeline pipeline) { * ) * }</pre> */ - public static Schema buildBeamSqlRowType(Object... args) { + public static Schema buildBeamSqlSchema(Object... args) { return Stream.iterate(0, i -> i + 2) .limit(args.length / 2) .map(i -> toRecordField(args, i)) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java index dbbd91805fb..bd62b3f8a8c 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java @@ -101,7 +101,7 @@ public static void prepare() { registerTable( "SITE_LKP", new SiteLookupTable( - TestUtils.buildBeamSqlRowType( + TestUtils.buildBeamSqlSchema( TypeName.INT32, "site_id", TypeName.STRING, "site_name"))); } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java index 2178b1c1f8e..11aee8a6e55 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/schema/transform/BeamAggregationTransformTest.java @@ -124,7 +124,7 @@ public void testCountPerElementBasic() throws ParseException { new BeamAggregationTransforms.AggregationAdaptor(aggCalls, inputSchema))) .setCoder(KvCoder.of(keyCoder, aggCoder)); - //4. flat KV to a single record + // 4. flat KV to a single record PCollection<Row> mergedStream = aggregatedStream.apply( "mergeRecord", @@ -132,13 +132,13 @@ public void testCountPerElementBasic() throws ParseException { new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); mergedStream.setCoder(outRecordCoder); - //assert function BeamAggregationTransform.AggregationGroupByKeyFn + // assert function BeamAggregationTransform.AggregationGroupByKeyFn PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn()); - //assert BeamAggregationTransform.AggregationCombineFn + // assert BeamAggregationTransform.AggregationCombineFn PAssert.that(aggregatedStream).containsInAnyOrder(prepareResultOfAggregationCombineFn()); - //assert BeamAggregationTransform.MergeAggregationRecord + // assert BeamAggregationTransform.MergeAggregationRecord PAssert.that(mergedStream).containsInAnyOrder(prepareResultOfMergeAggregationRow()); p.run(); @@ -152,7 +152,7 @@ private void setupEnvironment() { /** create list of all {@link AggregateCall}. */ @SuppressWarnings("deprecation") private void prepareAggregationCalls() { - //aggregations for all data type + // aggregations for all data type aggCalls = new ArrayList<>(); aggCalls.add( new AggregateCall( @@ -393,7 +393,7 @@ private void prepareTypeAndCoder() { aggCoder = aggPartType.getRowCoder(); - outputType = prepareFinalRowType(); + outputType = prepareFinalSchema(); outRecordCoder = outputType.getRowCoder(); } @@ -446,7 +446,7 @@ private void prepareTypeAndCoder() { } /** Row type of final output row. */ - private Schema prepareFinalRowType() { + private Schema prepareFinalSchema() { return RowSqlTypes.builder() .withIntegerField("f_int") .withBigIntField("count") diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java index 928a05cb398..040ffdacb04 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaCSVTableTest.java @@ -41,9 +41,9 @@ public class BeamKafkaCSVTableTest { @Rule public TestPipeline pipeline = TestPipeline.create(); - private static final Row ROW1 = Row.withSchema(genRowType()).addValues(1L, 1, 1.0).build(); + private static final Row ROW1 = Row.withSchema(genSchema()).addValues(1L, 1, 1.0).build(); - private static final Row ROW2 = Row.withSchema(genRowType()).addValues(2L, 2, 2.0).build(); + private static final Row ROW2 = Row.withSchema(genSchema()).addValues(2L, 2, 2.0).build(); @Test public void testCsvRecorderDecoder() throws Exception { @@ -51,7 +51,7 @@ public void testCsvRecorderDecoder() throws Exception { pipeline .apply(Create.of("1,\"1\",1.0", "2,2,2.0")) .apply(ParDo.of(new String2KvBytes())) - .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)); + .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT)); PAssert.that(result).containsInAnyOrder(ROW1, ROW2); @@ -63,15 +63,15 @@ public void testCsvRecorderEncoder() throws Exception { PCollection<Row> result = pipeline .apply(Create.of(ROW1, ROW2)) - .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genRowType(), CSVFormat.DEFAULT)) - .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genRowType(), CSVFormat.DEFAULT)); + .apply(new BeamKafkaCSVTable.CsvRecorderEncoder(genSchema(), CSVFormat.DEFAULT)) + .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(genSchema(), CSVFormat.DEFAULT)); PAssert.that(result).containsInAnyOrder(ROW1, ROW2); pipeline.run(); } - private static Schema genRowType() { + private static Schema genSchema() { JavaTypeFactory typeFactory = new JavaTypeFactoryImpl(RelDataTypeSystem.DEFAULT); return CalciteUtils.toBeamSchema( typeFactory diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java index 4f94177e9d1..6a7384a9460 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedBoundedTable.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.mock; -import static org.apache.beam.sdk.extensions.sql.TestUtils.buildBeamSqlRowType; +import static org.apache.beam.sdk.extensions.sql.TestUtils.buildBeamSqlSchema; import static org.apache.beam.sdk.extensions.sql.TestUtils.buildRows; import java.util.ArrayList; @@ -62,7 +62,7 @@ public MockedBoundedTable(Schema beamSchema) { * }</pre> */ public static MockedBoundedTable of(final Object... args) { - return new MockedBoundedTable(buildBeamSqlRowType(args)); + return new MockedBoundedTable(buildBeamSqlSchema(args)); } /** Build a mocked bounded table with the specified type. */ diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java index 295accbbdc9..2bf81a184a6 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/mock/MockedUnboundedTable.java @@ -58,7 +58,7 @@ private MockedUnboundedTable(Schema beamSchema) { * }</pre> */ public static MockedUnboundedTable of(final Object... args) { - return new MockedUnboundedTable(TestUtils.buildBeamSqlRowType(args)); + return new MockedUnboundedTable(TestUtils.buildBeamSqlSchema(args)); } public MockedUnboundedTable timestampColumnIndex(int idx) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 107214) Time Spent: 50m (was: 40m) > Find remaining uses of rowType and RowType, etc, and make them Schema as > appropriate > ------------------------------------------------------------------------------------ > > Key: BEAM-4084 > URL: https://issues.apache.org/jira/browse/BEAM-4084 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core > Reporter: Kenneth Knowles > Assignee: Kenneth Knowles > Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)