Repository: beam
Updated Branches:
  refs/heads/DSL_SQL f96f9f680 -> 25babc999


[BEAM-2288] Refine DSL interface as design doc of BEAM-2010:
1. rename BeamSqlRunner to BeamSQLEnvironment;
2. Move pipeline to be an external parameter, to integrate with non-DSL components;


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7b3f7a2b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7b3f7a2b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7b3f7a2b

Branch: refs/heads/DSL_SQL
Commit: 7b3f7a2b5bb5017819e8c8c53620d3cf97a38008
Parents: f96f9f6
Author: mingmxu <ming...@ebay.com>
Authored: Sun May 14 11:02:37 2017 -0700
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Wed May 17 21:03:13 2017 +0200

----------------------------------------------------------------------
 .../beam/dsls/sql/BeamSQLEnvironment.java       | 142 +++++++++++++++++++
 .../beam/dsls/sql/example/BeamSqlExample.java   |  23 ++-
 .../dsls/sql/planner/BeamPipelineCreator.java   |   9 +-
 .../beam/dsls/sql/planner/BeamQueryPlanner.java |  34 ++---
 .../beam/dsls/sql/planner/BeamSqlRunner.java    | 103 --------------
 .../beam/dsls/sql/planner/BasePlanner.java      |   9 +-
 .../sql/planner/BeamGroupByExplainTest.java     |  14 +-
 .../sql/planner/BeamGroupByPipelineTest.java    |  19 +--
 .../sql/planner/BeamInvalidGroupByTest.java     |  10 +-
 .../BeamPlannerAggregationSubmitTest.java       |  17 ++-
 .../sql/planner/BeamPlannerExplainTest.java     |   7 +-
 .../dsls/sql/planner/BeamPlannerSubmitTest.java |  11 +-
 .../beam/dsls/sql/rel/BeamSortRelTest.java      |  40 ++++--
 13 files changed, 251 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
