Repository: beam
Updated Branches:
  refs/heads/DSL_SQL aa07a1d41 -> a6845a35f


http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 1979a00..22ffaad 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -20,7 +20,6 @@ package org.apache.beam.dsls.sql.schema;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Date;
 import java.util.List;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -55,10 +54,8 @@ public class BeamSqlRowCoder extends 
StandardCoder<BeamSQLRow>{
   @Override
   public void encode(BeamSQLRow value, OutputStream outStream,
       org.apache.beam.sdk.coders.Coder.Context context) throws CoderException, 
IOException {
-    recordTypeCoder.encode(value.getDataType(), outStream, context);
-    listCoder.encode(value.getNullFields(), outStream, context);
-
-    Context nested = context.nested();
+    recordTypeCoder.encode(value.getDataType(), outStream, context.nested());
+    listCoder.encode(value.getNullFields(), outStream, context.nested());
 
     for (int idx = 0; idx < value.size(); ++idx) {
       if (value.getNullFields().contains(idx)) {
@@ -67,36 +64,38 @@ public class BeamSqlRowCoder extends 
StandardCoder<BeamSQLRow>{
 
       switch (value.getDataType().getFieldsType().get(idx)) {
       case INTEGER:
+        intCoder.encode(value.getInteger(idx), outStream, context.nested());
+        break;
       case SMALLINT:
       case TINYINT:
-        intCoder.encode(value.getInteger(idx), outStream, nested);
+        intCoder.encode((int) value.getShort(idx), outStream, 
context.nested());
         break;
       case DOUBLE:
+        doubleCoder.encode(value.getDouble(idx), outStream, context.nested());
+        break;
       case FLOAT:
-        doubleCoder.encode(value.getDouble(idx), outStream, nested);
+        doubleCoder.encode((double) value.getFloat(idx), outStream, 
context.nested());
         break;
       case BIGINT:
-        longCoder.encode(value.getLong(idx), outStream, nested);
+        longCoder.encode(value.getLong(idx), outStream, context.nested());
         break;
       case VARCHAR:
-        stringCoder.encode(value.getString(idx), outStream, nested);
-        break;
-      case TIME:
-      case TIMESTAMP:
-        longCoder.encode(value.getDate(idx).getTime(), outStream, nested);
+        stringCoder.encode(value.getString(idx), outStream, context.nested());
         break;
 
       default:
         throw new 
UnsupportedDataTypeException(value.getDataType().getFieldsType().get(idx));
       }
     }
+    // Add a dummy trailing field to mark the end of the record.
+    intCoder.encode(value.size(), outStream, context);
   }
 
   @Override
   public BeamSQLRow decode(InputStream inStream, 
org.apache.beam.sdk.coders.Coder.Context context)
       throws CoderException, IOException {
-    BeamSQLRecordType type = recordTypeCoder.decode(inStream, context);
-    List<Integer> nullFields = listCoder.decode(inStream, context);
+    BeamSQLRecordType type = recordTypeCoder.decode(inStream, 
context.nested());
+    List<Integer> nullFields = listCoder.decode(inStream, context.nested());
 
     BeamSQLRow record = new BeamSQLRow(type);
     record.setNullFields(nullFields);
@@ -108,29 +107,32 @@ public class BeamSqlRowCoder extends 
StandardCoder<BeamSQLRow>{
 
       switch (type.getFieldsType().get(idx)) {
       case INTEGER:
+        record.addField(idx, intCoder.decode(inStream, context.nested()));
+        break;
       case SMALLINT:
+        record.addField(idx, intCoder.decode(inStream, 
context.nested()).shortValue());
+        break;
       case TINYINT:
-        record.addField(idx, intCoder.decode(inStream, context));
+        record.addField(idx, intCoder.decode(inStream, 
context.nested()).byteValue());
         break;
       case DOUBLE:
+        record.addField(idx, doubleCoder.decode(inStream, context.nested()));
+        break;
       case FLOAT:
-        record.addField(idx, doubleCoder.decode(inStream, context));
+        record.addField(idx, doubleCoder.decode(inStream, 
context.nested()).floatValue());
         break;
       case BIGINT:
-        record.addField(idx, longCoder.decode(inStream, context));
+        record.addField(idx, longCoder.decode(inStream, context.nested()));
         break;
       case VARCHAR:
-        record.addField(idx, stringCoder.decode(inStream, context));
-        break;
-      case TIME:
-      case TIMESTAMP:
-        record.addField(idx, new Date(longCoder.decode(inStream, context)));
+        record.addField(idx, stringCoder.decode(inStream, context.nested()));
         break;
 
       default:
         throw new UnsupportedDataTypeException(type.getFieldsType().get(idx));
       }
     }
+    intCoder.decode(inStream, context);
 
     return record;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java
deleted file mode 100644
index c929a83..0000000
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/InvalidFieldException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-/**
- * Exception when the field is invalid.
- *
- */
-public class InvalidFieldException extends RuntimeException {
-
-  public InvalidFieldException() {
-    super();
-  }
-
-  public InvalidFieldException(String message) {
-    super(message);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 915a5cc..0f40f33 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -35,11 +35,6 @@ import org.slf4j.LoggerFactory;
  */
 public class BeamKafkaCSVTable extends BeamKafkaTable {
 
-  /**
-   *
-   */
-  private static final long serialVersionUID = 4754022536543333984L;
-
   public static final String DELIMITER = ",";
   private static final Logger LOG = 
LoggerFactory.getLogger(BeamKafkaCSVTable.class);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index adf4621..c8c851c 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -40,11 +40,6 @@ import org.apache.calcite.rel.type.RelProtoDataType;
  */
 public abstract class BeamKafkaTable extends BaseBeamTable implements 
Serializable {
 
-  /**
-   *
-   */
-  private static final long serialVersionUID = -634715473399906527L;
-
   private String bootstrapServers;
   private List<String> topics;
   private Map<String, Object> configUpdates;

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
index 55086e2..2ab6301 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
@@ -28,10 +28,6 @@ import org.apache.beam.sdk.transforms.DoFn;
  *
  */
 public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-  /**
-   *
-   */
-  private static final long serialVersionUID = -1256111753670606705L;
 
   private String stepName;
   private BeamSQLExpressionExecutor executor;

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
index 92ebff2..c146ea5 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
@@ -25,10 +25,6 @@ import org.apache.beam.sdk.transforms.DoFn;
  *
  */
 public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> {
-  /**
-   *
-   */
-  private static final long serialVersionUID = -1256111753670606705L;
 
   private String stepName;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
index bafdd17..d018057 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
@@ -30,11 +30,6 @@ import org.apache.beam.sdk.transforms.DoFn;
  *
  */
 public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-
-  /**
-   *
-   */
-  private static final long serialVersionUID = -1046605249999014608L;
   private String stepName;
   private BeamSQLExpressionExecutor executor;
   private BeamSQLRecordType outputRecordType;

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
new file mode 100644
index 0000000..abbe3f7
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test cases for {@link BeamSQLFnExecutor}.
+ */
+public class BeamSQLFnExecutorTest extends BeamSQLFnExecutorTestBase {
+
+  @Test
+  public void testBeamFilterRel() {
+    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+                Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
+                    rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
+            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
+                    rexBuilder.makeExactLiteral(new BigDecimal(0))))));
+
+    BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, 
RelTraitSet.createEmpty(), null,
+        condition);
+
+    BeamSQLFnExecutor executor = new BeamSQLFnExecutor(beamFilterRel);
+    executor.prepare();
+
+    Assert.assertEquals(1, executor.exps.size());
+
+    BeamSqlExpression l1Exp = executor.exps.get(0);
+    Assert.assertTrue(l1Exp instanceof BeamSqlAndExpression);
+    Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
+
+    Assert.assertEquals(2, l1Exp.getOperands().size());
+    BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
+    BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
+
+    Assert.assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression);
+    Assert.assertTrue(l1Right instanceof BeamSqlEqualExpression);
+
+    Assert.assertEquals(2, l1Left.getOperands().size());
+    BeamSqlExpression l1LeftLeft = (BeamSqlExpression) 
l1Left.getOperands().get(0);
+    BeamSqlExpression l1LeftRight = (BeamSqlExpression) 
l1Left.getOperands().get(1);
+    Assert.assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
+    Assert.assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
+
+    Assert.assertEquals(2, l1Right.getOperands().size());
+    BeamSqlExpression l1RightLeft = (BeamSqlExpression) 
l1Right.getOperands().get(0);
+    BeamSqlExpression l1RightRight = (BeamSqlExpression) 
l1Right.getOperands().get(1);
+    Assert.assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
+    Assert.assertTrue(l1RightRight instanceof BeamSqlPrimitive);
+  }
+
+  @Test
+  public void testBeamProjectRel() {
+    BeamRelNode relNode = new BeamProjectRel(cluster, 
RelTraitSet.createEmpty(),
+        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
+        rexBuilder.identityProjects(relDataType), relDataType);
+    BeamSQLFnExecutor executor = new BeamSQLFnExecutor(relNode);
+
+    executor.prepare();
+    Assert.assertEquals(4, executor.exps.size());
+    Assert.assertTrue(executor.exps.get(0) instanceof 
BeamSqlInputRefExpression);
+    Assert.assertTrue(executor.exps.get(1) instanceof 
BeamSqlInputRefExpression);
+    Assert.assertTrue(executor.exps.get(2) instanceof 
BeamSqlInputRefExpression);
+    Assert.assertTrue(executor.exps.get(3) instanceof 
BeamSqlInputRefExpression);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
new file mode 100644
index 0000000..bfc7366
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
+import org.apache.beam.dsls.sql.planner.BeamRuleSets;
+import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.junit.BeforeClass;
+
+/**
+ * Base class to test {@link BeamSQLFnExecutor} and subclasses of {@link BeamSqlExpression}.
+ */
+public class BeamSQLFnExecutorTestBase {
+  public static RexBuilder rexBuilder = new 
RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+  public static RelOptCluster cluster = RelOptCluster.create(new 
VolcanoPlanner(), rexBuilder);
+
+  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  public static RelDataType relDataType;
+
+  public static BeamSQLRecordType beamRecordType;
+  public static BeamSQLRow record;
+
+  public static RelBuilder relBuilder;
+
+  @BeforeClass
+  public static void prepare() {
+    relDataType = TYPE_FACTORY.builder()
+        .add("order_id", SqlTypeName.BIGINT)
+        .add("site_id", SqlTypeName.INTEGER)
+        .add("price", SqlTypeName.DOUBLE)
+        .add("order_time", SqlTypeName.BIGINT).build();
+
+    beamRecordType = BeamSQLRecordType.from(relDataType);
+    record = new BeamSQLRow(beamRecordType);
+
+    record.addField(0, 1234567L);
+    record.addField(1, 0);
+    record.addField(2, 8.9);
+    record.addField(3, 1234567L);
+
+    SchemaPlus schema = Frameworks.createRootSchema(true);
+    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+    FrameworkConfig config = Frameworks.newConfigBuilder()
+        
.parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+        
.traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+        
.costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+
+    relBuilder = RelBuilder.create(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
new file mode 100644
index 0000000..a328c88
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlIsNullExpression} and
+ * {@link BeamSqlIsNotNullExpression}.
+ */
+public class BeamNullExperssionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test
+  public void testIsNull() {
+    BeamSqlIsNullExpression exp1 = new BeamSqlIsNullExpression(
+        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlIsNullExpression exp2 = new BeamSqlIsNullExpression(
+        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testIsNotNull() {
+    BeamSqlIsNotNullExpression exp1 = new BeamSqlIsNotNullExpression(
+        new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlIsNotNullExpression exp2 = new BeamSqlIsNotNullExpression(
+        BeamSqlPrimitive.of(SqlTypeName.BIGINT, null));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
new file mode 100644
index 0000000..9dabcdc
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlAndExpression} and {@link BeamSqlOrExpression}.
+ */
+public class BeamSqlAndOrExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test
+  public void testAnd() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+
+    Assert.assertTrue(new 
BeamSqlAndExpression(operands).evaluate(record).getValue());
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+
+    Assert.assertFalse(new 
BeamSqlAndExpression(operands).evaluate(record).getValue());
+  }
+
+  @Test
+  public void testOr() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, false));
+
+    Assert.assertFalse(new 
BeamSqlOrExpression(operands).evaluate(record).getValue());
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BOOLEAN, true));
+
+    Assert.assertTrue(new 
BeamSqlOrExpression(operands).evaluate(record).getValue());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
new file mode 100644
index 0000000..b88de71
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Arrays;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for the family of {@link BeamSqlCompareExpression} comparison operators.
+ */
+public class BeamSqlCompareExpressionTest extends BeamSQLFnExecutorTestBase {
+
+  @Test
+  public void testEqual() {
+    BeamSqlEqualExpression exp1 = new BeamSqlEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 100L)));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlEqualExpression exp2 = new BeamSqlEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLargerThan(){
+    BeamSqlLargerThanExpression exp1 = new BeamSqlLargerThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlLargerThanExpression exp2 = new BeamSqlLargerThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234566L)));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLargerThanEqual(){
+    BeamSqlLargerThanEqualExpression exp1 = new 
BeamSqlLargerThanEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlLargerThanEqualExpression exp2 = new 
BeamSqlLargerThanEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234568L)));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLessThan(){
+    BeamSqlLessThanExpression exp1 = new BeamSqlLessThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, 1)));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlLessThanExpression exp2 = new BeamSqlLessThanExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1),
+            BeamSqlPrimitive.of(SqlTypeName.INTEGER, -1)));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testLessThanEqual(){
+    BeamSqlLessThanEqualExpression exp1 = new BeamSqlLessThanEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
+            BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.9)));
+    Assert.assertEquals(true, exp1.evaluate(record).getValue());
+
+    BeamSqlLessThanEqualExpression exp2 = new BeamSqlLessThanEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2),
+            BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 8.0)));
+    Assert.assertEquals(false, exp2.evaluate(record).getValue());
+  }
+
+  @Test
+  public void testNotEqual(){
+    BeamSqlNotEqualExpression exp1 = new BeamSqlNotEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 1234567L)));
+    Assert.assertEquals(false, exp1.evaluate(record).getValue());
+
+    BeamSqlNotEqualExpression exp2 = new BeamSqlNotEqualExpression(
+        Arrays.asList(new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3),
+            BeamSqlPrimitive.of(SqlTypeName.BIGINT, 0L)));
+    Assert.assertEquals(true, exp2.evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
new file mode 100644
index 0000000..1cadeb0
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlInputRefExpression}.
+ */
+public class BeamSqlInputRefExpressionTest extends BeamSQLFnExecutorTestBase {
+  // NOTE: {@code record} is not declared in this class; it is presumably the shared
+  // fixture provided by BeamSQLFnExecutorTestBase.
+
+  /**
+   * Evaluates references to fields 0..3 and compares each result with the value the
+   * typed getter of {@code record} returns for the same index.
+   */
+  @Test
+  public void testRefInRange() {
+    BeamSqlInputRefExpression ref0 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 0);
+    Assert.assertEquals(record.getLong(0), ref0.evaluate(record).getValue());
+
+    BeamSqlInputRefExpression ref1 = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 1);
+    Assert.assertEquals(record.getInteger(1), ref1.evaluate(record).getValue());
+
+    BeamSqlInputRefExpression ref2 = new BeamSqlInputRefExpression(SqlTypeName.DOUBLE, 2);
+    Assert.assertEquals(record.getDouble(2), ref2.evaluate(record).getValue());
+
+    BeamSqlInputRefExpression ref3 = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 3);
+    Assert.assertEquals(record.getLong(3), ref3.evaluate(record).getValue());
+  }
+
+  /**
+   * A reference to index 4, one past the last index read by {@link #testRefInRange()},
+   * is expected to fail with {@link IndexOutOfBoundsException}.
+   */
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testRefOutOfRange() {
+    BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.BIGINT, 4);
+    ref.evaluate(record).getValue();
+  }
+
+  /**
+   * Field 0 is read as a long in {@link #testRefInRange()}; referencing it as INTEGER
+   * is expected to fail with {@link BeamInvalidOperatorException}.
+   */
+  @Test(expected = BeamInvalidOperatorException.class)
+  public void testTypeUnMatch() {
+    BeamSqlInputRefExpression ref = new BeamSqlInputRefExpression(SqlTypeName.INTEGER, 0);
+    ref.evaluate(record).getValue();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
new file mode 100644
index 0000000..adb8de9
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
+import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for {@link BeamSqlPrimitive}.
+ *
+ */
+public class BeamSqlPrimitiveTest extends BeamSQLFnExecutorTestBase {
+  // NOTE: {@code record} is not declared in this class; it is presumably the shared
+  // fixture provided by BeamSQLFnExecutorTestBase.
+
+  /** An INTEGER primitive built from an {@code int} literal evaluates to its own value. */
+  @Test
+  public void testPrimitiveInt() {
+    BeamSqlPrimitive<Integer> expInt = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100);
+    Assert.assertEquals(expInt.getValue(), expInt.evaluate(record).getValue());
+  }
+
+  /**
+   * A {@code long} literal declared as INTEGER is expected to be rejected with
+   * {@link BeamInvalidOperatorException}.
+   *
+   * <p>NOTE(review): the exception may already be raised by {@code of(...)}, in which
+   * case the assertion below is never reached; it is kept from the original test.
+   */
+  @Test(expected = BeamInvalidOperatorException.class)
+  public void testPrimitiveTypeUnMatch1() {
+    BeamSqlPrimitive<?> mismatched = BeamSqlPrimitive.of(SqlTypeName.INTEGER, 100L);
+    Assert.assertEquals(mismatched.getValue(), mismatched.evaluate(record).getValue());
+  }
+
+  /**
+   * A {@code long} literal declared as DECIMAL is expected to be rejected with
+   * {@link BeamInvalidOperatorException}.
+   */
+  @Test(expected = BeamInvalidOperatorException.class)
+  public void testPrimitiveTypeUnMatch2() {
+    BeamSqlPrimitive<?> mismatched = BeamSqlPrimitive.of(SqlTypeName.DECIMAL, 100L);
+    Assert.assertEquals(mismatched.getValue(), mismatched.evaluate(record).getValue());
+  }
+
+  /**
+   * A {@code long} literal declared as FLOAT is expected to be rejected with
+   * {@link BeamInvalidOperatorException}.
+   */
+  @Test(expected = BeamInvalidOperatorException.class)
+  public void testPrimitiveTypeUnMatch3() {
+    BeamSqlPrimitive<?> mismatched = BeamSqlPrimitive.of(SqlTypeName.FLOAT, 100L);
+    Assert.assertEquals(mismatched.getValue(), mismatched.evaluate(record).getValue());
+  }
+
+  /**
+   * A {@code long} literal declared as DOUBLE is expected to be rejected with
+   * {@link BeamInvalidOperatorException}.
+   */
+  @Test(expected = BeamInvalidOperatorException.class)
+  public void testPrimitiveTypeUnMatch4() {
+    BeamSqlPrimitive<?> mismatched = BeamSqlPrimitive.of(SqlTypeName.DOUBLE, 100L);
+    Assert.assertEquals(mismatched.getValue(), mismatched.evaluate(record).getValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 733b056..625fb71 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 /**
@@ -35,21 +36,33 @@ import org.junit.BeforeClass;
  *
  */
 public class BasePlanner {
-  public static BeamSqlRunner runner = new BeamSqlRunner();
+  public static BeamSqlRunner runner;
 
   @BeforeClass
   public static void prepare() {
+    runner = new BeamSqlRunner();
+
     runner.addTable("ORDER_DETAILS", getTable());
     runner.addTable("SUB_ORDER", getTable("127.0.0.1:9092", "sub_orders"));
     runner.addTable("SUB_ORDER_RAM", getTable());
   }
 
+  @AfterClass
+  public static void close(){
+    runner = null;
+  }
+
   private static BaseBeamTable getTable() {
     final RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+        return a0.builder()
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE)
+            .add("shipping", SqlTypeName.FLOAT)
+            .add("notes", SqlTypeName.VARCHAR)
+            .build();
       }
     };
 
@@ -60,8 +73,13 @@ public class BasePlanner {
     final RelProtoDataType protoRowType = new RelProtoDataType() {
       @Override
       public RelDataType apply(RelDataTypeFactory a0) {
-        return a0.builder().add("order_id", SqlTypeName.BIGINT).add("site_id", SqlTypeName.INTEGER)
-            .add("price", SqlTypeName.DOUBLE).add("order_time", SqlTypeName.TIMESTAMP).build();
+        return a0.builder()
+            .add("order_id", SqlTypeName.BIGINT)
+            .add("site_id", SqlTypeName.INTEGER)
+            .add("price", SqlTypeName.DOUBLE)
+            .add("shipping", SqlTypeName.FLOAT)
+            .add("notes", SqlTypeName.VARCHAR)
+            .build();
       }
     };
 

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
index 9dde0f1..5d1052b 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerExplainTest.java
@@ -32,7 +32,7 @@ public class BeamPlannerExplainTest extends BasePlanner {
     String plan = runner.explainQuery(sql);
 
     String expectedPlan =
-        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[$3])\n"
+        "BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[$3], notes=[$4])\n"
         + "  BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";
     Assert.assertEquals("explain doesn't match", expectedPlan, plan);
   }
@@ -58,7 +58,8 @@ public class BeamPlannerExplainTest extends BasePlanner {
 
     String expectedPlan =
         "BeamIOSinkRel(table=[[SUB_ORDER]], operation=[INSERT], flattened=[true])\n"
-        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], order_time=[null])\n"
+        + "  BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2], shipping=[null],"
+            + " notes=[null])\n"
         + "    BeamProjectRel(order_id=[$0], site_id=[$1], price=[$2])\n"
         + "      BeamFilterRel(condition=[AND(=($1, 0), >($2, 20))])\n"
         + "        BeamIOSourceRel(table=[[ORDER_DETAILS]])\n";

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
index d32b19b..1ca9eb3 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerSubmitTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.dsls.sql.planner;
 
-import org.apache.beam.sdk.Pipeline;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -30,13 +29,11 @@ public class BeamPlannerSubmitTest extends BasePlanner {
   public void insertSelectFilter() throws Exception {
     String sql = "INSERT INTO SUB_ORDER_RAM(order_id, site_id, price) SELECT "
         + " order_id, site_id, price " + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 and price > 20";
-    Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql);
-    runner.getPlanner().planner.close();
 
-    pipeline.run().waitUntilFinish();
+    runner.submitQuery(sql);
 
     Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
-    Assert.assertEquals("order_id=12345,site_id=0,price=20.5,order_time=null",
+    Assert.assertEquals("order_id=12345,site_id=0,price=20.5,shipping=null,notes=null",
         MockedBeamSQLTable.CONTENT.get(0));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/464cc275/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
index 8631a6e..538607f 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/MockedBeamSQLTable.java
@@ -18,7 +18,6 @@
 package org.apache.beam.dsls.sql.planner;
 
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
@@ -39,11 +38,6 @@ import org.apache.calcite.rel.type.RelProtoDataType;
  */
 public class MockedBeamSQLTable extends BaseBeamTable {
 
-  /**
-   *
-   */
-  private static final long serialVersionUID = 1373168368414036932L;
-
   public static final List<String> CONTENT = new ArrayList<>();
 
   public MockedBeamSQLTable(RelProtoDataType protoRowType) {
@@ -61,25 +55,25 @@ public class MockedBeamSQLTable extends BaseBeamTable {
     row1.addField(0, 12345L);
     row1.addField(1, 0);
     row1.addField(2, 10.5);
-    row1.addField(3, new Date());
+    row1.addField(3, 123.4f);
 
     BeamSQLRow row2 = new BeamSQLRow(beamSqlRecordType);
     row2.addField(0, 12345L);
     row2.addField(1, 1);
     row2.addField(2, 20.5);
-    row2.addField(3, new Date());
+    row2.addField(3, 234.5f);
 
     BeamSQLRow row3 = new BeamSQLRow(beamSqlRecordType);
     row3.addField(0, 12345L);
     row3.addField(1, 0);
     row3.addField(2, 20.5);
-    row3.addField(3, new Date());
+    row3.addField(3, 345.6f);
 
     BeamSQLRow row4 = new BeamSQLRow(beamSqlRecordType);
     row4.addField(0, null);
     row4.addField(1, null);
     row4.addField(2, 20.5);
-    row4.addField(3, new Date());
+    row4.addField(3, 456.7f);
 
     return Create.of(row1, row2, row3);
   }

Reply via email to