Repository: beam
Updated Branches:
  refs/heads/DSL_SQL dedabff1f -> 9395fbb3c


http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
index d4e1db2..4795b2c 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamValuesRelTest.java
@@ -18,7 +18,8 @@
 
 package org.apache.beam.dsls.sql.rel;
 
-import org.apache.beam.dsls.sql.BeamSQLEnvironment;
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.testing.PAssert;
@@ -36,7 +37,6 @@ import org.junit.Test;
 public class BeamValuesRelTest {
   @Rule
   public final TestPipeline pipeline = TestPipeline.create();
-  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
   private static MockedBeamSQLTable stringTable = MockedBeamSQLTable
       .of(SqlTypeName.VARCHAR, "name",
           SqlTypeName.VARCHAR, "description");
@@ -49,7 +49,7 @@ public class BeamValuesRelTest {
   public void testValues() throws Exception {
     String sql = "insert into string_table(name, description) values "
         + "('hello', 'world'), ('james', 'bond')";
-    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
         SqlTypeName.VARCHAR, "name",
         SqlTypeName.VARCHAR, "description",
@@ -61,7 +61,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_castInt() throws Exception {
     String sql = "insert into int_table (c0, c1) values(cast(1 as int), cast(2 as int))";
-    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
         SqlTypeName.INTEGER, "c0",
         SqlTypeName.INTEGER, "c1",
@@ -73,7 +73,7 @@ public class BeamValuesRelTest {
   @Test
   public void testValues_onlySelect() throws Exception {
     String sql = "select 1, '1'";
-    PCollection<BeamSQLRow> rows = runner.compileBeamPipeline(sql, pipeline);
+    PCollection<BeamSQLRow> rows = BeamSqlCli.compilePipeline(sql, pipeline);
     PAssert.that(rows).containsInAnyOrder(MockedBeamSQLTable.of(
         SqlTypeName.INTEGER, "EXPR$0",
         SqlTypeName.CHAR, "EXPR$1",
@@ -84,8 +84,8 @@ public class BeamValuesRelTest {
 
   @BeforeClass
   public static void prepareClass() {
-    runner.addTableMetadata("string_table", stringTable);
-    runner.addTableMetadata("int_table", intTable);
+    BeamSqlEnv.registerTable("string_table", stringTable);
+    BeamSqlEnv.registerTable("int_table", intTable);
   }
 
   @Before

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
index 6f24e2a..cb268bf 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTableTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.dsls.sql.schema;
 
+import org.apache.beam.dsls.sql.BeamSqlCli;
+import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.planner.BasePlanner;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -27,17 +29,19 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.BeforeClass;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Test case for BeamPCollectionTable.
  */
 public class BeamPCollectionTableTest extends BasePlanner{
-  public static TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
 
-  @BeforeClass
-  public static void prepareTable(){
+  @Before
+  public void prepareTable(){
     RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
@@ -51,14 +55,16 @@ public class BeamPCollectionTableTest extends BasePlanner{
     row.addField(0, 1);
     row.addField(1, "hello world.");
     PCollection<BeamSQLRow> inputStream = PBegin.in(pipeline).apply(Create.of(row));
-    runner.addTableMetadata("COLLECTION_TABLE",
+    BeamSqlEnv.registerTable("COLLECTION_TABLE",
         new BeamPCollectionTable(inputStream, protoRowType));
   }
 
   @Test
   public void testSelectFromPCollectionTable() throws Exception{
     String sql = "select c1, c2 from COLLECTION_TABLE";
-    runner.executionPlan(sql);
+    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, pipeline);
+
+    pipeline.run().waitUntilFinish();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
index bc6343b..985b667 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoderTest.java
@@ -74,7 +74,7 @@ public class BeamSqlRowCoderTest {
     row.addField("col_timestamp", new Date());
 
 
-    BeamSqlRowCoder coder = BeamSqlRowCoder.of();
+    BeamSqlRowCoder coder = new BeamSqlRowCoder(beamSQLRecordType);
     CoderProperties.coderDecodeEncodeEqual(coder, row);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/680a543d/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
index f174b9c..dadd53b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java
@@ -23,11 +23,11 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordTypeCoder;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
-import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
 import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.sql.SqlKind;
@@ -62,8 +61,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   public TestPipeline p = TestPipeline.create();
 
   private List<AggregateCall> aggCalls;
-  private BeamSQLRecordType keyType = initTypeOfSqlRow(
-      Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+
+  private BeamSQLRecordType keyType;
+  private BeamSQLRecordType aggPartType;
+  private BeamSQLRecordType outputType;
+
+  private BeamSqlRowCoder inRecordCoder;
+  private BeamSqlRowCoder keyCoder;
+  private BeamSqlRowCoder aggCoder;
+  private BeamSqlRowCoder outRecordCoder;
 
   /**
    * This step equals to below query.
@@ -97,21 +103,25 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     //1. extract fields in group-by key part
     PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = input.apply("exGroupBy",
         WithKeys
-            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))));
+            .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(-1, ImmutableBitSet.of(0))))
+        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, inRecordCoder));
 
     //2. apply a GroupByKey.
     PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create());
+        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
+        .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
+            IterableCoder.<BeamSQLRow>of(inRecordCoder)));
 
     //3. run aggregation functions
     PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
         Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
-            new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType)));
+            new BeamAggregationTransforms.AggregationCombineFn(aggCalls, inputRowType)))
+        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
 
     //4. flat KV to a single record
     PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
-        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            BeamSQLRecordType.from(prepareFinalRowType()), aggCalls)));
+        ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls)));
+    mergedStream.setCoder(outRecordCoder);
 
     //assert function BeamAggregationTransform.AggregationGroupByKeyFn
     PAssert.that(exGroupByStream).containsInAnyOrder(prepareResultOfAggregationGroupByKeyFn());
@@ -126,17 +136,8 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
 }
 
   private void setupEnvironment() {
-    regiesterCoder();
     prepareAggregationCalls();
-  }
-
-  /**
-   * Add Coders in BeamSQL.
-   */
-  private void regiesterCoder() {
-    CoderRegistry cr = p.getCoderRegistry();
-    cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
-    cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());
+    prepareTypeAndCoder();
   }
 
   /**
@@ -327,26 +328,15 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   }
 
   /**
-   * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+   * Coders used in aggregation steps.
    */
-  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() {
-    return Arrays.asList(
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
-            inputRows.get(0)),
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
-            inputRows.get(1)),
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
-            inputRows.get(2)),
-        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
-            inputRows.get(3)));
-  }
+  private void prepareTypeAndCoder() {
+    inRecordCoder = new BeamSqlRowCoder(inputRowType);
 
-  /**
-   * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
-   */
-  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn()
-      throws ParseException {
-    BeamSQLRecordType aggPartType = initTypeOfSqlRow(
+    keyType = initTypeOfSqlRow(Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER)));
+    keyCoder = new BeamSqlRowCoder(keyType);
+
+    aggPartType = initTypeOfSqlRow(
         Arrays.asList(KV.of("count", SqlTypeName.BIGINT),
 
             KV.of("sum1", SqlTypeName.BIGINT), KV.of("avg1", SqlTypeName.BIGINT),
@@ -369,6 +359,32 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
             KV.of("sum8", SqlTypeName.INTEGER), KV.of("avg8", SqlTypeName.INTEGER),
             KV.of("max8", SqlTypeName.INTEGER), KV.of("min8", SqlTypeName.INTEGER)
             ));
+    aggCoder = new BeamSqlRowCoder(aggPartType);
+
+    outputType = prepareFinalRowType();
+    outRecordCoder = new BeamSqlRowCoder(outputType);
+  }
+
+  /**
+   * expected results after {@link BeamAggregationTransforms.AggregationGroupByKeyFn}.
+   */
+  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationGroupByKeyFn() {
+    return Arrays.asList(
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
+            inputRows.get(0)),
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(1).getInteger(0))),
+            inputRows.get(1)),
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(2).getInteger(0))),
+            inputRows.get(2)),
+        KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(3).getInteger(0))),
+            inputRows.get(3)));
+  }
+
+  /**
+   * expected results after {@link BeamAggregationTransforms.AggregationCombineFn}.
+   */
+  private List<KV<BeamSQLRow, BeamSQLRow>> prepareResultOfAggregationCombineFn()
+      throws ParseException {
     return Arrays.asList(
             KV.of(new BeamSQLRow(keyType, Arrays.<Object>asList(inputRows.get(0).getInteger(0))),
                 new BeamSQLRow(aggPartType, Arrays.<Object>asList(
@@ -387,7 +403,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
   /**
    * Row type of final output row.
    */
-  private RelDataType prepareFinalRowType() {
+  private BeamSQLRecordType prepareFinalRowType() {
     FieldInfoBuilder builder = BeamQueryPlanner.TYPE_FACTORY.builder();
     List<KV<String, SqlTypeName>> columnMetadata =
         Arrays.asList(KV.of("f_int", SqlTypeName.INTEGER), KV.of("count", SqlTypeName.BIGINT),
@@ -415,14 +431,14 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{
     for (KV<String, SqlTypeName> cm : columnMetadata) {
       builder.add(cm.getKey(), cm.getValue());
     }
-    return builder.build();
+    return BeamSQLRecordType.from(builder.build());
   }
 
   /**
    * expected results after {@link BeamAggregationTransforms.MergeAggregationRecord}.
    */
   private BeamSQLRow prepareResultOfMergeAggregationRecord() throws ParseException {
-    return new BeamSQLRow(BeamSQLRecordType.from(prepareFinalRowType()), Arrays.<Object>asList(
+    return new BeamSQLRow(outputType, Arrays.<Object>asList(
         1, 4L,
         10000L, 2500L, 4000L, 1000L,
         (short) 10, (short) 2, (short) 4, (short) 1,

Reply via email to