new file mode 100644
index 0000000..cdb25f5
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSQLEnvironment.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql;
+
+import java.io.Serializable;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.schema.Schema;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.impl.ScalarFunctionImpl;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@code BeamSQLEnvironment} is the integrated environment of BeamSQL.
+ * It provides runtime context to execute SQL queries as Beam pipeline,
+ * including table metadata, SQL engine and a Beam pipeline translator.
+ *
+ * <h1>1. BeamSQL as DSL</h1>
+ * <em>BeamSQL as DSL</em> enables developers to embed SQL queries when 
writing a Beam pipeline.
+ * A typical pipeline with BeamSQL DSL is:
+ * <pre>
+ *{@code
+PipelineOptions options =  PipelineOptionsFactory...
+Pipeline pipeline = Pipeline.create(options);
+
+//prepare environment of BeamSQL
+BeamSQLEnvironment sqlEnv = BeamSQLEnvironment.create();
+//register table metadata
+sqlEnv.addTableMetadata(String tableName, BeamSqlTable tableMetadata);
+//register UDF
+sqlEnv.registerUDF(String functionName, Method udfMethod);
+
+
+//explain a SQL statement, SELECT only, and return as a PCollection;
+PCollection<BeamSQLRow> phase1Stream = sqlEnv.explainSQL(pipeline, String 
sqlStatement);
+//A PCollection explained by BeamSQL can be converted into a table, and apply 
queries on it;
+sqlEnv.registerPCollectionAsTable(String tableName, phase1Stream);
+
+//apply more queries, even based on phase1Stream
+
+pipeline.run().waitUntilFinish();
+ * }
+ * </pre>
+ *
+ * <h1>2. BeamSQL as CLI</h1>
+ * This feature is on planning, and not ready yet.
+ *
+ */
+public class BeamSQLEnvironment implements Serializable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamSQLEnvironment.class);
+
+  public static final BeamSQLEnvironment INSTANCE = new BeamSQLEnvironment();
+
+  private SchemaPlus schema = Frameworks.createRootSchema(true);
+  private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
+
+  private BeamSQLEnvironment() {
+    //disable assertions in Calcite.
+    ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
+  }
+
+  /**
+   * Return an instance of {@code BeamSQLEnvironment}.
+   */
+  public static BeamSQLEnvironment create(){
+    return INSTANCE;
+  }
+
+  /**
+   * Add a schema.
+   *
+   */
+  public void addSchema(String schemaName, Schema scheme) {
+    schema.add(schemaName, schema);
+  }
+
+  /**
+   * add a {@link BaseBeamTable} to schema repository.
+   */
+  public void addTableMetadata(String tableName, BaseBeamTable tableMetadata) {
+    schema.add(tableName, tableMetadata);
+    planner.getSourceTables().put(tableName, tableMetadata);
+  }
+
+  /* Add a UDF function.
+   *
+   * <p>There're two requirements for function {@code methodName}:<br>
+   * 1. It must be a STATIC method;<br>
+   * 2. For a primitive parameter, use its wrapper class and handle NULL 
properly;
+   */
+  public void addUDFFunction(String functionName, Class<?> className, String 
methodName){
+    schema.add(functionName, ScalarFunctionImpl.create(className, methodName));
+  }
+
+  /**
+   * explain and display the execution plan.
+   */
+  public String executionPlan(String sqlString)
+    throws ValidationException, RelConversionException, SqlParseException {
+    BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
+    String beamPlan = RelOptUtil.toString(exeTree);
+    LOG.info(String.format("beamPlan>\n%s", beamPlan));
+    return beamPlan;
+  }
+
+  /**
+   * {@code compileBeamPipeline} translate a SQL statement to executed as Beam 
data flow,
+   * which is linked with the given {@code pipeline}. The final output stream 
is returned as
+   * {@code PCollection} so more operations can be applied.
+   */
+  public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, 
Pipeline basePipeline)
+      throws Exception{
+    PCollection<BeamSQLRow> resultStream = 
planner.compileBeamPipeline(sqlStatement, basePipeline);
+    return resultStream;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
index 303835f..2695944 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/example/BeamSqlExample.java
@@ -21,9 +21,14 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.beam.dsls.sql.planner.BeamSqlRunner;
+import org.apache.beam.dsls.sql.BeamSQLEnvironment;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
@@ -67,17 +72,23 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 public class BeamSqlExample implements Serializable {
 
   public static void main(String[] args) throws Exception {
-    BeamSqlRunner runner = new BeamSqlRunner();
-    runner.addTable("ORDER_DETAILS", getTable("127.0.0.1:9092", "orders"));
-    runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] 
{}).withValidation()
+        .as(PipelineOptions.class); // FlinkPipelineOptions.class
+    options.setJobName("BeamSqlExample");
+    Pipeline pipeline = Pipeline.create(options);
+
+    BeamSQLEnvironment runner = BeamSQLEnvironment.create();
+    runner.addTableMetadata("ORDER_DETAILS", getTable("127.0.0.1:9092", 
"orders"));
+    runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", 
"sub_orders"));
 
     // case 2: insert into <table>(<fields>) select STREAM <fields> from
     // <table> from <clause>
     String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
         + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE 
SITE_ID = 0 and price > 20";
 
-    runner.explainQuery(sql);
-    runner.submitQuery(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
+
+    pipeline.run().waitUntilFinish();
   }
 
   public static BaseBeamTable getTable(String bootstrapServer, String topic) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
index 98ccb57..1f3ba58 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamPipelineCreator.java
@@ -28,7 +28,6 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**
  * {@link BeamPipelineCreator} converts a {@link BeamRelNode} tree, into a Beam
