http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
deleted file mode 100644
index 2ab6301..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLFilterFn.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * {@code BeamSQLFilterFn} is the executor for a {@link BeamFilterRel} step.
- *
- */
-public class BeamSQLFilterFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-
-  private String stepName;
-  private BeamSQLExpressionExecutor executor;
-
-  public BeamSQLFilterFn(String stepName, BeamSQLExpressionExecutor executor) {
-    super();
-    this.stepName = stepName;
-    this.executor = executor;
-  }
-
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    BeamSQLRow in = c.element();
-
-    List<Object> result = executor.execute(in);
-
-    if ((Boolean) result.get(0)) {
-      c.output(in);
-    }
-  }
-
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
deleted file mode 100644
index c146ea5..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLOutputToConsoleFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * A test PTransform to display output in console.
- *
- */
-public class BeamSQLOutputToConsoleFn extends DoFn<BeamSQLRow, Void> {
-
-  private String stepName;
-
-  public BeamSQLOutputToConsoleFn(String stepName) {
-    super();
-    this.stepName = stepName;
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c) {
-    System.out.println("Output: " + c.element().getDataValues());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
deleted file mode 100644
index ef4dc0f..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSQLProjectFn.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.transform;
-
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.beam.dsls.sql.schema.BeamTableUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-
-/**
- *
- * {@code BeamSQLProjectFn} is the executor for a {@link BeamProjectRel} step.
- *
- */
-public class BeamSQLProjectFn extends DoFn<BeamSQLRow, BeamSQLRow> {
-  private String stepName;
-  private BeamSQLExpressionExecutor executor;
-  private BeamSQLRecordType outputRecordType;
-
-  public BeamSQLProjectFn(String stepName, BeamSQLExpressionExecutor executor,
-      BeamSQLRecordType outputRecordType) {
-    super();
-    this.stepName = stepName;
-    this.executor = executor;
-    this.outputRecordType = outputRecordType;
-  }
-
-  @Setup
-  public void setup() {
-    executor.prepare();
-  }
-
-  @ProcessElement
-  public void processElement(ProcessContext c, BoundedWindow window) {
-    BeamSQLRow inputRecord = c.element();
-    List<Object> results = executor.execute(inputRecord);
-
-    BeamSQLRow outRow = new BeamSQLRow(outputRecordType);
-    outRow.updateWindowRange(inputRecord, window);
-
-    for (int idx = 0; idx < results.size(); ++idx) {
-      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
-    }
-
-    c.output(outRow);
-  }
-
-  @Teardown
-  public void close() {
-    executor.close();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
new file mode 100644
index 0000000..d4dbc6a
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlFilterFn.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * {@code BeamSqlFilterFn} is the executor for a {@link BeamFilterRel} step.
+ *
+ */
+public class BeamSqlFilterFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+
+  private String stepName;
+  private BeamSqlExpressionExecutor executor;
+
+  public BeamSqlFilterFn(String stepName, BeamSqlExpressionExecutor executor) {
+    super();
+    this.stepName = stepName;
+    this.executor = executor;
+  }
+
+  @Setup
+  public void setup() {
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    BeamSqlRow in = c.element();
+
+    List<Object> result = executor.execute(in);
+
+    if ((Boolean) result.get(0)) {
+      c.output(in);
+    }
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
new file mode 100644
index 0000000..d8a2a63
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlOutputToConsoleFn.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A test PTransform to display output in console.
+ *
+ */
+public class BeamSqlOutputToConsoleFn extends DoFn<BeamSqlRow, Void> {
+
+  private String stepName;
+
+  public BeamSqlOutputToConsoleFn(String stepName) {
+    super();
+    this.stepName = stepName;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) {
+    System.out.println("Output: " + c.element().getDataValues());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
new file mode 100644
index 0000000..2a3357c
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamSqlProjectFn.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.transform;
+
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.beam.dsls.sql.schema.BeamTableUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+/**
+ *
+ * {@code BeamSqlProjectFn} is the executor for a {@link BeamProjectRel} step.
+ *
+ */
+public class BeamSqlProjectFn extends DoFn<BeamSqlRow, BeamSqlRow> {
+  private String stepName;
+  private BeamSqlExpressionExecutor executor;
+  private BeamSqlRecordType outputRecordType;
+
+  public BeamSqlProjectFn(String stepName, BeamSqlExpressionExecutor executor,
+      BeamSqlRecordType outputRecordType) {
+    super();
+    this.stepName = stepName;
+    this.executor = executor;
+    this.outputRecordType = outputRecordType;
+  }
+
+  @Setup
+  public void setup() {
+    executor.prepare();
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c, BoundedWindow window) {
+    BeamSqlRow inputRecord = c.element();
+    List<Object> results = executor.execute(inputRecord);
+
+    BeamSqlRow outRow = new BeamSqlRow(outputRecordType);
+    outRow.updateWindowRange(inputRecord, window);
+
+    for (int idx = 0; idx < results.size(); ++idx) {
+      BeamTableUtils.addFieldWithAutoTypeCasting(outRow, idx, results.get(idx));
+    }
+
+    c.output(outRow);
+  }
+
+  @Teardown
+  public void close() {
+    executor.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
index cd2bdeb..5169749 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/package-info.java
@@ -17,6 +17,6 @@
  */
 
 /**
- * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSQL pipeline.
+ * {@link org.apache.beam.sdk.transforms.PTransform} used in a BeamSql pipeline.
  */
 package org.apache.beam.dsls.sql.transform;

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
deleted file mode 100644
index ba9f525..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import static org.junit.Assert.assertTrue;
-
-import java.math.BigDecimal;
-import java.util.Arrays;
-
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
-import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
-import org.apache.beam.dsls.sql.rel.BeamFilterRel;
-import org.apache.beam.dsls.sql.rel.BeamProjectRel;
-import org.apache.beam.dsls.sql.rel.BeamRelNode;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit test cases for {@link BeamSQLFnExecutor}.
- */
-public class BeamSQLFnExecutorTest extends BeamSQLFnExecutorTestBase {
-
-  @Test
-  public void testBeamFilterRel() {
-    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
-        Arrays.asList(
-            rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
-                Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
-                    rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
-            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
-                Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
-                    rexBuilder.makeExactLiteral(new BigDecimal(0))))));
-
-    BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null,
-        condition);
-
-    BeamSQLFnExecutor executor = new BeamSQLFnExecutor(beamFilterRel);
-    executor.prepare();
-
-    Assert.assertEquals(1, executor.exps.size());
-
-    BeamSqlExpression l1Exp = executor.exps.get(0);
-    assertTrue(l1Exp instanceof BeamSqlAndExpression);
-    Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
-
-    Assert.assertEquals(2, l1Exp.getOperands().size());
-    BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
-    BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
-
-    assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression);
-    assertTrue(l1Right instanceof BeamSqlEqualExpression);
-
-    Assert.assertEquals(2, l1Left.getOperands().size());
-    BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);
-    BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1);
-    assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
-    assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
-
-    Assert.assertEquals(2, l1Right.getOperands().size());
-    BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0);
-    BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1);
-    assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
-    assertTrue(l1RightRight instanceof BeamSqlPrimitive);
-  }
-
-  @Test
-  public void testBeamProjectRel() {
-    BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
-        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
-        rexBuilder.identityProjects(relDataType), relDataType);
-    BeamSQLFnExecutor executor = new BeamSQLFnExecutor(relNode);
-
-    executor.prepare();
-    Assert.assertEquals(4, executor.exps.size());
-    assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
-    assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
-    assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
-    assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
-  }
-
-
-  @Test
-  public void testBuildExpression_arithmetic() {
-    testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
-    testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
-  }
-
-  private void testBuildArithmeticExpression(SqlOperator fn,
-      Class<? extends BeamSqlExpression> clazz) {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(fn, Arrays.asList(
-        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
-        rexBuilder.makeBigintLiteral(new BigDecimal(1L))
-    ));
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-
-    assertTrue(exp.getClass().equals(clazz));
-  }
-
-  public void testBuildExpression_string()  {
-    RexNode rexNode;
-    BeamSqlExpression exp;
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello "),
-            rexBuilder.makeLiteral("world")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlConcatExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello"),
-            rexBuilder.makeLiteral("worldhello")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlPositionExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello"),
-            rexBuilder.makeLiteral("worldhello"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlPositionExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCharLengthExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlUpperExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlLowerExpression);
-
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
-        Arrays.asList(
-            rexBuilder.makeLiteral("hello")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlInitCapExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
-        Arrays.asList(
-            rexBuilder.makeLiteral("BOTH"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlTrimExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlSubstringExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlSubstringExpression);
-
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOverlayExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
-        Arrays.asList(
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
-            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlOverlayExpression);
-
-    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
-        Arrays.asList(
-            rexBuilder.makeLiteral(true),
-            rexBuilder.makeLiteral("HELLO"),
-            rexBuilder.makeLiteral("HELLO")
-        )
-    );
-    exp = BeamSQLFnExecutor.buildExpression(rexNode);
-    assertTrue(exp instanceof BeamSqlCaseExpression);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
deleted file mode 100644
index bfc7366..0000000
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutorTestBase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.interpreter;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
-import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
-import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
-import org.apache.beam.dsls.sql.planner.BeamRuleSets;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
-import org.apache.calcite.plan.Contexts;
-import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.volcano.VolcanoPlanner;
-import org.apache.calcite.rel.RelCollationTraitDef;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeSystem;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.parser.SqlParser;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.RelBuilder;
-import org.junit.BeforeClass;
-
-/**
- * base class to test {@link BeamSQLFnExecutor} and subclasses of {@link BeamSqlExpression}.
- */
-public class BeamSQLFnExecutorTestBase {
-  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
-  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
-
-  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
-      RelDataTypeSystem.DEFAULT);
-  public static RelDataType relDataType;
-
-  public static BeamSQLRecordType beamRecordType;
-  public static BeamSQLRow record;
-
-  public static RelBuilder relBuilder;
-
-  @BeforeClass
-  public static void prepare() {
-    relDataType = TYPE_FACTORY.builder()
-        .add("order_id", SqlTypeName.BIGINT)
-        .add("site_id", SqlTypeName.INTEGER)
-        .add("price", SqlTypeName.DOUBLE)
-        .add("order_time", SqlTypeName.BIGINT).build();
-
-    beamRecordType = BeamSQLRecordType.from(relDataType);
-    record = new BeamSQLRow(beamRecordType);
-
-    record.addField(0, 1234567L);
-    record.addField(1, 0);
-    record.addField(2, 8.9);
-    record.addField(3, 1234567L);
-
-    SchemaPlus schema = Frameworks.createRootSchema(true);
-    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
-    traitDefs.add(ConventionTraitDef.INSTANCE);
-    traitDefs.add(RelCollationTraitDef.INSTANCE);
-    FrameworkConfig config = Frameworks.newConfigBuilder()
-        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
-        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
-        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
-
-    relBuilder = RelBuilder.create(config);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
new file mode 100644
index 0000000..017c6ca
--- /dev/null
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanEqualExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlDivideExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
+import org.apache.beam.dsls.sql.rel.BeamFilterRel;
+import org.apache.beam.dsls.sql.rel.BeamProjectRel;
+import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test cases for {@link BeamSqlFnExecutor}.
+ */
+public class BeamSqlFnExecutorTest extends BeamSqlFnExecutorTestBase {
+
+  @Test
+  public void testBeamFilterRel() {
+    RexNode condition = rexBuilder.makeCall(SqlStdOperatorTable.AND,
+        Arrays.asList(
+            rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL,
+                Arrays.asList(rexBuilder.makeInputRef(relDataType, 0),
+                    rexBuilder.makeBigintLiteral(new BigDecimal(1000L)))),
+            rexBuilder.makeCall(SqlStdOperatorTable.EQUALS,
+                Arrays.asList(rexBuilder.makeInputRef(relDataType, 1),
+                    rexBuilder.makeExactLiteral(new BigDecimal(0))))));
+
+    BeamFilterRel beamFilterRel = new BeamFilterRel(cluster, RelTraitSet.createEmpty(), null,
+        condition);
+
+    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(beamFilterRel);
+    executor.prepare();
+
+    Assert.assertEquals(1, executor.exps.size());
+
+    BeamSqlExpression l1Exp = executor.exps.get(0);
+    assertTrue(l1Exp instanceof BeamSqlAndExpression);
+    Assert.assertEquals(SqlTypeName.BOOLEAN, l1Exp.getOutputType());
+
+    Assert.assertEquals(2, l1Exp.getOperands().size());
+    BeamSqlExpression l1Left = (BeamSqlExpression) l1Exp.getOperands().get(0);
+    BeamSqlExpression l1Right = (BeamSqlExpression) l1Exp.getOperands().get(1);
+
+    assertTrue(l1Left instanceof BeamSqlLessThanEqualExpression);
+    assertTrue(l1Right instanceof BeamSqlEqualExpression);
+
+    Assert.assertEquals(2, l1Left.getOperands().size());
+    BeamSqlExpression l1LeftLeft = (BeamSqlExpression) l1Left.getOperands().get(0);
+    BeamSqlExpression l1LeftRight = (BeamSqlExpression) l1Left.getOperands().get(1);
+    assertTrue(l1LeftLeft instanceof BeamSqlInputRefExpression);
+    assertTrue(l1LeftRight instanceof BeamSqlPrimitive);
+
+    Assert.assertEquals(2, l1Right.getOperands().size());
+    BeamSqlExpression l1RightLeft = (BeamSqlExpression) l1Right.getOperands().get(0);
+    BeamSqlExpression l1RightRight = (BeamSqlExpression) l1Right.getOperands().get(1);
+    assertTrue(l1RightLeft instanceof BeamSqlInputRefExpression);
+    assertTrue(l1RightRight instanceof BeamSqlPrimitive);
+  }
+
+  @Test
+  public void testBeamProjectRel() {
+    BeamRelNode relNode = new BeamProjectRel(cluster, RelTraitSet.createEmpty(),
+        relBuilder.values(relDataType, 1234567L, 0, 8.9, null).build(),
+        rexBuilder.identityProjects(relDataType), relDataType);
+    BeamSqlFnExecutor executor = new BeamSqlFnExecutor(relNode);
+
+    executor.prepare();
+    Assert.assertEquals(4, executor.exps.size());
+    assertTrue(executor.exps.get(0) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(1) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(2) instanceof BeamSqlInputRefExpression);
+    assertTrue(executor.exps.get(3) instanceof BeamSqlInputRefExpression);
+  }
+
+
+  @Test
+  public void testBuildExpression_arithmetic() {
+    testBuildArithmeticExpression(SqlStdOperatorTable.PLUS, BeamSqlPlusExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MINUS, BeamSqlMinusExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MULTIPLY, BeamSqlMultiplyExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.DIVIDE, BeamSqlDivideExpression.class);
+    testBuildArithmeticExpression(SqlStdOperatorTable.MOD, BeamSqlModExpression.class);
+  }
+
+  private void testBuildArithmeticExpression(SqlOperator fn,
+      Class<? extends BeamSqlExpression> clazz) {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(fn, Arrays.asList(
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L)),
+        rexBuilder.makeBigintLiteral(new BigDecimal(1L))
+    ));
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+
+    assertTrue(exp.getClass().equals(clazz));
+  }
+
+  public void testBuildExpression_string()  {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CONCAT,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello "),
+            rexBuilder.makeLiteral("world")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlConcatExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlPositionExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.POSITION,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello"),
+            rexBuilder.makeLiteral("worldhello"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlPositionExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCharLengthExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.UPPER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlUpperExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOWER,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlLowerExpression);
+
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.INITCAP,
+        Arrays.asList(
+            rexBuilder.makeLiteral("hello")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlInitCapExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.TRIM,
+        Arrays.asList(
+            rexBuilder.makeLiteral("BOTH"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlTrimExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.SUBSTRING,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlSubstringExpression);
+
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.OVERLAY,
+        Arrays.asList(
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO),
+            rexBuilder.makeBigintLiteral(BigDecimal.ZERO)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlOverlayExpression);
+
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CASE,
+        Arrays.asList(
+            rexBuilder.makeLiteral(true),
+            rexBuilder.makeLiteral("HELLO"),
+            rexBuilder.makeLiteral("HELLO")
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCaseExpression);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
new file mode 100644
index 0000000..d83ca8f
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTestBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.interpreter;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
+import org.apache.beam.dsls.sql.planner.BeamRelDataTypeSystem;
+import org.apache.beam.dsls.sql.planner.BeamRuleSets;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.plan.Contexts;
+import org.apache.calcite.plan.ConventionTraitDef;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitDef;
+import org.apache.calcite.plan.volcano.VolcanoPlanner;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.RelBuilder;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for tests of {@link BeamSqlFnExecutor} and subclasses of {@link BeamSqlExpression}.
+ */
+public class BeamSqlFnExecutorTestBase {
+  public static RexBuilder rexBuilder = new RexBuilder(BeamQueryPlanner.TYPE_FACTORY);
+  public static RelOptCluster cluster = RelOptCluster.create(new VolcanoPlanner(), rexBuilder);
+
+  public static final JavaTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl(
+      RelDataTypeSystem.DEFAULT);
+  public static RelDataType relDataType;
+
+  public static BeamSqlRecordType beamRecordType;
+  public static BeamSqlRow record;
+
+  public static RelBuilder relBuilder;
+
+  @BeforeClass
+  public static void prepare() {
+    relDataType = TYPE_FACTORY.builder()
+        .add("order_id", SqlTypeName.BIGINT)
+        .add("site_id", SqlTypeName.INTEGER)
+        .add("price", SqlTypeName.DOUBLE)
+        .add("order_time", SqlTypeName.BIGINT).build();
+
+    beamRecordType = BeamSqlRecordType.from(relDataType);
+    record = new BeamSqlRow(beamRecordType);
+
+    record.addField(0, 1234567L);
+    record.addField(1, 0);
+    record.addField(2, 8.9);
+    record.addField(3, 1234567L);
+
+    SchemaPlus schema = Frameworks.createRootSchema(true);
+    final List<RelTraitDef> traitDefs = new ArrayList<RelTraitDef>();
+    traitDefs.add(ConventionTraitDef.INSTANCE);
+    traitDefs.add(RelCollationTraitDef.INSTANCE);
+    FrameworkConfig config = Frameworks.newConfigBuilder()
+        .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema)
+        .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build();
+
+    relBuilder = RelBuilder.create(config);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
index a328c88..b0cc84d 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamNullExperssionTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.dsls.sql.interpreter.operator;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -26,7 +26,7 @@ import org.junit.Test;
  * Test cases for {@link BeamSqlIsNullExpression} and
  * {@link BeamSqlIsNotNullExpression}.
  */
-public class BeamNullExperssionTest extends BeamSQLFnExecutorTestBase {
+public class BeamNullExperssionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testIsNull() {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
index 9dabcdc..9c9d3d2 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlAndOrExpressionTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,7 +27,7 @@ import org.junit.Test;
 /**
  * Test cases for {@link BeamSqlAndExpression}, {@link BeamSqlOrExpression}.
  */
-public class BeamSqlAndOrExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlAndOrExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testAnd() {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
index 06b5073..39eec76 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCaseExpressionTest.java
@@ -25,14 +25,14 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Test;
 
 /**
  * Test for BeamSqlCaseExpression.
  */
-public class BeamSqlCaseExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlCaseExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void accept() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
index b88de71..c76fa1c 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlCompareExpressionTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.Arrays;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -26,7 +26,7 @@ import org.junit.Test;
 /**
  * Test cases for the collections of {@link BeamSqlCompareExpression}.
  */
-public class BeamSqlCompareExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlCompareExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testEqual() {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
index 1cadeb0..8c19283 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlInputRefExpressionTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -26,7 +26,7 @@ import org.junit.Test;
 /**
  * Test cases for {@link BeamSqlInputRefExpression}.
  */
-public class BeamSqlInputRefExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlInputRefExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testRefInRange() {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
index adb8de9..7cdc44e 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitiveTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter.operator;
 
 import org.apache.beam.dsls.sql.exception.BeamInvalidOperatorException;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,7 +27,7 @@ import org.junit.Test;
  * Test cases for {@link BeamSqlPrimitive}.
  *
  */
-public class BeamSqlPrimitiveTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlPrimitiveTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testPrimitiveInt(){

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
index 71ac523..e1660b4 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.dsls.sql.interpreter.operator;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,7 +27,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlUdfExpression.
  */
-public class BeamSqlUdfExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlUdfExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test
   public void testUdf() throws NoSuchMethodException, SecurityException {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
index abebf17..fc28180 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/arithmetic/BeamSqlArithmeticExpressionTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -34,7 +34,7 @@ import org.junit.Test;
 /**
  * Tests for {@code BeamSqlArithmeticExpression}.
  */
-public class BeamSqlArithmeticExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlArithmeticExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void testAccept_normal() {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
index c5753d3..e3b0d18 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/math/BeamSqlMathUnaryExpressionTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.interpreter.operator.math;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -31,7 +31,7 @@ import org.junit.Test;
 /**
  * Test for {@link BeamSqlMathUnaryExpression}.
  */
-public class BeamSqlMathUnaryExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlMathUnaryExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void testForGreaterThanOneOperands() {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
index cd02fdf..b749099 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlCharLengthExpressionTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,7 +32,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlCharLengthExpression.
  */
-public class BeamSqlCharLengthExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlCharLengthExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void evaluate() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
index ca71dec..c77e1e6 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlConcatExpressionTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -34,7 +34,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlConcatExpression.
  */
-public class BeamSqlConcatExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlConcatExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void accept() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
index b38b033..557f235 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlInitCapExpressionTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,7 +32,7 @@ import org.junit.Test;
 /**
  * Test of BeamSqlInitCapExpression.
  */
-public class BeamSqlInitCapExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlInitCapExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void evaluate() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
index fead9dc..9abbfd8 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlLowerExpressionTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,7 +32,7 @@ import org.junit.Test;
 /**
  * Test of BeamSqlLowerExpression.
  */
-public class BeamSqlLowerExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlLowerExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void evaluate() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
index 3c4bca5..e98fd62 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlOverlayExpressionTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -33,7 +33,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlOverlayExpression.
  */
-public class BeamSqlOverlayExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlOverlayExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void accept() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
index 7339466..4627610 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlPositionExpressionTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -34,7 +34,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlPositionExpression.
  */
-public class BeamSqlPositionExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlPositionExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void accept() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
index 78b2731..8d54522 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlSubstringExpressionTest.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -33,7 +33,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlSubstringExpression.
  */
-public class BeamSqlSubstringExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlSubstringExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void accept() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
index 8ad33c9..8c595f3 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlTrimExpressionTest.java
@@ -25,7 +25,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -34,7 +34,7 @@ import org.junit.Test;
 /**
  * Test for BeamSqlTrimExpression.
  */
-public class BeamSqlTrimExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlTrimExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void accept() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
index e6f3500..1a734bc 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/string/BeamSqlUpperExpressionTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
 import org.apache.calcite.sql.type.SqlTypeName;
@@ -32,7 +32,7 @@ import org.junit.Test;
 /**
  * Test of BeamSqlUpperExpression.
  */
-public class BeamSqlUpperExpressionTest extends BeamSQLFnExecutorTestBase {
+public class BeamSqlUpperExpressionTest extends BeamSqlFnExecutorTestBase {
 
   @Test public void evaluate() throws Exception {
     List<BeamSqlExpression> operands = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
index 03f7705..7f69345 100644
--- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
+++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BasePlanner.java
@@ -23,8 +23,8 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -54,33 +54,33 @@ public class BasePlanner {
       }
     };
 
-    BeamSQLRecordType dataType = BeamSQLRecordType.from(
+    BeamSqlRecordType dataType = BeamSqlRecordType.from(
         protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-    BeamSQLRow row1 = new BeamSQLRow(dataType);
+    BeamSqlRow row1 = new BeamSqlRow(dataType);
     row1.addField(0, 12345L);
     row1.addField(1, 0);
     row1.addField(2, 10.5);
     row1.addField(3, new Date());
 
-    BeamSQLRow row2 = new BeamSQLRow(dataType);
+    BeamSqlRow row2 = new BeamSqlRow(dataType);
     row2.addField(0, 12345L);
     row2.addField(1, 1);
     row2.addField(2, 20.5);
     row2.addField(3, new Date());
 
-    BeamSQLRow row3 = new BeamSQLRow(dataType);
+    BeamSqlRow row3 = new BeamSqlRow(dataType);
     row3.addField(0, 12345L);
     row3.addField(1, 0);
     row3.addField(2, 20.5);
     row3.addField(3, new Date());
 
-    BeamSQLRow row4 = new BeamSQLRow(dataType);
+    BeamSqlRow row4 = new BeamSqlRow(dataType);
     row4.addField(0, null);
     row4.addField(1, null);
     row4.addField(2, 20.5);
     row4.addField(3, new Date());
 
-    return new MockedBeamSQLTable(protoRowType).withInputRecords(
+    return new MockedBeamSqlTable(protoRowType).withInputRecords(
         Arrays.asList(row1, row2, row3, row4));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
index 0436ca1..8db65d1 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java
@@ -20,7 +20,7 @@ package org.apache.beam.dsls.sql.planner;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.junit.Test;
@@ -39,7 +39,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
   public void testSimpleGroupExplain() throws Exception {
     String sql = "SELECT COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 ";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   /**
@@ -49,7 +49,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
   public void testSimpleGroup2Explain() throws Exception {
     String sql = "SELECT site_id" + ", COUNT(*) " + "FROM ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY site_id";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   /**
@@ -60,7 +60,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", TUMBLE(order_time, INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   /**
@@ -72,7 +72,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
         + "TUMBLE_START(order_time, INTERVAL '1' HOUR, TIME '00:00:01')"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' 
HOUR, TIME '00:00:01')";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   /**
@@ -83,7 +83,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", HOP(order_time, INTERVAL '5' MINUTE, INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   /**
@@ -94,7 +94,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id, site_id"
         + ", SESSION(order_time, INTERVAL '5' MINUTE)";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   /**
@@ -105,7 +105,7 @@ public class BeamGroupByPipelineTest extends BasePlanner {
     BeamSqlEnv.registerUdf("negative", BeamSqlUdfExpressionTest.UdfFn.class, 
"negative");
     String sql = "select site_id, negative(site_id) as nsite_id from 
ORDER_DETAILS";
 
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
index 946a9fd..adb454c 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamInvalidGroupByTest.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.planner;
 
 import org.apache.beam.dsls.sql.BeamSqlCli;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.calcite.tools.ValidationException;
@@ -37,7 +37,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
   public void testTumble2Explain() throws Exception {
     String sql = "SELECT order_id, site_id" + ", COUNT(*) AS `SIZE`" + "FROM 
ORDER_DETAILS "
         + "WHERE SITE_ID = 0 " + "GROUP BY order_id" + ", TUMBLE(order_time, 
INTERVAL '1' HOUR)";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
   @Test(expected = ValidationException.class)
@@ -45,7 +45,7 @@ public class BeamInvalidGroupByTest extends BasePlanner {
     String sql = "SELECT order_id, site_id, TUMBLE(order_time, INTERVAL '1' 
HOUR)"
         + ", COUNT(*) AS `SIZE`" + "FROM ORDER_DETAILS " + "WHERE SITE_ID = 0 "
         + "GROUP BY order_id, site_id" + ", TUMBLE(order_time, INTERVAL '1' 
HOUR)";
-    PCollection<BeamSQLRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
+    PCollection<BeamSqlRow> outputStream = BeamSqlCli.compilePipeline(sql, 
pipeline);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
index a296eec..e12eca2 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamPlannerAggregationSubmitTest.java
@@ -25,8 +25,8 @@ import java.util.Arrays;
 import org.apache.beam.dsls.sql.BeamSqlCli;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -56,7 +56,7 @@ public class BeamPlannerAggregationSubmitTest {
 
   @Before
   public void prepare() throws ParseException {
-    MockedBeamSQLTable.CONTENT.clear();
+    MockedBeamSqlTable.CONTENT.clear();
   }
 
   private static BaseBeamTable getOrderTable() throws ParseException {
@@ -69,29 +69,29 @@ public class BeamPlannerAggregationSubmitTest {
       }
     };
 
-    BeamSQLRecordType dataType = BeamSQLRecordType.from(
+    BeamSqlRecordType dataType = BeamSqlRecordType.from(
         protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY));
-    BeamSQLRow row1 = new BeamSQLRow(dataType);
+    BeamSqlRow row1 = new BeamSqlRow(dataType);
     row1.addField(0, 12345L);
     row1.addField(1, 1);
     row1.addField(2, format.parse("2017-01-01 01:02:03"));
 
-    BeamSQLRow row2 = new BeamSQLRow(dataType);
+    BeamSqlRow row2 = new BeamSqlRow(dataType);
     row2.addField(0, 12345L);
     row2.addField(1, 0);
     row2.addField(2, format.parse("2017-01-01 01:03:04"));
 
-    BeamSQLRow row3 = new BeamSQLRow(dataType);
+    BeamSqlRow row3 = new BeamSqlRow(dataType);
     row3.addField(0, 12345L);
     row3.addField(1, 0);
     row3.addField(2, format.parse("2017-01-01 02:03:04"));
 
-    BeamSQLRow row4 = new BeamSQLRow(dataType);
+    BeamSqlRow row4 = new BeamSqlRow(dataType);
     row4.addField(0, 2132L);
     row4.addField(1, 0);
     row4.addField(2, format.parse("2017-01-01 03:04:05"));
 
-    return new MockedBeamSQLTable(protoRowType).withInputRecords(
+    return new MockedBeamSqlTable(protoRowType).withInputRecords(
         Arrays.asList(row1
             , row2, row3, row4
             ));
@@ -108,7 +108,7 @@ public class BeamPlannerAggregationSubmitTest {
             .add("size", SqlTypeName.BIGINT).build();
       }
     };
-    return new MockedBeamSQLTable(protoRowType);
+    return new MockedBeamSqlTable(protoRowType);
   }
 
 
@@ -124,8 +124,8 @@ public class BeamPlannerAggregationSubmitTest {
 
     pipeline.run().waitUntilFinish();
 
-    Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
-    BeamSQLRow result = MockedBeamSQLTable.CONTENT.peek();
+    Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1);
+    BeamSqlRow result = MockedBeamSqlTable.CONTENT.peek();
     Assert.assertEquals(1, result.getInteger(0));
     Assert.assertEquals(format.parse("2017-01-01 01:00:00"), 
result.getDate(1));
     Assert.assertEquals(1L, result.getLong(2));
@@ -141,8 +141,8 @@ public class BeamPlannerAggregationSubmitTest {
 
     pipeline.run().waitUntilFinish();
 
-    Assert.assertTrue(MockedBeamSQLTable.CONTENT.size() == 1);
+    Assert.assertTrue(MockedBeamSqlTable.CONTENT.size() == 1);
     Assert.assertEquals("site_id=0,agg_hour=null,size=3",
-        MockedBeamSQLTable.CONTENT.peek().valueInString());
+        MockedBeamSqlTable.CONTENT.peek().valueInString());
   }
 }

Reply via email to