Repository: beam Updated Branches: refs/heads/DSL_SQL dedabff1f -> 9395fbb3c
http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index d4e1db2..4795b2c 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -18,7 +18,8 @@ package org.apache.beam.dsls.sql.rel; -import org.apache.beam.dsls.sql.BeamSQLEnvironment; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.sdk.testing.PAssert; @@ -36,7 +37,6 @@ import org.junit.Test; public class BeamValuesRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - public static BeamSQLEnvironment runner = BeamSQLEnvironment.create(); private static MockedBeamSQLTable stringTable = MockedBeamSQLTable .of(SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description"); @@ -49,7 +49,7 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description", @@ -61,7 +61,7 @@ public class BeamValuesRelTest { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( SqlTypeName.INTEGER, "c0", SqlTypeName.INTEGER, "c1", @@ -73,7 +73,7 @@ public class BeamValuesRelTest { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline); + PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( SqlTypeName.INTEGER, "EXPR$0", SqlTypeName.CHAR, "EXPR$1", @@ -84,8 +84,8 @@ public class BeamValuesRelTest { @BeforeClass public static void prepareClass() { - runner.addTableMetadata("string_table", stringTable); - runner.addTableMetadata("int_table", intTable); + BeamSqlEnv.registerTable("string_table", stringTable); + BeamSqlEnv.registerTable("int_table", intTable); } @Before http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java index 6f24e2a..cb268bf 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.dsls.sql.schema; +import org.apache.beam.dsls.sql.BeamSqlCli; +import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.planner.BasePlanner; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.sdk.testing.TestPipeline; @@ -27,17 +29,19 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.BeforeClass; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; /** * Test case for BeamPCollectionTable. */ public class BeamPCollectionTableTest extends BasePlanner{ - public static TestPipeline pipeline = TestPipeline.create(); + @Rule + public final TestPipeline pipeline = TestPipeline.create(); - @BeforeClass - public static void prepareTable(){ + @Before + public void prepareTable(){ RelProtoDataType protoRowType = new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a0) { @@ -51,14 +55,16 @@ public class BeamPCollectionTableTest extends BasePlanner{ row.addField(0, 1); row.addField(1, "hello world."); PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row)); - runner.addTableMetadata("COLLECTION_TABLE", + BeamSqlEnv.registerTable("COLLECTION_TABLE", new BeamPCollectionTable(inputStream, protoRowType)); } @Test public void testSelectFromPCollectionTable() throws Exception{ String sql = "select c1, c2 from COLLECTION_TABLE"; - runner.executionPlan(sql); + PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); + + pipeline.run().waitUntilFinish(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index bc6343b..985b667 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -74,7 +74,7 @@ public class BeamSqlRowCoderTest { row.addField("col_timestamp", new Date()); - BeamSqlRowCoder coder = BeamSqlRowCoder.of(); + BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType); CoderProperties.coderDecodeEncodeEqual(coder, row); } } http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java index f174b9c..dadd53b 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java @@ -23,11 +23,11 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder; import org.apache.beam.dsls.sql.schema.BeamSQLRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; -import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine; @@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.sql.SqlKind; @@ -62,8 +61,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ public TestPipeline p = TestPipeline.create(); private List<AggregateCall> aggCalls; - private BeamSQLRecordType keyType = initTypeOfSqlRow( - Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); + + private BeamSQLRecordType keyType; + private BeamSQLRecordType aggPartType; + private BeamSQLRecordType outputType; + + private BeamSqlRowCoder inRecordCoder; + private BeamSqlRowCoder keyCoder; + private BeamSqlRowCoder aggCoder; + private BeamSqlRowCoder outRecordCoder; /** * This step equals to below query. @@ -97,21 +103,25 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ //1. extract fields in group-by key part PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = input.apply("exGroupBy", WithKeys - .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))); + .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) + .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, inRecordCoder)); //2. apply a GroupByKey. PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create()); + .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create()) + .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder, + IterableCoder.<BeamSQLRow>of(inRecordCoder))); //3. run aggregation functions PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation", Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues( - new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType))); + new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType))) + .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder)); //4. flat KV to a single record PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( - BeamSQLRecordType.from(prepareFinalRowType()), aggCalls))); + ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls))); + mergedStream.setCoder(outRecordCoder); //assert function BeamAggregationTransform.AggregationGroupByKeyFn PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn()); @@ -126,17 +136,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ } private void setupEnvironment() { - regiesterCoder(); prepareAggregationCalls(); - } - - /** - * Add Coders in BeamSQL. - */ - private void regiesterCoder() { - CoderRegistry cr = p.getCoderRegistry(); - cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of()); - cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of()); + prepareTypeAndCoder(); } /** @@ -327,26 +328,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ } /** - * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. + * Coders used in aggregation steps. */ - private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() { - return Arrays.asList( - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - inputRows.get(0)), - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), - inputRows.get(1)), - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), - inputRows.get(2)), - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), - inputRows.get(3))); - } + private void prepareTypeAndCoder() { + inRecordCoder = new BeamSqlRowCoder(inputRowType); - /** - * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. - */ - private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn() - throws ParseException { - BeamSQLRecordType aggPartType = initTypeOfSqlRow( + keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER))); + keyCoder = new BeamSqlRowCoder(keyType); + + aggPartType = initTypeOfSqlRow( Arrays.asList(KV.of("count", SqlTypeName.BIGINT), KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT), @@ -369,6 +359,32 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER), KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER) )); + aggCoder = new BeamSqlRowCoder(aggPartType); + + outputType = prepareFinalRowType(); + outRecordCoder = new BeamSqlRowCoder(outputType); + } + + /** + * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. + */ + private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() { + return Arrays.asList( + KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + inputRows.get(0)), + KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), + inputRows.get(1)), + KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), + inputRows.get(2)), + KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), + inputRows.get(3))); + } + + /** + * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. + */ + private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn() + throws ParseException { return Arrays.asList( KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), new BeamSQLRow(aggPartType, Arrays.<Object>asList( @@ -387,7 +403,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ /** * Row type of final output row. */ - private RelDataType prepareFinalRowType() { + private BeamSQLRecordType prepareFinalRowType() { FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), @@ -415,14 +431,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); } - return builder.build(); + return BeamSQLRecordType.from(builder.build()); } /** * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}. */ private BeamSQLRow prepareResultOfMergeAggregationRecord() throws ParseException { - return new BeamSQLRow(BeamSQLRecordType.from(prepareFinalRowType()), Arrays.<Object>asList( + return new BeamSQLRow(outputType, Arrays.<Object>asList( 1, 4L, 10000L, 2500L, 4000L, 1000L, (short) 10, (short) 2, (short) 4, (short) 1,