@@ -44,14 +43,10 @@ public class BeamPipelineCreator {
 
   private boolean hasPersistent = false;
 
-  public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables) {
+  public BeamPipelineCreator(Map<String, BaseBeamTable> sourceTables, Pipeline 
pipeline) {
     this.sourceTables = sourceTables;
+    this.pipeline = pipeline;
 
-    options = PipelineOptionsFactory.fromArgs(new String[] {}).withValidation()
-        .as(PipelineOptions.class); // FlinkPipelineOptions.class
-    options.setJobName("BeamPlanCreator");
-
-    pipeline = Pipeline.create(options);
     CoderRegistry cr = pipeline.getCoderRegistry();
     cr.registerCoder(BeamSQLRow.class, BeamSqlRowCoder.of());
     cr.registerCoder(BeamSQLRecordType.class, BeamSQLRecordTypeCoder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
index 9e41555..0a7407c 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java
@@ -26,9 +26,9 @@ import java.util.Map;
 import org.apache.beam.dsls.sql.rel.BeamLogicalConvention;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteSchema;
@@ -58,8 +58,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The core component to handle through a SQL statement, to submit a Beam
- * pipeline.
+ * The core component to handle through a SQL statement, from explain 
execution plan,
+ * to generate a Beam pipeline.
  *
  */
 public class BeamQueryPlanner {
@@ -95,29 +95,17 @@ public class BeamQueryPlanner {
   }
 
   /**
-   * With a Beam pipeline generated in {@link #compileBeamPipeline(String)},
-   * submit it to run and wait until finish.
-   *
-   */
-  public void submitToRun(String sqlStatement) throws Exception {
-    Pipeline pipeline = compileBeamPipeline(sqlStatement);
-
-    PipelineResult result = pipeline.run();
-    result.waitUntilFinish();
-  }
-
-  /**
-   * With the @{@link BeamRelNode} tree generated in
-   * {@link #convertToBeamRel(String)}, a Beam pipeline is generated.
-   *
+   * {@code compileBeamPipeline} translates a SQL statement to be executed as a Beam
+   * data flow, which is linked with the given {@code pipeline}. The final output
+   * stream is returned as {@code PCollection} so more operations can be applied.
    */
-  public Pipeline compileBeamPipeline(String sqlStatement) throws Exception {
+  public PCollection<BeamSQLRow> compileBeamPipeline(String sqlStatement, 
Pipeline pipeline)
+      throws Exception {
     BeamRelNode relNode = convertToBeamRel(sqlStatement);
 
-    BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables);
-    relNode.buildBeamPipeline(planCreator);
+    BeamPipelineCreator planCreator = new BeamPipelineCreator(sourceTables, 
pipeline);
 
-    return planCreator.getPipeline();
+    return relNode.buildBeamPipeline(planCreator);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
deleted file mode 100644
index 95ba5a9..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.planner;
-
-import java.io.Serializable;
-
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.impl.ScalarFunctionImpl;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Interface to explain, submit a SQL query.
- *
- */
-public class BeamSqlRunner implements Serializable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BeamSqlRunner.class);
-
-  private SchemaPlus schema = Frameworks.createRootSchema(true);
-
-  private BeamQueryPlanner planner = new BeamQueryPlanner(schema);
-
-  public BeamSqlRunner() {
-    //disable assertions in Calcite.
-    ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
-  }
-
-  /**
-   * Add a schema.
-   *
-   */
-  public void addSchema(String schemaName, Schema scheme) {
-    schema.add(schemaName, schema);
-  }
-
-  /**
-   * add a {@link BaseBeamTable} to schema repository.
-   *
-   */
-  public void addTable(String tableName, BaseBeamTable table) {
-    schema.add(tableName, table);
-    planner.getSourceTables().put(tableName, table);
-  }
-
-  /**
-   * Add a UDF function.
-   *
-   * <p>There're two requirements for function {@code methodName}:<br>
-   * 1. It must be a STATIC method;<br>
-   * 2. For a primitive parameter, use its wrapper class and handle NULL 
properly;
-   */
-  public void addUDFFunction(String functionName, Class<?> className, String 
methodName){
-    schema.add(functionName, ScalarFunctionImpl.create(className, methodName));
-  }
-
-  /**
-   * submit as a Beam pipeline.
-   *
-   */
-  public void submitQuery(String sqlString) throws Exception {
-    planner.submitToRun(sqlString);
-  }
-
-  /**
-   * explain and display the execution plan.
-   *
-   */
-  public String explainQuery(String sqlString)
-    throws ValidationException, RelConversionException, SqlParseException {
-    BeamRelNode exeTree = planner.convertToBeamRel(sqlString);
-    String beamPlan = RelOptUtil.toString(exeTree);
-    System.out.println(String.format("beamPlan>\n%s", beamPlan));
-    return beamPlan;
-  }
-
-  protected BeamQueryPlanner getPlanner() {
-    return planner;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 055a4d4..fe8a236 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -21,6 +21,7 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.dsls.sql.BeamSQLEnvironment;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
@@ -37,13 +38,13 @@ import org.junit.BeforeClass;
  *
  */
 public class BasePlanner {
-  public static BeamSqlRunner runner = new BeamSqlRunner();
+  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
 
   @BeforeClass
   public static void prepare() {
-    runner.addTable("ORDER_DETAILS", getTable());
-    runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
-    runner.addTable("SUB_ORDER_RAM", getTable());
+    runner.addTableMetadata("ORDER_DETAILS", getTable());
+    runner.addTableMetadata("SUB_ORDER", getTable("127.0.0.1:9092", 
"sub_orders"));
+    runner.addTableMetadata("SUB_ORDER_RAM", getTable());
   }
 
   private static BaseBeamTable getTable() {

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
index 4f2b1ba..98d14c3 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java
@@ -33,7 +33,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
   public void testSimpleGroupExplain() throws Exception {
     String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 ";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 
   /**
@@ -43,7 +43,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
   public void testSimpleGroup2Explain() throws Exception {
     String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 
   /**
@@ -54,7 +54,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 
   /**
@@ -66,7 +66,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
         + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' 
HOUR, TIME '00:00:01')";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 
   /**
@@ -77,7 +77,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 
   /**
@@ -88,7 +88,7 @@ public class BeamGroupByExplainTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 
   /**
@@ -99,6 +99,6 @@ public class BeamGroupByExplainTest extends BasePlanner {
     runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, 
"negative");
     String sql = "select site_id, negative(site_id) as nsite_id from 
ORDER_DETAILS";
 
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
index 71dcf73..5101c98 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -18,7 +18,9 @@
 package org.apache.beam.dsls.sql.planner;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;
 
 /**
@@ -26,6 +28,7 @@ import org.junit.Test;
  *
  */
 public class BeamGroupByPipelineTest extends BasePlanner {
+  public final TestPipeline pipeline = TestPipeline.create();
 
   /**
    * GROUP-BY without window operation, and grouped fields.
@@ -34,7 +37,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
   public void testSimpleGroupExplain() throws Exception {
     String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 ";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   /**
@@ -44,7 +47,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
   public void testSimpleGroup2Explain() throws Exception {
     String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   /**
@@ -55,7 +58,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   /**
@@ -67,7 +70,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
         + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' 
HOUR, TIME '00:00:01')";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   /**
@@ -78,7 +81,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   /**
@@ -89,7 +92,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   /**
@@ -100,7 +103,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, 
"negative");
     String sql = "select site_id, negative(site_id) as nsite_id from 
ORDER_DETAILS";
 
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
index 83ab871..72b5bf7 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
@@ -17,7 +17,11 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.tools.ValidationException;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -25,12 +29,14 @@ import org.junit.Test;
  *
  */
 public class BeamInvalidGroupByTest extends BasePlanner {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
 
   @Test(expected = ValidationException.class)
   public void testTumble2Explain() throws Exception {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, 
INTERVAL '1' HOUR)";
-    String plan = runner.explainQuery(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
   @Test(expected = ValidationException.class)
@@ -38,7 +44,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
     String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' 
HOUR)"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' 
HOUR)";
-    String plan = runner.explainQuery(sql);
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
index 39a5614..22f1848 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -21,16 +21,18 @@ import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
+import org.apache.beam.dsls.sql.BeamSQLEnvironment;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -39,12 +41,15 @@ import org.junit.Test;
  */
 public class BeamPlannerAggregationSubmitTest {
   public static DateFormat format = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
-  public static BeamSqlRunner runner = new BeamSqlRunner();
+  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
+
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
 
   @BeforeClass
   public static void prepare() throws ParseException {
-    runner.addTable("ORDER_DETAILS", getOrderTable());
-    runner.addTable("ORDER_SUMMARY", getSummaryTable());
+    runner.addTableMetadata("ORDER_DETAILS", getOrderTable());
+    runner.addTableMetadata("ORDER_SUMMARY", getSummaryTable());
   }
 
   private static BaseBeamTable getOrderTable() throws ParseException {
@@ -108,7 +113,7 @@ public class BeamPlannerAggregationSubmitTest {
         + "WHERE SITE_ID = 1 " + "GROUP BY site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR, TIME '00:00:01')";
 
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    runner.compileBeamPipeline(sql, pipeline);
 
     pipeline.run().waitUntilFinish();
 
@@ -125,7 +130,7 @@ public class BeamPlannerAggregationSubmitTest {
         + "SELECT site_id, COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
 
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+    runner.compileBeamPipeline(sql, pipeline);
 
     pipeline.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
index 9b6b20a..1355d5d 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -25,11 +25,10 @@ import org.junit.Test;
  *
  */
 public class BeamPlannerExplainTest extends BasePlanner {
-
   @Test
   public void selectAll() throws Exception {
     String sql = "SELECT * FROM ORDER_DETAILS";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
 
     String expectedPlan =
         "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], 
order_time=[$3])\n"
@@ -41,7 +40,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
   public void selectWithFilter() throws Exception {
     String sql = "SELECT " + " order_id, site_id, price " + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 and price > 20";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
 
     String expectedPlan = "BeamProjectRel(order_id=[$0], site_id=[$1], 
price=[$2])\n"
         + "  BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
@@ -54,7 +53,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
     String sql = "INSERT INTO SUB_ORDER(order_id, site_id, price) " + "SELECT "
         + " order_id, site_id, price " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 and price > 20";
-    String plan = runner.explainQuery(sql);
+    String plan = runner.executionPlan(sql);
 
     String expectedPlan =
         "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], 
flattened=[true])\n"

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index 5435049..17cea27 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -17,8 +17,11 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
@@ -26,12 +29,16 @@ import org.junit.Test;
  *
  */
 public class BeamPlannerSubmitTest extends BasePlanner {
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
   @Test
   public void insertSelectFilter() throws Exception {
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
         + " order_id, site_id, price "
         + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
+
+    PCollection<BeamSQLRow> outputStream = runner.compileBeamPipeline(sql, 
pipeline);
 
     pipeline.run().waitUntilFinish();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/7b3f7a2b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
index 11cec51..864d4b7 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/rel/BeamSortRelTest.java
@@ -20,20 +20,24 @@ package org.apache.beam.dsls.sql.rel;
 
 import java.util.Date;
 import java.util.List;
-
+import org.apache.beam.dsls.sql.BeamSQLEnvironment;
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.planner.BeamSqlRunner;
 import org.apache.beam.dsls.sql.planner.MockedBeamSQLTable;
 import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 
 /**
  * Test for {@code BeamSortRel}.
  */
 public class BeamSortRelTest {
-  public static BeamSqlRunner runner = new BeamSqlRunner();
+  public static BeamSQLEnvironment runner = BeamSQLEnvironment.create();
+  @Rule
+  public final TestPipeline pipeline = TestPipeline.create();
+
   private static MockedBeamSQLTable subOrderRamTable = MockedBeamSQLTable.of(
       SqlTypeName.BIGINT, "order_id",
       SqlTypeName.INTEGER, "site_id",
@@ -65,7 +69,8 @@ public class BeamSortRelTest {
         + "ORDER BY order_id asc, site_id desc limit 4";
 
     System.out.println(sql);
-    runner.submitQuery(sql);
+    runner.compileBeamPipeline(sql, pipeline);
+    pipeline.run().waitUntilFinish();
 
     assertEquals(
         MockedBeamSQLTable.of(
@@ -81,7 +86,7 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_nullsFirst() throws Exception {
-    runner.addTable("ORDER_DETAILS", MockedBeamSQLTable
+    runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price",
@@ -91,7 +96,7 @@ public class BeamSortRelTest {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable
+    runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price"));
@@ -101,7 +106,8 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS FIRST limit 4";
 
-    runner.submitQuery(sql);
+    runner.compileBeamPipeline(sql, pipeline);
+    pipeline.run().waitUntilFinish();
 
     assertEquals(
         MockedBeamSQLTable.of(
@@ -118,7 +124,7 @@ public class BeamSortRelTest {
 
   @Test
   public void testOrderBy_nullsLast() throws Exception {
-    runner.addTable("ORDER_DETAILS", MockedBeamSQLTable
+    runner.addTableMetadata("ORDER_DETAILS", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price",
@@ -128,7 +134,7 @@ public class BeamSortRelTest {
             2L, 1, 3.0,
             2L, null, 4.0,
             5L, 5, 5.0));
-    runner.addTable("SUB_ORDER_RAM", MockedBeamSQLTable
+    runner.addTableMetadata("SUB_ORDER_RAM", MockedBeamSQLTable
         .of(SqlTypeName.BIGINT, "order_id",
             SqlTypeName.INTEGER, "site_id",
             SqlTypeName.DOUBLE, "price"));
@@ -138,7 +144,8 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc NULLS LAST limit 4";
 
-    runner.submitQuery(sql);
+    runner.compileBeamPipeline(sql, pipeline);
+    pipeline.run().waitUntilFinish();
 
     assertEquals(
         MockedBeamSQLTable.of(
@@ -161,7 +168,8 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 4 offset 4";
 
-    runner.submitQuery(sql);
+    runner.compileBeamPipeline(sql, pipeline);
+    pipeline.run().waitUntilFinish();
 
     assertEquals(
         MockedBeamSQLTable.of(
@@ -184,7 +192,8 @@ public class BeamSortRelTest {
         + "FROM ORDER_DETAILS "
         + "ORDER BY order_id asc, site_id desc limit 11";
 
-    runner.submitQuery(sql);
+    runner.compileBeamPipeline(sql, pipeline);
+    pipeline.run().waitUntilFinish();
 
     assertEquals(
         MockedBeamSQLTable.of(
@@ -214,12 +223,13 @@ public class BeamSortRelTest {
         + "GROUP BY order_id, TUMBLE(order_time, INTERVAL '1' HOUR)"
         + "ORDER BY order_id asc limit 11";
 
-    runner.submitQuery(sql);
+    TestPipeline pipeline = TestPipeline.create();
+    runner.compileBeamPipeline(sql, pipeline);
   }
 
   public static void prepare() {
-    runner.addTable("ORDER_DETAILS", orderDetailTable);
-    runner.addTable("SUB_ORDER_RAM", subOrderRamTable);
+    runner.addTableMetadata("ORDER_DETAILS", orderDetailTable);
+    runner.addTableMetadata("SUB_ORDER_RAM", subOrderRamTable);
   }
 
   private void assertEquals(List<BeamSQLRow> rows1, List<BeamSQLRow> rows2) {

Reply via email to