Repository: beam Updated Branches: refs/heads/DSL_SQL 5c1f2cbc6 -> e68badd4d
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java index 8a48618..4df7f8a 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java @@ -18,7 +18,7 @@ package org.apache.beam.dsls.sql.planner; import org.apache.beam.dsls.sql.BeamSqlCli; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; import org.junit.Assert; @@ -36,7 +36,7 @@ public class BeamPlannerSubmitTest extends BasePlanner { @Before public void prepare() { - MockedBeamSQLTable.CONTENT.clear(); + MockedBeamSqlTable.CONTENT.clear(); } @Test @@ -45,12 +45,12 @@ public class BeamPlannerSubmitTest extends BasePlanner { + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20"; - PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); - Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1); - Assert.assertTrue(MockedBeamSQLTable.CONTENT.peek().valueInString() + Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1); + Assert.assertTrue(MockedBeamSqlTable.CONTENT.peek().valueInString() .contains("order_id=12345,site_id=0,price=20.5,order_time=")); } } http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java deleted file mode 100644 index 561f4be..0000000 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.dsls.sql.planner; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; - -import org.apache.beam.dsls.sql.schema.BaseBeamTable; -import org.apache.beam.dsls.sql.schema.BeamIOType; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelProtoDataType; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * A mock table use to check input/output. - * - */ -public class MockedBeamSQLTable extends BaseBeamTable { - - public static final ConcurrentLinkedQueue<BeamSQLRow> CONTENT = new ConcurrentLinkedQueue<>(); - - private List<BeamSQLRow> inputRecords; - - public MockedBeamSQLTable(RelProtoDataType protoRowType) { - super(protoRowType); - } - - public MockedBeamSQLTable withInputRecords(List<BeamSQLRow> inputRecords){ - this.inputRecords = inputRecords; - return this; - } - - /** - * Convenient way to build a mocked table with mock data: - * - * <p>e.g. - * - * <pre>{@code - * MockedBeamSQLTable - * .of(SqlTypeName.BIGINT, "order_id", - * SqlTypeName.INTEGER, "site_id", - * SqlTypeName.DOUBLE, "price", - * SqlTypeName.TIMESTAMP, "order_time", - * - * 1L, 2, 1.0, new Date(), - * 1L, 1, 2.0, new Date(), - * 2L, 4, 3.0, new Date(), - * 2L, 1, 4.0, new Date(), - * 5L, 5, 5.0, new Date(), - * 6L, 6, 6.0, new Date(), - * 7L, 7, 7.0, new Date(), - * 8L, 8888, 8.0, new Date(), - * 8L, 999, 9.0, new Date(), - * 10L, 100, 10.0, new Date()) - * }</pre> - */ - public static MockedBeamSQLTable of(final Object... args){ - final RelProtoDataType protoRowType = new RelProtoDataType() { - @Override - public RelDataType apply(RelDataTypeFactory a0) { - RelDataTypeFactory.FieldInfoBuilder builder = a0.builder(); - - int lastTypeIndex = 0; - for (; lastTypeIndex < args.length; lastTypeIndex += 2) { - if (args[lastTypeIndex] instanceof SqlTypeName) { - builder.add(args[lastTypeIndex + 1].toString(), - (SqlTypeName) args[lastTypeIndex]); - } else { - break; - } - } - return builder.build(); - } - }; - - List<BeamSQLRow> rows = new ArrayList<>(); - BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from( - protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); - int fieldCount = beamSQLRecordType.size(); - - for (int i = fieldCount * 2; i < args.length; i += fieldCount) { - BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); - for (int j = 0; j < fieldCount; j++) { - row.addField(j, args[i + j]); - } - rows.add(row); - } - return new MockedBeamSQLTable(protoRowType).withInputRecords(rows); - } - - @Override - public BeamIOType getSourceType() { - return BeamIOType.UNBOUNDED; - } - - @Override - public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) { - return PBegin.in(pipeline).apply(Create.of(inputRecords)); - } - - @Override - public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() { - return new OutputStore(); - } - - public List<BeamSQLRow> getInputRecords() { - return inputRecords; - } - - /** - * Keep output in {@code CONTENT} for validation. - * - */ - public static class OutputStore extends PTransform<PCollection<BeamSQLRow>, PDone> { - - @Override - public PDone expand(PCollection<BeamSQLRow> input) { - input.apply(ParDo.of(new DoFn<BeamSQLRow, Void>() { - @ProcessElement - public void processElement(ProcessContext c) { - CONTENT.add(c.element()); - } - - @Teardown - public void close() { - - } - - })); - return PDone.in(input.getPipeline()); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java new file mode 100644 index 0000000..2ff042d --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSqlTable.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.planner; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.beam.dsls.sql.schema.BaseBeamTable; +import org.apache.beam.dsls.sql.schema.BeamIOType; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * A mock table use to check input/output. + * + */ +public class MockedBeamSqlTable extends BaseBeamTable { + + public static final ConcurrentLinkedQueue<BeamSqlRow> CONTENT = new ConcurrentLinkedQueue<>(); + + private List<BeamSqlRow> inputRecords; + + public MockedBeamSqlTable(RelProtoDataType protoRowType) { + super(protoRowType); + } + + public MockedBeamSqlTable withInputRecords(List<BeamSqlRow> inputRecords){ + this.inputRecords = inputRecords; + return this; + } + + /** + * Convenient way to build a mocked table with mock data: + * + * <p>e.g. + * + * <pre>{@code + * MockedBeamSqlTable + * .of(SqlTypeName.BIGINT, "order_id", + * SqlTypeName.INTEGER, "site_id", + * SqlTypeName.DOUBLE, "price", + * SqlTypeName.TIMESTAMP, "order_time", + * + * 1L, 2, 1.0, new Date(), + * 1L, 1, 2.0, new Date(), + * 2L, 4, 3.0, new Date(), + * 2L, 1, 4.0, new Date(), + * 5L, 5, 5.0, new Date(), + * 6L, 6, 6.0, new Date(), + * 7L, 7, 7.0, new Date(), + * 8L, 8888, 8.0, new Date(), + * 8L, 999, 9.0, new Date(), + * 10L, 100, 10.0, new Date()) + * }</pre> + */ + public static MockedBeamSqlTable of(final Object... args){ + final RelProtoDataType protoRowType = new RelProtoDataType() { + @Override + public RelDataType apply(RelDataTypeFactory a0) { + RelDataTypeFactory.FieldInfoBuilder builder = a0.builder(); + + int lastTypeIndex = 0; + for (; lastTypeIndex < args.length; lastTypeIndex += 2) { + if (args[lastTypeIndex] instanceof SqlTypeName) { + builder.add(args[lastTypeIndex + 1].toString(), + (SqlTypeName) args[lastTypeIndex]); + } else { + break; + } + } + return builder.build(); + } + }; + + List<BeamSqlRow> rows = new ArrayList<>(); + BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from( + protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY)); + int fieldCount = beamSQLRecordType.size(); + + for (int i = fieldCount * 2; i < args.length; i += fieldCount) { + BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); + for (int j = 0; j < fieldCount; j++) { + row.addField(j, args[i + j]); + } + rows.add(row); + } + return new MockedBeamSqlTable(protoRowType).withInputRecords(rows); + } + + @Override + public BeamIOType getSourceType() { + return BeamIOType.UNBOUNDED; + } + + @Override + public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) { + return PBegin.in(pipeline).apply(Create.of(inputRecords)); + } + + @Override + public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() { + return new OutputStore(); + } + + public List<BeamSqlRow> getInputRecords() { + return inputRecords; + } + + /** + * Keep output in {@code CONTENT} for validation. + * + */ + public static class OutputStore extends PTransform<PCollection<BeamSqlRow>, PDone> { + + @Override + public PDone expand(PCollection<BeamSqlRow> input) { + input.apply(ParDo.of(new DoFn<BeamSqlRow, Void>() { + @ProcessElement + public void processElement(ProcessContext c) { + CONTENT.add(c.element()); + } + + @Teardown + public void close() { + + } + + })); + return PDone.in(input.getPipeline()); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java index a44b0d9..6667b46 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java @@ -25,8 +25,8 @@ import java.util.Iterator; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException; -import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.Assert; @@ -41,12 +41,12 @@ public class BeamSortRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSQLTable subOrderRamTable = MockedBeamSQLTable.of( + private static MockedBeamSqlTable subOrderRamTable = MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price"); - private static MockedBeamSQLTable orderDetailTable = MockedBeamSQLTable + private static MockedBeamSqlTable orderDetailTable = MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -75,7 +75,7 @@ public class BeamSortRelTest { pipeline.run().waitUntilFinish(); assertEquals( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -83,12 +83,12 @@ public class BeamSortRelTest { 1L, 1, 2.0, 2L, 4, 3.0, 2L, 1, 4.0 - ).getInputRecords(), MockedBeamSQLTable.CONTENT); + ).getInputRecords(), MockedBeamSqlTable.CONTENT); } @Test public void testOrderBy_nullsFirst() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable + BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -98,7 +98,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable + BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -112,7 +112,7 @@ public class BeamSortRelTest { pipeline.run().waitUntilFinish(); assertEquals( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -121,12 +121,12 @@ public class BeamSortRelTest { 1L, 2, 1.0, 2L, null, 4.0, 2L, 1, 3.0 - ).getInputRecords(), MockedBeamSQLTable.CONTENT); + ).getInputRecords(), MockedBeamSqlTable.CONTENT); } @Test public void testOrderBy_nullsLast() throws Exception { - BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSQLTable + BeamSqlEnv.registerTable("ORDER_DETAILS", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -136,7 +136,7 @@ public class BeamSortRelTest { 2L, 1, 3.0, 2L, null, 4.0, 5L, 5, 5.0)); - BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSQLTable + BeamSqlEnv.registerTable("SUB_ORDER_RAM", MockedBeamSqlTable .of(SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price")); @@ -150,7 +150,7 @@ public class BeamSortRelTest { pipeline.run().waitUntilFinish(); assertEquals( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -159,7 +159,7 @@ public class BeamSortRelTest { 1L, null, 2.0, 2L, 1, 3.0, 2L, null, 4.0 - ).getInputRecords(), MockedBeamSQLTable.CONTENT); + ).getInputRecords(), MockedBeamSqlTable.CONTENT); } @Test @@ -173,7 +173,7 @@ public class BeamSortRelTest { pipeline.run().waitUntilFinish(); assertEquals( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -182,7 +182,7 @@ public class BeamSortRelTest { 6L, 6, 6.0, 7L, 7, 7.0, 8L, 8888, 8.0 - ).getInputRecords(), MockedBeamSQLTable.CONTENT); + ).getInputRecords(), MockedBeamSqlTable.CONTENT); } @Test @@ -196,7 +196,7 @@ public class BeamSortRelTest { pipeline.run().waitUntilFinish(); assertEquals( - MockedBeamSQLTable.of( + MockedBeamSqlTable.of( SqlTypeName.BIGINT, "order_id", SqlTypeName.INTEGER, "site_id", SqlTypeName.DOUBLE, "price", @@ -211,7 +211,7 @@ public class BeamSortRelTest { 8L, 8888, 8.0, 8L, 999, 9.0, 10L, 100, 10.0 - ).getInputRecords(), MockedBeamSQLTable.CONTENT); + ).getInputRecords(), MockedBeamSqlTable.CONTENT); } @Test(expected = BeamSqlUnsupportedException.class) @@ -230,14 +230,14 @@ public class BeamSortRelTest { public void prepare() { BeamSqlEnv.registerTable("ORDER_DETAILS", orderDetailTable); BeamSqlEnv.registerTable("SUB_ORDER_RAM", subOrderRamTable); - MockedBeamSQLTable.CONTENT.clear(); + MockedBeamSqlTable.CONTENT.clear(); } - private void assertEquals(Collection<BeamSQLRow> rows1, Collection<BeamSQLRow> rows2) { + private void assertEquals(Collection<BeamSqlRow> rows1, Collection<BeamSqlRow> rows2) { Assert.assertEquals(rows1.size(), rows2.size()); - Iterator<BeamSQLRow> it1 = rows1.iterator(); - Iterator<BeamSQLRow> it2 = rows2.iterator(); + Iterator<BeamSqlRow> it1 = rows1.iterator(); + Iterator<BeamSqlRow> it2 = rows2.iterator(); while (it1.hasNext()) { Assert.assertEquals(it1.next(), it2.next()); } http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java index 4795b2c..4557e8e 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java @@ -20,8 +20,8 @@ package org.apache.beam.dsls.sql.rel; import org.apache.beam.dsls.sql.BeamSqlCli; import org.apache.beam.dsls.sql.BeamSqlEnv; -import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.planner.MockedBeamSqlTable; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -37,11 +37,11 @@ import org.junit.Test; public class BeamValuesRelTest { @Rule public final TestPipeline pipeline = TestPipeline.create(); - private static MockedBeamSQLTable stringTable = MockedBeamSQLTable + private static MockedBeamSqlTable stringTable = MockedBeamSqlTable .of(SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description"); - private static MockedBeamSQLTable intTable = MockedBeamSQLTable + private static MockedBeamSqlTable intTable = MockedBeamSqlTable .of(SqlTypeName.INTEGER, "c0", SqlTypeName.INTEGER, "c1"); @@ -49,8 +49,8 @@ public class BeamValuesRelTest { public void testValues() throws Exception { String sql = "insert into string_table(name, description) values " + "('hello', 'world'), ('james', 'bond')"; - PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); - PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.VARCHAR, "name", SqlTypeName.VARCHAR, "description", "hello", "world", @@ -61,8 +61,8 @@ public class BeamValuesRelTest { @Test public void testValues_castInt() throws Exception { String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))"; - PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); - PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "c0", SqlTypeName.INTEGER, "c1", 1, 2 @@ -73,8 +73,8 @@ public class BeamValuesRelTest { @Test public void testValues_onlySelect() throws Exception { String sql = "select 1, '1'"; - PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); - PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of( + PCollection<BeamSqlRow> rows = BeamSqlCli.compilePipeline(sql, pipeline); + PAssert.that(rows).containsInAnyOrder(MockedBeamSqlTable.of( SqlTypeName.INTEGER, "EXPR$0", SqlTypeName.CHAR, "EXPR$1", 1, "1" @@ -90,6 +90,6 @@ public class BeamValuesRelTest { @Before public void prepare() { - MockedBeamSQLTable.CONTENT.clear(); + MockedBeamSqlTable.CONTENT.clear(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java index cb268bf..a085eae 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java @@ -50,11 +50,11 @@ public class BeamPCollectionTableTest extends BasePlanner{ } }; - BeamSQLRow row = new BeamSQLRow(BeamSQLRecordType.from( + BeamSqlRow row = new BeamSqlRow(BeamSqlRecordType.from( protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY))); row.addField(0, 1); row.addField(1, "hello world."); - PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row)); + PCollection<BeamSqlRow> inputStream = PBegin.in(pipeline).apply(Create.of(row)); BeamSqlEnv.registerTable("COLLECTION_TABLE", new BeamPCollectionTable(inputStream, protoRowType)); } @@ -62,7 +62,7 @@ public class BeamPCollectionTableTest extends BasePlanner{ @Test public void testSelectFromPCollectionTable() throws Exception{ String sql = "select c1, c2 from COLLECTION_TABLE"; - PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); + PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline); pipeline.run().waitUntilFinish(); } http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java index 985b667..c087825 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java @@ -56,10 +56,10 @@ public class BeamSqlRowCoderTest { } }; - BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from( + BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from( protoRowType.apply(new JavaTypeFactoryImpl( RelDataTypeSystem.DEFAULT))); - BeamSQLRow row = new BeamSQLRow(beamSQLRecordType); + BeamSqlRow row = new BeamSqlRow(beamSQLRecordType); row.addField("col_tinyint", Byte.valueOf("1")); row.addField("col_smallint", Short.valueOf("1")); row.addField("col_integer", 1); http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java index d20af0c..fc19d40 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTableTest.java @@ -21,8 +21,8 @@ package org.apache.beam.dsls.sql.schema.kafka; import java.io.Serializable; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -45,8 +45,8 @@ import org.junit.Test; public class BeamKafkaCSVTableTest { @Rule public TestPipeline pipeline = TestPipeline.create(); - public static BeamSQLRow row1 = new BeamSQLRow(genRowType()); - public static BeamSQLRow row2 = new BeamSQLRow(genRowType()); + public static BeamSqlRow row1 = new BeamSqlRow(genRowType()); + public static BeamSqlRow row2 = new BeamSqlRow(genRowType()); @BeforeClass public static void setUp() { @@ -60,7 +60,7 @@ public class BeamKafkaCSVTableTest { } @Test public void testCsvRecorderDecoder() throws Exception { - PCollection<BeamSQLRow> result = pipeline + PCollection<BeamSqlRow> result = pipeline .apply( Create.of("1,\"1\",1.0", "2,2,2.0") ) @@ -75,7 +75,7 @@ public class BeamKafkaCSVTableTest { } @Test public void testCsvRecorderEncoder() throws Exception { - PCollection<BeamSQLRow> result = pipeline + PCollection<BeamSqlRow> result = pipeline .apply( Create.of(row1, row2) ) @@ -90,8 +90,8 @@ public class BeamKafkaCSVTableTest { pipeline.run(); } - private static BeamSQLRecordType genRowType() { - return BeamSQLRecordType.from( + private static BeamSqlRecordType genRowType() { + return BeamSqlRecordType.from( new RelProtoDataType() { @Override public RelDataType apply(RelDataTypeFactory a0) { return a0.builder() http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java index 4c403ac..d782aad 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableTest.java @@ -33,8 +33,8 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; @@ -69,7 +69,7 @@ public class BeamTextCSVTableTest { private static Object[] data2 = new Object[] { 2, 2L, 2.2F, 2.2, "bond" }; private static List<Object[]> testData = Arrays.asList(data1, data2); - private static List<BeamSQLRow> testDataRows = new ArrayList<BeamSQLRow>() {{ + private static List<BeamSqlRow> testDataRows = new ArrayList<BeamSqlRow>() {{ for (Object[] data : testData) { add(buildRow(data)); } @@ -80,7 +80,7 @@ public class BeamTextCSVTableTest { private static File writerTargetFile; @Test public void testBuildIOReader() { - PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(), + PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildRowType(), readerSourceFile.getAbsolutePath()).buildIOReader(pipeline); PAssert.that(rows).containsInAnyOrder(testDataRows); pipeline.run(); @@ -92,7 +92,7 @@ public class BeamTextCSVTableTest { .buildIOWriter()); pipeline.run(); - PCollection<BeamSQLRow> rows = new BeamTextCSVTable(buildRowType(), + PCollection<BeamSqlRow> rows = new BeamTextCSVTable(buildRowType(), writerTargetFile.getAbsolutePath()).buildIOReader(pipeline2); // confirm the two reads match @@ -165,11 +165,11 @@ public class BeamTextCSVTableTest { .add("amount", SqlTypeName.DOUBLE).add("user_name", SqlTypeName.VARCHAR).build(); } - private static BeamSQLRecordType buildBeamSQLRecordType() { - return BeamSQLRecordType.from(buildRelDataType()); + private static BeamSqlRecordType buildBeamSqlRecordType() { + return BeamSqlRecordType.from(buildRelDataType()); } - private static BeamSQLRow buildRow(Object[] data) { - return new BeamSQLRow(buildBeamSQLRecordType(), Arrays.asList(data)); + private static BeamSqlRow buildRow(Object[] data) { + return new BeamSqlRow(buildBeamSqlRecordType(), Arrays.asList(data)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java index dadd53b..5cbbe41 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java @@ -22,8 +22,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.sdk.coders.IterableCoder; @@ -62,9 +62,9 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ private List<AggregateCall> aggCalls; - private BeamSQLRecordType keyType; - private BeamSQLRecordType aggPartType; - private BeamSQLRecordType outputType; + private BeamSqlRecordType keyType; + private BeamSqlRecordType aggPartType; + private BeamSqlRecordType outputType; private BeamSqlRowCoder inRecordCoder; private BeamSqlRowCoder keyCoder; @@ -98,28 +98,28 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ public void testCountPerElementBasic() throws ParseException { setupEnvironment(); - PCollection<BeamSQLRow> input = p.apply(Create.of(inputRows)); + PCollection<BeamSqlRow> input = p.apply(Create.of(inputRows)); //1. extract fields in group-by key part - PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = input.apply("exGroupBy", + PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = input.apply("exGroupBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0)))) - .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, inRecordCoder)); + .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, inRecordCoder)); //2. apply a GroupByKey. - PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream - .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create()) - .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder, - IterableCoder.<BeamSQLRow>of(inRecordCoder))); + PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream + .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) + .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder, + IterableCoder.<BeamSqlRow>of(inRecordCoder))); //3. run aggregation functions - PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation", - Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues( + PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation", + Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType))) - .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder)); + .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); //4. flat KV to a single record - PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord", + PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls))); mergedStream.setCoder(outRecordCoder); @@ -368,26 +368,26 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ /** * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}. */ - private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() { + private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationGroupByKeyFn() { return Arrays.asList( - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), inputRows.get(0)), - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), + KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))), inputRows.get(1)), - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), + KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))), inputRows.get(2)), - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), + KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))), inputRows.get(3))); } /** * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}. */ - private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn() + private List<KV<BeamSqlRow, BeamSqlRow>> prepareResultOfAggregationCombineFn() throws ParseException { return Arrays.asList( - KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), - new BeamSQLRow(aggPartType, Arrays.<Object>asList( + KV.of(new BeamSqlRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))), + new BeamSqlRow(aggPartType, Arrays.<Object>asList( 4L, 10000L, 2500L, 4000L, 1000L, (short) 10, (short) 2, (short) 4, (short) 1, @@ -403,7 +403,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ /** * Row type of final output row. */ - private BeamSQLRecordType prepareFinalRowType() { + private BeamSqlRecordType prepareFinalRowType() { FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); List<KV<String, SqlTypeName>> columnMetadata = Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT), @@ -431,14 +431,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); } - return BeamSQLRecordType.from(builder.build()); + return BeamSqlRecordType.from(builder.build()); } /** * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}. */ - private BeamSQLRow prepareResultOfMergeAggregationRecord() throws ParseException { - return new BeamSQLRow(outputType, Arrays.<Object>asList( + private BeamSqlRow prepareResultOfMergeAggregationRecord() throws ParseException { + return new BeamSqlRow(outputType, Arrays.<Object>asList( 1, 4L, 10000L, 2500L, 4000L, 1000L, (short) 10, (short) 2, (short) 4, (short) 1, http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java index 820d7f5..ef85347 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamTransformBaseTest.java @@ -23,8 +23,8 @@ import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.List; import org.apache.beam.dsls.sql.planner.BeamQueryPlanner; -import org.apache.beam.dsls.sql.schema.BeamSQLRecordType; -import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.beam.dsls.sql.schema.BeamSqlRecordType; +import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.sdk.values.KV; import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder; import org.apache.calcite.sql.type.SqlTypeName; @@ -37,8 +37,8 @@ import org.junit.BeforeClass; public class BeamTransformBaseTest { public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public static BeamSQLRecordType inputRowType; - public static List<BeamSQLRow> inputRows; + public static BeamSqlRecordType inputRowType; + public static List<BeamSqlRow> inputRows; @BeforeClass public static void prepareInput() throws NumberFormatException, ParseException{ @@ -65,20 +65,20 @@ public class BeamTransformBaseTest { } /** - * create a {@code BeamSQLRecordType} for given column metadata. + * create a {@code BeamSqlRecordType} for given column metadata. */ - public static BeamSQLRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ + public static BeamSqlRecordType initTypeOfSqlRow(List<KV<String, SqlTypeName>> columnMetadata){ FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder(); for (KV<String, SqlTypeName> cm : columnMetadata) { builder.add(cm.getKey(), cm.getValue()); } - return BeamSQLRecordType.from(builder.build()); + return BeamSqlRecordType.from(builder.build()); } /** * Create an empty row with given column metadata. */ - public static BeamSQLRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { + public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata) { return initBeamSqlRow(columnMetadata, Arrays.asList()); } @@ -86,11 +86,11 @@ public class BeamTransformBaseTest { * Create a row with given column metadata, and values for each column. * */ - public static BeamSQLRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, + public static BeamSqlRow initBeamSqlRow(List<KV<String, SqlTypeName>> columnMetadata, List<Object> rowValues){ - BeamSQLRecordType rowType = initTypeOfSqlRow(columnMetadata); + BeamSqlRecordType rowType = initTypeOfSqlRow(columnMetadata); - return new BeamSQLRow(rowType, rowValues); + return new BeamSqlRow(rowType, rowValues); } }