http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 6914883..c0d2783 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -18,9 +18,9 @@
 package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -55,7 +55,7 @@ import org.joda.time.Duration;
  */
 public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   private int windowFieldIdx = -1;
-  private WindowFn<BeamSQLRow, BoundedWindow> windowFn;
+  private WindowFn<BeamSqlRow, BoundedWindow> windowFn;
   private Trigger trigger;
   private Duration allowedLatence = Duration.ZERO;
 
@@ -71,48 +71,48 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
     if (windowFieldIdx != -1) {
       upstream = upstream.apply("assignEventTimestamp", WithTimestamps
-          .<BeamSQLRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
+          .<BeamSqlRow>of(new BeamAggregationTransforms.WindowTimestampFn(windowFieldIdx)))
           .setCoder(upstream.getCoder());
     }
 
-    PCollection<BeamSQLRow> windowStream = upstream.apply("window",
-        Window.<BeamSQLRow>into(windowFn)
+    PCollection<BeamSqlRow> windowStream = upstream.apply("window",
+        Window.<BeamSqlRow>into(windowFn)
         .triggering(trigger)
         .withAllowedLateness(allowedLatence)
         .accumulatingFiredPanes());
 
     BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
-    PCollection<KV<BeamSQLRow, BeamSQLRow>> exGroupByStream = windowStream.apply("exGroupBy",
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply("exGroupBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
                 windowFieldIdx, groupSet)))
-        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, upstream.getCoder()));
+        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder()));
 
-    PCollection<KV<BeamSQLRow, Iterable<BeamSQLRow>>> groupedStream = exGroupByStream
-        .apply("groupBy", GroupByKey.<BeamSQLRow, BeamSQLRow>create())
-        .setCoder(KvCoder.<BeamSQLRow, Iterable<BeamSQLRow>>of(keyCoder,
-            IterableCoder.<BeamSQLRow>of(upstream.getCoder())));
+    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream
+        .apply("groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create())
+        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
+            IterableCoder.<BeamSqlRow>of(upstream.getCoder())));
 
     BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
-    PCollection<KV<BeamSQLRow, BeamSQLRow>> aggregatedStream = groupedStream.apply("aggregation",
-        Combine.<BeamSQLRow, BeamSQLRow, BeamSQLRow>groupedValues(
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply("aggregation",
+        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
             new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
-                BeamSQLRecordType.from(input.getRowType()))))
-        .setCoder(KvCoder.<BeamSQLRow, BeamSQLRow>of(keyCoder, aggCoder));
+                BeamSqlRecordType.from(input.getRowType()))))
+        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
 
-    PCollection<BeamSQLRow> mergedStream = aggregatedStream.apply("mergeRecord",
+    PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
-            BeamSQLRecordType.from(getRowType()), getAggCallList())));
-    mergedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+            BeamSqlRecordType.from(getRowType()), getAggCallList())));
+    mergedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return mergedStream;
   }
@@ -120,9 +120,9 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-rowrecord used as Group-By keys.
    */
-  private BeamSQLRecordType exKeyFieldsSchema(RelDataType relDataType) {
-    BeamSQLRecordType inputRecordType = BeamSQLRecordType.from(relDataType);
-    BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+  private BeamSqlRecordType exKeyFieldsSchema(RelDataType relDataType) {
+    BeamSqlRecordType inputRecordType = BeamSqlRecordType.from(relDataType);
+    BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
     for (int i : groupSet.asList()) {
       if (i != windowFieldIdx) {
         typeOfKey.addField(inputRecordType.getFieldsName().get(i),
@@ -135,8 +135,8 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
   /**
    * Type of sub-rowrecord, that represents the list of aggregation fields.
    */
-  private BeamSQLRecordType exAggFieldsSchema() {
-    BeamSQLRecordType typeOfAggFields = new BeamSQLRecordType();
+  private BeamSqlRecordType exAggFieldsSchema() {
+    BeamSqlRecordType typeOfAggFields = new BeamSqlRecordType();
     for (AggregateCall ac : getAggCallList()) {
       typeOfAggFields.addField(ac.name, ac.type.getSqlTypeName());
     }
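
(Not part of the patch: BeamAggregationRel chains key extraction, GroupByKey,
and Combine.groupedValues. A minimal standalone sketch of that shape in the
Beam Java SDK follows; the String/Integer types, the Sum combiner, and the
pipeline itself are illustrative stand-ins, not code from this commit.)

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Combine;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    // Keyed input, standing in for the "exGroupBy" stage above.
    PCollection<KV<String, Integer>> keyed =
        p.apply(Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)));
    // "groupBy": gather all values for each key.
    PCollection<KV<String, Iterable<Integer>>> grouped =
        keyed.apply(GroupByKey.<String, Integer>create());
    // "aggregation": reduce each group to one value per key with a CombineFn.
    PCollection<KV<String, Integer>> summed = grouped.apply(
        Combine.<String, Integer, Integer>groupedValues(Sum.ofIntegers()));
    p.run().waitUntilFinish();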

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
index 3387071..4c5e113 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamFilterRel.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.dsls.sql.rel;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.transform.BeamSQLFilterFn;
+import org.apache.beam.dsls.sql.transform.BeamSqlFilterFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -50,20 +50,20 @@ public class BeamFilterRel extends Filter implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
 
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
-    BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
+    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
-    PCollection<BeamSQLRow> filterStream = upstream.apply(stageName,
-        ParDo.of(new BeamSQLFilterFn(getRelTypeName(), executor)));
-    filterStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+    PCollection<BeamSqlRow> filterStream = upstream.apply(stageName,
+        ParDo.of(new BeamSqlFilterFn(getRelTypeName(), executor)));
+    filterStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return filterStream;
   }
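
(Not part of the patch: the filter stage above is a plain ParDo whose DoFn,
per its name, emits only rows that satisfy the compiled predicate. A minimal
standalone sketch of that pattern follows; the Integer elements and the
even-number predicate are illustrative stand-ins.)

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    // Stand-in for BeamSqlFilterFn: drop elements failing a predicate.
    class KeepEvens extends DoFn<Integer, Integer> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element() % 2 == 0) {
          c.output(c.element());
        }
      }
    }

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<Integer> filtered = p
        .apply(Create.of(1, 2, 3, 4))
        .apply("filter", ParDo.of(new KeepEvens()));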

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
index f821700..76a7cb8 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSinkRel.java
@@ -20,9 +20,9 @@ package org.apache.beam.dsls.sql.rel;
 import com.google.common.base.Joiner;
 import java.util.List;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PDone;
@@ -57,14 +57,14 @@ public class BeamIOSinkRel extends TableModify implements BeamRelNode {
    * which is the persisted PCollection.
    */
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
 
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
index 38de41e..3fdeb28 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamIOSourceRel.java
@@ -19,9 +19,9 @@ package org.apache.beam.dsls.sql.rel;
 
 import com.google.common.base.Joiner;
 import org.apache.beam.dsls.sql.BeamSqlEnv;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,18 +41,18 @@ public class BeamIOSourceRel extends TableScan implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
 
     String sourceName = Joiner.on('.').join(getTable().getQualifiedName());
 
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    TupleTag<BeamSQLRow> sourceTupleTag = new TupleTag<BeamSQLRow>(sourceName);
+    TupleTag<BeamSqlRow> sourceTupleTag = new TupleTag<BeamSqlRow>(sourceName);
     if (inputPCollections.has(sourceTupleTag)) {
       //choose PCollection from input PCollectionTuple if exists there.
-      PCollection<BeamSQLRow> sourceStream = inputPCollections
-          .get(new TupleTag<BeamSQLRow>(sourceName));
+      PCollection<BeamSqlRow> sourceStream = inputPCollections
+          .get(new TupleTag<BeamSqlRow>(sourceName));
       return sourceStream;
     } else {
       //If not, the source PColection is provided with BaseBeamTable.buildIOReader().

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
index e2645f1..9b7492b 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamProjectRel.java
@@ -19,13 +19,13 @@ package org.apache.beam.dsls.sql.rel;
 
 import java.util.List;
 
-import org.apache.beam.dsls.sql.interpreter.BeamSQLExpressionExecutor;
-import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutor;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlExpressionExecutor;
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutor;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
-import org.apache.beam.dsls.sql.transform.BeamSQLProjectFn;
+import org.apache.beam.dsls.sql.transform.BeamSqlProjectFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -61,19 +61,19 @@ public class BeamProjectRel extends Project implements BeamRelNode {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    String stageName = BeamSqlRelUtils.getStageName(this);
 
-    PCollection<BeamSQLRow> upstream =
-        BeamSQLRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
+    PCollection<BeamSqlRow> upstream =
+        BeamSqlRelUtils.getBeamRelInput(input).buildBeamPipeline(inputPCollections);
 
-    BeamSQLExpressionExecutor executor = new BeamSQLFnExecutor(this);
+    BeamSqlExpressionExecutor executor = new BeamSqlFnExecutor(this);
 
-    PCollection<BeamSQLRow> projectStream = upstream.apply(stageName, ParDo
-        .of(new BeamSQLProjectFn(getRelTypeName(), executor, BeamSQLRecordType.from(rowType))));
-    projectStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+    PCollection<BeamSqlRow> projectStream = upstream.apply(stageName, ParDo
+        .of(new BeamSqlProjectFn(getRelTypeName(), executor, BeamSqlRecordType.from(rowType))));
+    projectStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return projectStream;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
index ed58090..80d1f39 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamRelNode.java
@@ -18,7 +18,7 @@
 package org.apache.beam.dsls.sql.rel;
 
 import org.apache.beam.dsls.sql.planner.BeamPipelineCreator;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.calcite.rel.RelNode;
@@ -34,5 +34,5 @@ public interface BeamRelNode extends RelNode {
    * {@link BeamPipelineCreator} visits it with a DFS(Depth-First-Search)
    * algorithm.
    */
-  PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
+  PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
index 06a4edf..02fc648 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamSortRel.java
@@ -26,9 +26,9 @@ import java.util.Comparator;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.UnsupportedDataTypeException;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -123,10 +123,10 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     }
   }
 
-  @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
     RelNode input = getInput();
-    PCollection<BeamSQLRow> upstream = BeamSQLRelUtils.getBeamRelInput(input)
+    PCollection<BeamSqlRow> upstream = BeamSqlRelUtils.getBeamRelInput(input)
         .buildBeamPipeline(inputPCollections);
     Type windowType = upstream.getWindowingStrategy().getWindowFn()
         .getWindowTypeDescriptor().getType();
@@ -135,24 +135,24 @@ public class BeamSortRel extends Sort implements BeamRelNode {
           "`ORDER BY` is only supported for GlobalWindow, actual window: " + windowType);
     }
 
-    BeamSQLRowComparator comparator = new BeamSQLRowComparator(fieldIndices, orientation,
+    BeamSqlRowComparator comparator = new BeamSqlRowComparator(fieldIndices, orientation,
         nullsFirst);
     // first find the top (offset + count)
-    PCollection<List<BeamSQLRow>> rawStream =
+    PCollection<List<BeamSqlRow>> rawStream =
         upstream.apply("extractTopOffsetAndFetch",
             Top.of(startIndex + count, comparator).withoutDefaults())
-        .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
+        .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
 
     // strip the `leading offset`
     if (startIndex > 0) {
       rawStream = rawStream.apply("stripLeadingOffset", ParDo.of(
-          new SubListFn<BeamSQLRow>(startIndex, startIndex + count)))
-          .setCoder(ListCoder.<BeamSQLRow>of(upstream.getCoder()));
+          new SubListFn<BeamSqlRow>(startIndex, startIndex + count)))
+          .setCoder(ListCoder.<BeamSqlRow>of(upstream.getCoder()));
     }
 
-    PCollection<BeamSQLRow> orderedStream = rawStream.apply(
-        "flatten", Flatten.<BeamSQLRow>iterables());
-    orderedStream.setCoder(new BeamSqlRowCoder(BeamSQLRecordType.from(getRowType())));
+    PCollection<BeamSqlRow> orderedStream = rawStream.apply(
+        "flatten", Flatten.<BeamSqlRow>iterables());
+    orderedStream.setCoder(new BeamSqlRowCoder(BeamSqlRecordType.from(getRowType())));
 
     return orderedStream;
   }
@@ -177,12 +177,12 @@ public class BeamSortRel extends Sort implements BeamRelNode {
     return new BeamSortRel(getCluster(), traitSet, newInput, newCollation, offset, fetch);
   }
 
-  private static class BeamSQLRowComparator implements Comparator<BeamSQLRow>, Serializable {
+  private static class BeamSqlRowComparator implements Comparator<BeamSqlRow>, Serializable {
     private List<Integer> fieldsIndices;
     private List<Boolean> orientation;
     private List<Boolean> nullsFirst;
 
-    public BeamSQLRowComparator(List<Integer> fieldsIndices,
+    public BeamSqlRowComparator(List<Integer> fieldsIndices,
         List<Boolean> orientation,
         List<Boolean> nullsFirst) {
       this.fieldsIndices = fieldsIndices;
@@ -190,7 +190,7 @@ public class BeamSortRel extends Sort implements BeamRelNode {
       this.nullsFirst = nullsFirst;
     }
 
-    @Override public int compare(BeamSQLRow row1, BeamSQLRow row2) {
+    @Override public int compare(BeamSqlRow row1, BeamSqlRow row2) {
       for (int i = 0; i < fieldsIndices.size(); i++) {
         int fieldIndex = fieldsIndices.get(i);
         int fieldRet = 0;
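
(Not part of the patch: the ORDER BY/LIMIT handling above leans on the stock
Top transform followed by Flatten.iterables. A minimal standalone sketch
follows; the Integer elements and the natural-order comparator are
illustrative stand-ins for BeamSqlRow and BeamSqlRowComparator.)

    import java.io.Serializable;
    import java.util.Comparator;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.transforms.Top;
    import org.apache.beam.sdk.values.PCollection;

    // Top requires a Comparator that is also Serializable.
    class NaturalOrder implements Comparator<Integer>, Serializable {
      @Override public int compare(Integer a, Integer b) {
        return a.compareTo(b);
      }
    }

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    PCollection<Integer> input = p.apply(Create.of(5, 3, 9, 1, 7));
    // Keep the top (offset + count) elements as a single sorted List...
    PCollection<List<Integer>> top = input.apply(
        Top.of(3, new NaturalOrder()).withoutDefaults());
    // ...then flatten the List back into individual elements.
    PCollection<Integer> ordered = top.apply(Flatten.<Integer>iterables());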

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
index ea59906..9a1887f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamValuesRel.java
@@ -23,9 +23,9 @@ import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.beam.dsls.sql.planner.BeamSQLRelUtils;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.planner.BeamSqlRelUtils;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.schema.BeamTableUtils;
 import org.apache.beam.sdk.transforms.Create;
@@ -57,17 +57,17 @@ public class BeamValuesRel extends Values implements BeamRelNode {
 
   }
 
-  @Override public PCollection<BeamSQLRow> buildBeamPipeline(PCollectionTuple inputPCollections)
+  @Override public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollections)
       throws Exception {
-    List<BeamSQLRow> rows = new ArrayList<>(tuples.size());
-    String stageName = BeamSQLRelUtils.getStageName(this);
+    List<BeamSqlRow> rows = new ArrayList<>(tuples.size());
+    String stageName = BeamSqlRelUtils.getStageName(this);
     if (tuples.isEmpty()) {
       throw new IllegalStateException("Values with empty tuples!");
     }
 
-    BeamSQLRecordType beamSQLRecordType = BeamSQLRecordType.from(this.getRowType());
+    BeamSqlRecordType beamSQLRecordType = BeamSqlRecordType.from(this.getRowType());
     for (ImmutableList<RexLiteral> tuple : tuples) {
-      BeamSQLRow row = new BeamSQLRow(beamSQLRecordType);
+      BeamSqlRow row = new BeamSqlRow(beamSQLRecordType);
       for (int i = 0; i < tuple.size(); i++) {
         BeamTableUtils.addFieldWithAutoTypeCasting(row, i, tuple.get(i).getValue());
       }
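
(Not part of the patch: the literal rows materialized here are typically
handed to Beam's Create transform with an explicit coder. A minimal
standalone sketch follows; the String elements and StringUtf8Coder are
illustrative stand-ins for BeamSqlRow and BeamSqlRowCoder.)

    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
    List<String> rows = Arrays.asList("r1", "r2", "r3");
    // Turn the in-memory rows into a PCollection, pinning the coder.
    PCollection<String> values = p.apply("values",
        Create.of(rows).withCoder(StringUtf8Coder.of()));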

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
index 52d2bbd..333bb10 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BaseBeamTable.java
@@ -39,11 +39,11 @@ import org.apache.calcite.schema.Statistics;
 public abstract class BaseBeamTable implements ScannableTable, Serializable {
   private RelDataType relDataType;
 
-  protected BeamSQLRecordType beamSqlRecordType;
+  protected BeamSqlRecordType beamSqlRecordType;
 
   public BaseBeamTable(RelProtoDataType protoRowType) {
     this.relDataType = protoRowType.apply(BeamQueryPlanner.TYPE_FACTORY);
-    this.beamSqlRecordType = BeamSQLRecordType.from(relDataType);
+    this.beamSqlRecordType = BeamSqlRecordType.from(relDataType);
   }
 
   /**
@@ -53,16 +53,16 @@ public abstract class BaseBeamTable implements ScannableTable, Serializable {
   public abstract BeamIOType getSourceType();
 
   /**
-   * create a {@code PCollection<BeamSQLRow>} from source.
+   * create a {@code PCollection<BeamSqlRow>} from source.
    *
    */
-  public abstract PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline);
+  public abstract PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline);
 
   /**
    * create a {@code IO.write()} instance to write to target.
    *
    */
-  public abstract PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter();
+  public abstract PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter();
 
   @Override
   public Enumerable<Object[]> scan(DataContext root) {

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
index 1c3ab5b..ff77497 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamPCollectionTable.java
@@ -26,18 +26,18 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.calcite.rel.type.RelProtoDataType;
 
 /**
- * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSQLRow>} as a virtual table,
+ * {@code BeamPCollectionTable} converts a {@code PCollection<BeamSqlRow>} as a virtual table,
  * then a downstream query can query directly.
  */
 public class BeamPCollectionTable extends BaseBeamTable {
   private BeamIOType ioType;
-  private PCollection<BeamSQLRow> upstream;
+  private PCollection<BeamSqlRow> upstream;
 
   protected BeamPCollectionTable(RelProtoDataType protoRowType) {
     super(protoRowType);
   }
 
-  public BeamPCollectionTable(PCollection<BeamSQLRow> upstream, RelProtoDataType protoRowType){
+  public BeamPCollectionTable(PCollection<BeamSqlRow> upstream, RelProtoDataType protoRowType){
     this(protoRowType);
     ioType = upstream.isBounded().equals(IsBounded.BOUNDED)
         ? BeamIOType.BOUNDED : BeamIOType.UNBOUNDED;
@@ -50,12 +50,12 @@ public class BeamPCollectionTable extends BaseBeamTable {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return upstream;
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
     throw new BeamInvalidOperatorException("cannot use [BeamPCollectionTable] as target");
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
deleted file mode 100644
index e8fa82f..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRecordType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.sql.type.SqlTypeName;
-
-/**
- * Field type information in {@link BeamSQLRow}.
- *
- */
-//@DefaultCoder(BeamSQLRecordTypeCoder.class)
-public class BeamSQLRecordType implements Serializable {
-  private List<String> fieldsName = new ArrayList<>();
-  private List<SqlTypeName> fieldsType = new ArrayList<>();
-
-  /**
-   * Generate from {@link RelDataType} which is used to create table.
-   */
-  public static BeamSQLRecordType from(RelDataType tableInfo) {
-    BeamSQLRecordType record = new BeamSQLRecordType();
-    for (RelDataTypeField f : tableInfo.getFieldList()) {
-      record.fieldsName.add(f.getName());
-      record.fieldsType.add(f.getType().getSqlTypeName());
-    }
-    return record;
-  }
-
-  public void addField(String fieldName, SqlTypeName fieldType) {
-    fieldsName.add(fieldName);
-    fieldsType.add(fieldType);
-  }
-
-  /**
-   * Create an instance of {@link RelDataType} so it can be used to create a table.
-   */
-  public RelProtoDataType toRelDataType() {
-    return new RelProtoDataType() {
-      @Override
-      public RelDataType apply(RelDataTypeFactory a) {
-        FieldInfoBuilder builder = a.builder();
-        for (int idx = 0; idx < fieldsName.size(); ++idx) {
-          builder.add(fieldsName.get(idx), fieldsType.get(idx));
-        }
-        return builder.build();
-      }
-    };
-  }
-
-  public int size() {
-    return fieldsName.size();
-  }
-
-  public List<String> getFieldsName() {
-    return fieldsName;
-  }
-
-  public void setFieldsName(List<String> fieldsName) {
-    this.fieldsName = fieldsName;
-  }
-
-  public List<SqlTypeName> getFieldsType() {
-    return fieldsType;
-  }
-
-  public void setFieldsType(List<SqlTypeName> fieldsType) {
-    this.fieldsType = fieldsType;
-  }
-
-  @Override
-  public String toString() {
-    return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
deleted file mode 100644
index ca045c8..0000000
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSQLRow.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.dsls.sql.schema;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.dsls.sql.exception.InvalidFieldException;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.Instant;
-
-/**
- * Repersent a generic ROW record in Beam SQL.
- *
- */
-public class BeamSQLRow implements Serializable {
-
-  private List<Integer> nullFields = new ArrayList<>();
-  private List<Object> dataValues;
-  private BeamSQLRecordType dataType;
-
-  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
-  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
-  public BeamSQLRow(BeamSQLRecordType dataType) {
-    this.dataType = dataType;
-    this.dataValues = new ArrayList<>();
-    for (int idx = 0; idx < dataType.size(); ++idx) {
-      dataValues.add(null);
-      nullFields.add(idx);
-    }
-  }
-
-  public BeamSQLRow(BeamSQLRecordType dataType, List<Object> dataValues) {
-    this(dataType);
-    for (int idx = 0; idx < dataValues.size(); ++idx) {
-      addField(idx, dataValues.get(idx));
-    }
-  }
-
-  public void updateWindowRange(BeamSQLRow upstreamRecord, BoundedWindow window){
-    windowStart = upstreamRecord.windowStart;
-    windowEnd = upstreamRecord.windowEnd;
-
-    if (window instanceof IntervalWindow) {
-      IntervalWindow iWindow = (IntervalWindow) window;
-      windowStart = iWindow.start();
-      windowEnd = iWindow.end();
-    }
-  }
-
-  public void addField(String fieldName, Object fieldValue) {
-    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
-  }
-
-  public void addField(int index, Object fieldValue) {
-    if (fieldValue == null) {
-      return;
-    } else {
-      if (nullFields.contains(index)) {
-        nullFields.remove(nullFields.indexOf(index));
-      }
-    }
-
-    SqlTypeName fieldType = dataType.getFieldsType().get(index);
-    switch (fieldType) {
-      case INTEGER:
-        if (!(fieldValue instanceof Integer)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case SMALLINT:
-        if (!(fieldValue instanceof Short)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case TINYINT:
-        if (!(fieldValue instanceof Byte)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case DOUBLE:
-        if (!(fieldValue instanceof Double)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case BIGINT:
-        if (!(fieldValue instanceof Long)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case FLOAT:
-        if (!(fieldValue instanceof Float)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case DECIMAL:
-        if (!(fieldValue instanceof BigDecimal)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case VARCHAR:
-      case CHAR:
-        if (!(fieldValue instanceof String)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case TIME:
-        if (!(fieldValue instanceof GregorianCalendar)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      case TIMESTAMP:
-        if (!(fieldValue instanceof Date)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        }
-        break;
-      default:
-        throw new UnsupportedDataTypeException(fieldType);
-    }
-    dataValues.set(index, fieldValue);
-  }
-
-  public byte getByte(int idx) {
-    return (Byte) getFieldValue(idx);
-  }
-
-  public short getShort(int idx) {
-    return (Short) getFieldValue(idx);
-  }
-
-  public int getInteger(int idx) {
-    return (Integer) getFieldValue(idx);
-  }
-
-  public float getFloat(int idx) {
-    return (Float) getFieldValue(idx);
-  }
-
-  public double getDouble(int idx) {
-    return (Double) getFieldValue(idx);
-  }
-
-  public long getLong(int idx) {
-    return (Long) getFieldValue(idx);
-  }
-
-  public String getString(int idx) {
-    return (String) getFieldValue(idx);
-  }
-
-  public Date getDate(int idx) {
-    return (Date) getFieldValue(idx);
-  }
-
-  public GregorianCalendar getGregorianCalendar(int idx) {
-    return (GregorianCalendar) getFieldValue(idx);
-  }
-
-  public BigDecimal getBigDecimal(int idx) {
-    return (BigDecimal) getFieldValue(idx);
-  }
-
-  public Object getFieldValue(String fieldName) {
-    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
-  }
-
-  public Object getFieldValue(int fieldIdx) {
-    if (nullFields.contains(fieldIdx)) {
-      return null;
-    }
-
-    Object fieldValue = dataValues.get(fieldIdx);
-    SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
-
-    switch (fieldType) {
-      case INTEGER:
-        if (!(fieldValue instanceof Integer)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case SMALLINT:
-        if (!(fieldValue instanceof Short)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case TINYINT:
-        if (!(fieldValue instanceof Byte)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case DOUBLE:
-        if (!(fieldValue instanceof Double)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case DECIMAL:
-        if (!(fieldValue instanceof BigDecimal)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case BIGINT:
-        if (!(fieldValue instanceof Long)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case FLOAT:
-        if (!(fieldValue instanceof Float)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case VARCHAR:
-      case CHAR:
-        if (!(fieldValue instanceof String)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case TIME:
-        if (!(fieldValue instanceof GregorianCalendar)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      case TIMESTAMP:
-        if (!(fieldValue instanceof Date)) {
-          throw new InvalidFieldException(
-              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
-        } else {
-          return fieldValue;
-        }
-      default:
-        throw new UnsupportedDataTypeException(fieldType);
-    }
-  }
-
-  public int size() {
-    return dataValues.size();
-  }
-
-  public List<Object> getDataValues() {
-    return dataValues;
-  }
-
-  public void setDataValues(List<Object> dataValues) {
-    this.dataValues = dataValues;
-  }
-
-  public BeamSQLRecordType getDataType() {
-    return dataType;
-  }
-
-  public void setDataType(BeamSQLRecordType dataType) {
-    this.dataType = dataType;
-  }
-
-  public void setNullFields(List<Integer> nullFields) {
-    this.nullFields = nullFields;
-  }
-
-  public List<Integer> getNullFields() {
-    return nullFields;
-  }
-
-  /**
-   * is the specified field NULL?
-   */
-  public boolean isNull(int idx) {
-    return nullFields.contains(idx);
-  }
-
-  public Instant getWindowStart() {
-    return windowStart;
-  }
-
-  public Instant getWindowEnd() {
-    return windowEnd;
-  }
-
-  public void setWindowStart(Instant windowStart) {
-    this.windowStart = windowStart;
-  }
-
-  public void setWindowEnd(Instant windowEnd) {
-    this.windowEnd = windowEnd;
-  }
-
-  @Override
-  public String toString() {
-    return "BeamSQLRow [nullFields=" + nullFields + ", dataValues=" + dataValues + ", dataType="
-        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + windowEnd + "]";
-  }
-
-  /**
-   * Return data fields as key=value.
-   */
-  public String valueInString() {
-    StringBuffer sb = new StringBuffer();
-    for (int idx = 0; idx < size(); ++idx) {
-      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
-    }
-    return sb.substring(1);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
-    }
-    BeamSQLRow other = (BeamSQLRow) obj;
-    return toString().equals(other.toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
new file mode 100644
index 0000000..7da08cc
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRecordType.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFactory.FieldInfoBuilder;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Field type information in {@link BeamSqlRow}.
+ *
+ */
+public class BeamSqlRecordType implements Serializable {
+  private List<String> fieldsName = new ArrayList<>();
+  private List<SqlTypeName> fieldsType = new ArrayList<>();
+
+  /**
+   * Generate from {@link RelDataType} which is used to create table.
+   */
+  public static BeamSqlRecordType from(RelDataType tableInfo) {
+    BeamSqlRecordType record = new BeamSqlRecordType();
+    for (RelDataTypeField f : tableInfo.getFieldList()) {
+      record.fieldsName.add(f.getName());
+      record.fieldsType.add(f.getType().getSqlTypeName());
+    }
+    return record;
+  }
+
+  public void addField(String fieldName, SqlTypeName fieldType) {
+    fieldsName.add(fieldName);
+    fieldsType.add(fieldType);
+  }
+
+  /**
+   * Create an instance of {@link RelDataType} so it can be used to create a table.
+   */
+  public RelProtoDataType toRelDataType() {
+    return new RelProtoDataType() {
+      @Override
+      public RelDataType apply(RelDataTypeFactory a) {
+        FieldInfoBuilder builder = a.builder();
+        for (int idx = 0; idx < fieldsName.size(); ++idx) {
+          builder.add(fieldsName.get(idx), fieldsType.get(idx));
+        }
+        return builder.build();
+      }
+    };
+  }
+
+  public int size() {
+    return fieldsName.size();
+  }
+
+  public List<String> getFieldsName() {
+    return fieldsName;
+  }
+
+  public void setFieldsName(List<String> fieldsName) {
+    this.fieldsName = fieldsName;
+  }
+
+  public List<SqlTypeName> getFieldsType() {
+    return fieldsType;
+  }
+
+  public void setFieldsType(List<SqlTypeName> fieldsType) {
+    this.fieldsType = fieldsType;
+  }
+
+  @Override
+  public String toString() {
+    return "RecordType [fieldsName=" + fieldsName + ", fieldsType=" + fieldsType + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
new file mode 100644
index 0000000..0f82733
--- /dev/null
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.dsls.sql.schema;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.dsls.sql.exception.InvalidFieldException;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Instant;
+
+/**
+ * Repersent a generic ROW record in Beam SQL.
+ *
+ */
+public class BeamSqlRow implements Serializable {
+
+  private List<Integer> nullFields = new ArrayList<>();
+  private List<Object> dataValues;
+  private BeamSqlRecordType dataType;
+
+  private Instant windowStart = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
+  private Instant windowEnd = new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
+
+  public BeamSqlRow(BeamSqlRecordType dataType) {
+    this.dataType = dataType;
+    this.dataValues = new ArrayList<>();
+    for (int idx = 0; idx < dataType.size(); ++idx) {
+      dataValues.add(null);
+      nullFields.add(idx);
+    }
+  }
+
+  public BeamSqlRow(BeamSqlRecordType dataType, List<Object> dataValues) {
+    this(dataType);
+    for (int idx = 0; idx < dataValues.size(); ++idx) {
+      addField(idx, dataValues.get(idx));
+    }
+  }
+
+  public void updateWindowRange(BeamSqlRow upstreamRecord, BoundedWindow window){
+    windowStart = upstreamRecord.windowStart;
+    windowEnd = upstreamRecord.windowEnd;
+
+    if (window instanceof IntervalWindow) {
+      IntervalWindow iWindow = (IntervalWindow) window;
+      windowStart = iWindow.start();
+      windowEnd = iWindow.end();
+    }
+  }
+
+  public void addField(String fieldName, Object fieldValue) {
+    addField(dataType.getFieldsName().indexOf(fieldName), fieldValue);
+  }
+
+  public void addField(int index, Object fieldValue) {
+    if (fieldValue == null) {
+      return;
+    } else {
+      if (nullFields.contains(index)) {
+        nullFields.remove(nullFields.indexOf(index));
+      }
+    }
+
+    SqlTypeName fieldType = dataType.getFieldsType().get(index);
+    switch (fieldType) {
+      case INTEGER:
+        if (!(fieldValue instanceof Integer)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case SMALLINT:
+        if (!(fieldValue instanceof Short)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TINYINT:
+        if (!(fieldValue instanceof Byte)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case DOUBLE:
+        if (!(fieldValue instanceof Double)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case BIGINT:
+        if (!(fieldValue instanceof Long)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case FLOAT:
+        if (!(fieldValue instanceof Float)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case DECIMAL:
+        if (!(fieldValue instanceof BigDecimal)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case VARCHAR:
+      case CHAR:
+        if (!(fieldValue instanceof String)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TIME:
+        if (!(fieldValue instanceof GregorianCalendar)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      case TIMESTAMP:
+        if (!(fieldValue instanceof Date)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        }
+        break;
+      default:
+        throw new UnsupportedDataTypeException(fieldType);
+    }
+    dataValues.set(index, fieldValue);
+  }
+
+  public byte getByte(int idx) {
+    return (Byte) getFieldValue(idx);
+  }
+
+  public short getShort(int idx) {
+    return (Short) getFieldValue(idx);
+  }
+
+  public int getInteger(int idx) {
+    return (Integer) getFieldValue(idx);
+  }
+
+  public float getFloat(int idx) {
+    return (Float) getFieldValue(idx);
+  }
+
+  public double getDouble(int idx) {
+    return (Double) getFieldValue(idx);
+  }
+
+  public long getLong(int idx) {
+    return (Long) getFieldValue(idx);
+  }
+
+  public String getString(int idx) {
+    return (String) getFieldValue(idx);
+  }
+
+  public Date getDate(int idx) {
+    return (Date) getFieldValue(idx);
+  }
+
+  public GregorianCalendar getGregorianCalendar(int idx) {
+    return (GregorianCalendar) getFieldValue(idx);
+  }
+
+  public BigDecimal getBigDecimal(int idx) {
+    return (BigDecimal) getFieldValue(idx);
+  }
+
+  public Object getFieldValue(String fieldName) {
+    return getFieldValue(dataType.getFieldsName().indexOf(fieldName));
+  }
+
+  public Object getFieldValue(int fieldIdx) {
+    if (nullFields.contains(fieldIdx)) {
+      return null;
+    }
+
+    Object fieldValue = dataValues.get(fieldIdx);
+    SqlTypeName fieldType = dataType.getFieldsType().get(fieldIdx);
+
+    switch (fieldType) {
+      case INTEGER:
+        if (!(fieldValue instanceof Integer)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case SMALLINT:
+        if (!(fieldValue instanceof Short)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TINYINT:
+        if (!(fieldValue instanceof Byte)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case DOUBLE:
+        if (!(fieldValue instanceof Double)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case DECIMAL:
+        if (!(fieldValue instanceof BigDecimal)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case BIGINT:
+        if (!(fieldValue instanceof Long)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case FLOAT:
+        if (!(fieldValue instanceof Float)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case VARCHAR:
+      case CHAR:
+        if (!(fieldValue instanceof String)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TIME:
+        if (!(fieldValue instanceof GregorianCalendar)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      case TIMESTAMP:
+        if (!(fieldValue instanceof Date)) {
+          throw new InvalidFieldException(
+              String.format("[%s] doesn't match type [%s]", fieldValue, fieldType));
+        } else {
+          return fieldValue;
+        }
+      default:
+        throw new UnsupportedDataTypeException(fieldType);
+    }
+  }
+
+  public int size() {
+    return dataValues.size();
+  }
+
+  public List<Object> getDataValues() {
+    return dataValues;
+  }
+
+  public void setDataValues(List<Object> dataValues) {
+    this.dataValues = dataValues;
+  }
+
+  public BeamSqlRecordType getDataType() {
+    return dataType;
+  }
+
+  public void setDataType(BeamSqlRecordType dataType) {
+    this.dataType = dataType;
+  }
+
+  public void setNullFields(List<Integer> nullFields) {
+    this.nullFields = nullFields;
+  }
+
+  public List<Integer> getNullFields() {
+    return nullFields;
+  }
+
+  /**
+   * is the specified field NULL?
+   */
+  public boolean isNull(int idx) {
+    return nullFields.contains(idx);
+  }
+
+  public Instant getWindowStart() {
+    return windowStart;
+  }
+
+  public Instant getWindowEnd() {
+    return windowEnd;
+  }
+
+  public void setWindowStart(Instant windowStart) {
+    this.windowStart = windowStart;
+  }
+
+  public void setWindowEnd(Instant windowEnd) {
+    this.windowEnd = windowEnd;
+  }
+
+  @Override
+  public String toString() {
+    return "BeamSqlRow [nullFields=" + nullFields + ", dataValues=" + 
dataValues + ", dataType="
+        + dataType + ", windowStart=" + windowStart + ", windowEnd=" + 
windowEnd + "]";
+  }
+
+  /**
+   * Return data fields as key=value.
+   */
+  public String valueInString() {
+    StringBuffer sb = new StringBuffer();
+    for (int idx = 0; idx < size(); ++idx) {
+      sb.append(String.format(",%s=%s", dataType.getFieldsName().get(idx), getFieldValue(idx)));
+    }
+    return sb.substring(1);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    BeamSqlRow other = (BeamSqlRow) obj;
+    return toString().equals(other.toString());
+  }
+
+}

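For context on the renamed class: BeamSqlRow is a typed record container whose addField(...) methods validate a value against the field's declared SqlTypeName, and whose typed getters cast the stored Object back out. A minimal usage sketch, assuming only the constructor and accessor signatures visible in this patch (the schema-building calls mirror BeamSqlRecordType.addField(name, type) as used in the aggregation transforms further down); field names and values are illustrative:

    import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class BeamSqlRowSketch {
      public static void main(String[] args) {
        // Hypothetical two-field schema.
        BeamSqlRecordType type = new BeamSqlRecordType();
        type.addField("id", SqlTypeName.INTEGER);
        type.addField("name", SqlTypeName.VARCHAR);

        BeamSqlRow row = new BeamSqlRow(type);
        row.addField("id", 1);         // validated against INTEGER, else InvalidFieldException
        row.addField("name", "hello"); // validated against VARCHAR/CHAR

        int id = row.getInteger(0);    // typed getters cast the stored Object
        String name = row.getString(1);
        System.out.println(row.valueInString()); // id=1,name=hello
      }
    }
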
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
index 14a0f31..6552dd3 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRowCoder.java
@@ -35,10 +35,10 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 
 /**
- *  A {@link Coder} encodes {@link BeamSQLRow}.
+ *  A {@link Coder} encodes {@link BeamSqlRow}.
  */
-public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
-  private BeamSQLRecordType tableSchema;
+public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
+  private BeamSqlRecordType tableSchema;
 
   private static final ListCoder<Integer> listCoder = ListCoder.of(BigEndianIntegerCoder.of());
 
@@ -49,12 +49,12 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
   private static final InstantCoder instantCoder = InstantCoder.of();
   private static final BigDecimalCoder bigDecimalCoder = BigDecimalCoder.of();
 
-  public BeamSqlRowCoder(BeamSQLRecordType tableSchema) {
+  public BeamSqlRowCoder(BeamSqlRecordType tableSchema) {
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public void encode(BeamSQLRow value, OutputStream outStream) throws CoderException, IOException {
+  public void encode(BeamSqlRow value, OutputStream outStream) throws CoderException, IOException {
     listCoder.encode(value.getNullFields(), outStream);
 
     for (int idx = 0; idx < value.size(); ++idx) {
@@ -105,10 +105,10 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
   }
 
   @Override
-  public BeamSQLRow decode(InputStream inStream) throws CoderException, IOException {
+  public BeamSqlRow decode(InputStream inStream) throws CoderException, IOException {
     List<Integer> nullFields = listCoder.decode(inStream);
 
-    BeamSQLRow record = new BeamSQLRow(tableSchema);
+    BeamSqlRow record = new BeamSqlRow(tableSchema);
     record.setNullFields(nullFields);
 
     for (int idx = 0; idx < tableSchema.size(); ++idx) {
@@ -162,7 +162,7 @@ public class BeamSqlRowCoder extends CustomCoder<BeamSQLRow> {
     return record;
   }
 
-  public BeamSQLRecordType getTableSchema() {
+  public BeamSqlRecordType getTableSchema() {
     return tableSchema;
   }
 

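The coder writes the null-field list first, then each non-null field with a type-specific coder, so an encode/decode round trip should reproduce the row. A sketch under the two-argument encode/decode signatures shown above (schema contents are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
    import org.apache.calcite.sql.type.SqlTypeName;

    public class RowCoderRoundTrip {
      public static void main(String[] args) throws Exception {
        BeamSqlRecordType schema = new BeamSqlRecordType();
        schema.addField("id", SqlTypeName.INTEGER);

        BeamSqlRow row = new BeamSqlRow(schema);
        row.addField(0, 42);

        // One coder instance per table schema.
        BeamSqlRowCoder coder = new BeamSqlRowCoder(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        coder.encode(row, out);                 // nullFields list, then field values

        BeamSqlRow back = coder.decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(back.getInteger(0)); // 42
      }
    }
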
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
index c7397e1..134cf8f 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamTableUtils.java
@@ -35,11 +35,11 @@ import org.apache.commons.csv.CSVRecord;
  * Utility methods for working with {@code BeamTable}.
  */
 public final class BeamTableUtils {
-  public static BeamSQLRow csvLine2BeamSQLRow(
+  public static BeamSqlRow csvLine2BeamSqlRow(
       CSVFormat csvFormat,
       String line,
-      BeamSQLRecordType beamSqlRecordType) {
-    BeamSQLRow row = new BeamSQLRow(beamSqlRecordType);
+      BeamSqlRecordType beamSqlRecordType) {
+    BeamSqlRow row = new BeamSqlRow(beamSqlRecordType);
     try (StringReader reader = new StringReader(line)) {
       CSVParser parser = csvFormat.parse(reader);
       CSVRecord rawRecord = parser.getRecords().get(0);
@@ -61,7 +61,7 @@ public final class BeamTableUtils {
     return row;
   }
 
-  public static String beamSQLRow2CsvLine(BeamSQLRow row, CSVFormat csvFormat) {
+  public static String beamSqlRow2CsvLine(BeamSqlRow row, CSVFormat csvFormat) {
     StringWriter writer = new StringWriter();
     try (CSVPrinter printer = csvFormat.print(writer)) {
       for (int i = 0; i < row.size(); i++) {
@@ -74,7 +74,7 @@ public final class BeamTableUtils {
     return writer.toString();
   }
 
-  public static void addFieldWithAutoTypeCasting(BeamSQLRow row, int idx, Object rawObj) {
+  public static void addFieldWithAutoTypeCasting(BeamSqlRow row, int idx, Object rawObj) {
     if (rawObj == null) {
       row.addField(idx, rawObj);
       return;

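These helpers are the CSV glue shared by the Kafka and text tables below: one CSV line in, one BeamSqlRow out, with addFieldWithAutoTypeCasting coercing each raw column to the declared field type. A round-trip sketch using only the renamed signatures from this file (schema and data are illustrative):

    import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamTableUtils;
    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.commons.csv.CSVFormat;

    public class CsvRowSketch {
      public static void main(String[] args) {
        BeamSqlRecordType type = new BeamSqlRecordType();
        type.addField("id", SqlTypeName.INTEGER);
        type.addField("name", SqlTypeName.VARCHAR);

        // "1" is auto-cast to Integer to satisfy the INTEGER field.
        BeamSqlRow row = BeamTableUtils.csvLine2BeamSqlRow(CSVFormat.DEFAULT, "1,hello", type);

        // And back out to CSV text.
        System.out.print(BeamTableUtils.beamSqlRow2CsvLine(row, CSVFormat.DEFAULT));
      }
    }
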
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
index 127870c..f8c2553 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaCSVTable.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.dsls.sql.schema.kafka;
 
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine;
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.util.List;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -50,62 +50,62 @@ public class BeamKafkaCSVTable extends BeamKafkaTable {
   }
 
   @Override
-  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
       getPTransformForInput() {
     return new CsvRecorderDecoder(beamSqlRecordType, csvFormat);
   }
 
   @Override
-  public PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+  public PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput() {
     return new CsvRecorderEncoder(beamSqlRecordType, csvFormat);
   }
 
   /**
-   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSQLRow}.
+   * A PTransform to convert {@code KV<byte[], byte[]>} to {@link BeamSqlRow}.
    *
    */
   public static class CsvRecorderDecoder
-      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>> {
-    private BeamSQLRecordType recordType;
+      extends PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>> {
+    private BeamSqlRecordType recordType;
     private CSVFormat format;
-    public CsvRecorderDecoder(BeamSQLRecordType recordType, CSVFormat format) {
+    public CsvRecorderDecoder(BeamSqlRecordType recordType, CSVFormat format) {
       this.recordType = recordType;
       this.format = format;
     }
 
     @Override
-    public PCollection<BeamSQLRow> expand(PCollection<KV<byte[], byte[]>> input) {
-      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSQLRow>() {
+    public PCollection<BeamSqlRow> expand(PCollection<KV<byte[], byte[]>> input) {
+      return input.apply("decodeRecord", ParDo.of(new DoFn<KV<byte[], byte[]>, BeamSqlRow>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
           String rowInString = new String(c.element().getValue());
-          c.output(csvLine2BeamSQLRow(format, rowInString, recordType));
+          c.output(csvLine2BeamSqlRow(format, rowInString, recordType));
         }
       }));
     }
   }
 
   /**
-   * A PTransform to convert {@link BeamSQLRow} to {@code KV<byte[], byte[]>}.
+   * A PTransform to convert {@link BeamSqlRow} to {@code KV<byte[], byte[]>}.
    *
    */
   public static class CsvRecorderEncoder
-      extends PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>> {
-    private BeamSQLRecordType recordType;
+      extends PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>> {
+    private BeamSqlRecordType recordType;
     private CSVFormat format;
-    public CsvRecorderEncoder(BeamSQLRecordType recordType, CSVFormat format) {
+    public CsvRecorderEncoder(BeamSqlRecordType recordType, CSVFormat format) {
       this.recordType = recordType;
       this.format = format;
     }
 
     @Override
-    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSQLRow> input) {
-      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, KV<byte[], byte[]>>() {
+    public PCollection<KV<byte[], byte[]>> expand(PCollection<BeamSqlRow> input) {
+      return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, KV<byte[], byte[]>>() {
         @ProcessElement
         public void processElement(ProcessContext c) {
-          BeamSQLRow in = c.element();
-          c.output(KV.of(new byte[] {}, beamSQLRow2CsvLine(in, format).getBytes()));
+          BeamSqlRow in = c.element();
+          c.output(KV.of(new byte[] {}, beamSqlRow2CsvLine(in, format).getBytes()));
         }
       }));
     }

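CsvRecorderDecoder treats the Kafka value as a CSV line and ignores the key. A pipeline sketch feeding it in-memory KV pairs instead of a live topic; the Create/coder plumbing is standard Beam rather than part of this patch, and running it needs a runner (e.g. the DirectRunner) on the classpath:

    import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
    import org.apache.beam.dsls.sql.schema.kafka.BeamKafkaCSVTable;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.ByteArrayCoder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.commons.csv.CSVFormat;

    public class KafkaCsvDecodeSketch {
      public static void main(String[] args) {
        BeamSqlRecordType type = new BeamSqlRecordType();
        type.addField("id", SqlTypeName.INTEGER);

        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<KV<byte[], byte[]>> raw = p.apply(
            Create.of(KV.of(new byte[0], "42".getBytes()))
                .withCoder(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())));

        // Key is ignored; the value bytes are parsed as one CSV line per record.
        PCollection<BeamSqlRow> rows = raw
            .apply(new BeamKafkaCSVTable.CsvRecorderDecoder(type, CSVFormat.DEFAULT))
            .setCoder(new BeamSqlRowCoder(type));

        p.run().waitUntilFinish();
      }
    }
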
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
index aa7cf3a..c43fa2c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/kafka/BeamKafkaTable.java
@@ -23,7 +23,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.dsls.sql.schema.BaseBeamTable;
 import org.apache.beam.dsls.sql.schema.BeamIOType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -38,7 +38,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 
 /**
  * {@code BeamKafkaTable} represent a Kafka topic, as source or target. Need to
- * extend to convert between {@code BeamSQLRow} and {@code KV<byte[], byte[]>}.
+ * extend to convert between {@code BeamSqlRow} and {@code KV<byte[], byte[]>}.
  *
  */
 public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
@@ -68,14 +68,14 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
     return BeamIOType.UNBOUNDED;
   }
 
-  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSQLRow>>
+  public abstract PTransform<PCollection<KV<byte[], byte[]>>, PCollection<BeamSqlRow>>
       getPTransformForInput();
 
-  public abstract PTransform<PCollection<BeamSQLRow>, PCollection<KV<byte[], byte[]>>>
+  public abstract PTransform<PCollection<BeamSqlRow>, PCollection<KV<byte[], byte[]>>>
       getPTransformForOutput();
 
   @Override
-  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("read",
             KafkaIO.<byte[], byte[]>read()
                 .withBootstrapServers(bootstrapServers)
@@ -88,13 +88,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
     checkArgument(topics != null && topics.size() == 1,
         "Only one topic can be acceptable as output.");
 
-    return new PTransform<PCollection<BeamSQLRow>, PDone>() {
+    return new PTransform<PCollection<BeamSqlRow>, PDone>() {
       @Override
-      public PDone expand(PCollection<BeamSQLRow> input) {
+      public PDone expand(PCollection<BeamSqlRow> input) {
         return input.apply("out_reformat", 
getPTransformForOutput()).apply("persistent",
             KafkaIO.<byte[], byte[]>write()
                 .withBootstrapServers(bootstrapServers)

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
index 41742c7..e575eee 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTable.java
@@ -18,7 +18,7 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -57,14 +57,14 @@ public class BeamTextCSVTable extends BeamTextTable {
   }
 
   @Override
-  public PCollection<BeamSQLRow> buildIOReader(Pipeline pipeline) {
+  public PCollection<BeamSqlRow> buildIOReader(Pipeline pipeline) {
     return PBegin.in(pipeline).apply("decodeRecord", TextIO.read().from(filePattern))
         .apply("parseCSVLine",
             new BeamTextCSVTableIOReader(beamSqlRecordType, filePattern, csvFormat));
   }
 
   @Override
-  public PTransform<? super PCollection<BeamSQLRow>, PDone> buildIOWriter() {
+  public PTransform<? super PCollection<BeamSqlRow>, PDone> buildIOWriter() {
     return new BeamTextCSVTableIOWriter(beamSqlRecordType, filePattern, csvFormat);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
index 59d77c0..ef0a465 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOReader.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSQLRow;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.csvLine2BeamSqlRow;
 
 import java.io.Serializable;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -34,13 +34,13 @@ import org.apache.commons.csv.CSVFormat;
  * IOReader for {@code BeamTextCSVTable}.
  */
 public class BeamTextCSVTableIOReader
-    extends PTransform<PCollection<String>, PCollection<BeamSQLRow>>
+    extends PTransform<PCollection<String>, PCollection<BeamSqlRow>>
     implements Serializable {
   private String filePattern;
-  protected BeamSQLRecordType beamSqlRecordType;
+  protected BeamSqlRecordType beamSqlRecordType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOReader(BeamSQLRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOReader(BeamSqlRecordType beamSqlRecordType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRecordType = beamSqlRecordType;
@@ -48,12 +48,12 @@ public class BeamTextCSVTableIOReader
   }
 
   @Override
-  public PCollection<BeamSQLRow> expand(PCollection<String> input) {
-    return input.apply(ParDo.of(new DoFn<String, BeamSQLRow>() {
+  public PCollection<BeamSqlRow> expand(PCollection<String> input) {
+    return input.apply(ParDo.of(new DoFn<String, BeamSqlRow>() {
           @ProcessElement
           public void processElement(ProcessContext ctx) {
             String str = ctx.element();
-            ctx.output(csvLine2BeamSQLRow(csvFormat, str, beamSqlRecordType));
+            ctx.output(csvLine2BeamSqlRow(csvFormat, str, beamSqlRecordType));
           }
         }));
   }

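The reader is a plain PTransform<PCollection<String>, PCollection<BeamSqlRow>>, so it can be exercised without touching the filesystem by substituting Create.of(...) for TextIO.read(). A sketch under the constructor shown above; since expand does not consult filePattern, a placeholder path is fine here:

    import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
    import org.apache.beam.dsls.sql.schema.BeamSqlRow;
    import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
    import org.apache.beam.dsls.sql.schema.text.BeamTextCSVTableIOReader;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.calcite.sql.type.SqlTypeName;
    import org.apache.commons.csv.CSVFormat;

    public class TextCsvReadSketch {
      public static void main(String[] args) {
        BeamSqlRecordType type = new BeamSqlRecordType();
        type.addField("id", SqlTypeName.INTEGER);
        type.addField("name", SqlTypeName.VARCHAR);

        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<BeamSqlRow> rows = p
            .apply(Create.of("1,hello", "2,world"))  // stand-in for TextIO.read()
            .apply(new BeamTextCSVTableIOReader(type, "/tmp/in.csv", CSVFormat.DEFAULT))
            .setCoder(new BeamSqlRowCoder(type));

        p.run().waitUntilFinish();
      }
    }
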
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
index 9b9cbd2..35a546c 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/text/BeamTextCSVTableIOWriter.java
@@ -18,12 +18,12 @@
 
 package org.apache.beam.dsls.sql.schema.text;
 
-import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSQLRow2CsvLine;
+import static org.apache.beam.dsls.sql.schema.BeamTableUtils.beamSqlRow2CsvLine;
 
 import java.io.Serializable;
 
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -35,25 +35,25 @@ import org.apache.commons.csv.CSVFormat;
 /**
  * IOWriter for {@code BeamTextCSVTable}.
  */
-public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSQLRow>, PDone>
+public class BeamTextCSVTableIOWriter extends PTransform<PCollection<BeamSqlRow>, PDone>
     implements Serializable {
   private String filePattern;
-  protected BeamSQLRecordType beamSqlRecordType;
+  protected BeamSqlRecordType beamSqlRecordType;
   protected CSVFormat csvFormat;
 
-  public BeamTextCSVTableIOWriter(BeamSQLRecordType beamSqlRecordType, String filePattern,
+  public BeamTextCSVTableIOWriter(BeamSqlRecordType beamSqlRecordType, String filePattern,
       CSVFormat csvFormat) {
     this.filePattern = filePattern;
     this.beamSqlRecordType = beamSqlRecordType;
     this.csvFormat = csvFormat;
   }
 
-  @Override public PDone expand(PCollection<BeamSQLRow> input) {
-    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSQLRow, String>() 
{
+  @Override public PDone expand(PCollection<BeamSqlRow> input) {
+    return input.apply("encodeRecord", ParDo.of(new DoFn<BeamSqlRow, String>() 
{
 
       @ProcessElement public void processElement(ProcessContext ctx) {
-        BeamSQLRow row = ctx.element();
-        ctx.output(beamSQLRow2CsvLine(row, csvFormat));
+        BeamSqlRow row = ctx.element();
+        ctx.output(beamSqlRow2CsvLine(row, csvFormat));
       }
     })).apply(TextIO.write().to(filePattern));
   }

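The writer is the mirror image: each BeamSqlRow is re-encoded to a CSV line and handed to TextIO.write(). Continuing the TextCsvReadSketch above (not a standalone program), writing the parsed rows back out could look like this, with the output path purely illustrative:

    // Appended to the TextCsvReadSketch pipeline before p.run():
    rows.apply(new org.apache.beam.dsls.sql.schema.text.BeamTextCSVTableIOWriter(
        type, "/tmp/out/result", CSVFormat.DEFAULT));
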
http://git-wip-us.apache.org/repos/asf/beam/blob/743f0b3b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
index 943c897..a282ff9 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java
@@ -26,8 +26,8 @@ import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
-import org.apache.beam.dsls.sql.schema.BeamSQLRecordType;
-import org.apache.beam.dsls.sql.schema.BeamSQLRow;
+import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -46,11 +46,11 @@ public class BeamAggregationTransforms implements Serializable{
   /**
    * Merge KV to single record.
    */
-  public static class MergeAggregationRecord extends DoFn<KV<BeamSQLRow, BeamSQLRow>, BeamSQLRow> {
-    private BeamSQLRecordType outRecordType;
+  public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> {
+    private BeamSqlRecordType outRecordType;
     private List<String> aggFieldNames;
 
-    public MergeAggregationRecord(BeamSQLRecordType outRecordType, List<AggregateCall> aggList) {
+    public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList) {
       this.outRecordType = outRecordType;
       this.aggFieldNames = new ArrayList<>();
       for (AggregateCall ac : aggList) {
@@ -60,10 +60,10 @@ public class BeamAggregationTransforms implements Serializable{
 
     @ProcessElement
     public void processElement(ProcessContext c, BoundedWindow window) {
-      BeamSQLRow outRecord = new BeamSQLRow(outRecordType);
+      BeamSqlRow outRecord = new BeamSqlRow(outRecordType);
       outRecord.updateWindowRange(c.element().getKey(), window);
 
-      KV<BeamSQLRow, BeamSQLRow> kvRecord = c.element();
+      KV<BeamSqlRow, BeamSqlRow> kvRecord = c.element();
       for (String f : kvRecord.getKey().getDataType().getFieldsName()) {
         outRecord.addField(f, kvRecord.getKey().getFieldValue(f));
       }
@@ -81,7 +81,7 @@ public class BeamAggregationTransforms implements Serializable{
    * extract group-by fields.
    */
   public static class AggregationGroupByKeyFn
-      implements SerializableFunction<BeamSQLRow, BeamSQLRow> {
+      implements SerializableFunction<BeamSqlRow, BeamSqlRow> {
     private List<Integer> groupByKeys;
 
     public AggregationGroupByKeyFn(int windowFieldIdx, ImmutableBitSet groupSet) {
@@ -94,9 +94,9 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow apply(BeamSQLRow input) {
-      BeamSQLRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
-      BeamSQLRow keyOfRecord = new BeamSQLRow(typeOfKey);
+    public BeamSqlRow apply(BeamSqlRow input) {
+      BeamSqlRecordType typeOfKey = exTypeOfKeyRecord(input.getDataType());
+      BeamSqlRow keyOfRecord = new BeamSqlRow(typeOfKey);
       keyOfRecord.updateWindowRange(input, null);
 
       for (int idx = 0; idx < groupByKeys.size(); ++idx) {
@@ -105,8 +105,8 @@ public class BeamAggregationTransforms implements Serializable{
       return keyOfRecord;
     }
 
-    private BeamSQLRecordType exTypeOfKeyRecord(BeamSQLRecordType dataType) {
-      BeamSQLRecordType typeOfKey = new BeamSQLRecordType();
+    private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
+      BeamSqlRecordType typeOfKey = new BeamSqlRecordType();
       for (int idx : groupByKeys) {
         typeOfKey.addField(dataType.getFieldsName().get(idx), dataType.getFieldsType().get(idx));
       }
@@ -118,7 +118,7 @@ public class BeamAggregationTransforms implements Serializable{
   /**
    * Assign event timestamp.
    */
-  public static class WindowTimestampFn implements SerializableFunction<BeamSQLRow, Instant> {
+  public static class WindowTimestampFn implements SerializableFunction<BeamSqlRow, Instant> {
     private int windowFieldIdx = -1;
 
     public WindowTimestampFn(int windowFieldIdx) {
@@ -127,7 +127,7 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public Instant apply(BeamSQLRow input) {
+    public Instant apply(BeamSqlRow input) {
       return new Instant(input.getDate(windowFieldIdx).getTime());
     }
   }
@@ -142,8 +142,8 @@ public class BeamAggregationTransforms implements Serializable{
   *   3). SUM/AVG works for INT, LONG, FLOAT, DOUBLE, DECIMAL, SMALLINT, TINYINT;<br>
    *
    */
-  public static class AggregationCombineFn extends CombineFn<BeamSQLRow, BeamSQLRow, BeamSQLRow> {
-    private BeamSQLRecordType aggDataType;
+  public static class AggregationCombineFn extends CombineFn<BeamSqlRow, BeamSqlRow, BeamSqlRow> {
+    private BeamSqlRecordType aggDataType;
 
     private int countIndex = -1;
 
@@ -151,8 +151,8 @@ public class BeamAggregationTransforms implements Serializable{
     List<BeamSqlExpression> aggElementExpressions;
 
     public AggregationCombineFn(List<AggregateCall> aggregationCalls,
-        BeamSQLRecordType sourceRowRecordType) {
-      this.aggDataType = new BeamSQLRecordType();
+        BeamSqlRecordType sourceRowRecordType) {
+      this.aggDataType = new BeamSqlRecordType();
       this.aggFunctions = new ArrayList<>();
       this.aggElementExpressions = new ArrayList<>();
 
@@ -254,8 +254,8 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow createAccumulator() {
-      BeamSQLRow initialRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow createAccumulator() {
+      BeamSqlRow initialRecord = new BeamSqlRow(aggDataType);
       for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
         BeamSqlExpression ex = aggElementExpressions.get(idx);
         String aggFnName = aggFunctions.get(idx);
@@ -351,8 +351,8 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow addInput(BeamSQLRow accumulator, BeamSQLRow input) {
-      BeamSQLRow deltaRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow addInput(BeamSqlRow accumulator, BeamSqlRow input) {
+      BeamSqlRow deltaRecord = new BeamSqlRow(aggDataType);
       for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
         BeamSqlExpression ex = aggElementExpressions.get(idx);
         String aggFnName = aggFunctions.get(idx);
@@ -468,11 +468,11 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow mergeAccumulators(Iterable<BeamSQLRow> accumulators) {
-      BeamSQLRow deltaRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow mergeAccumulators(Iterable<BeamSqlRow> accumulators) {
+      BeamSqlRow deltaRecord = new BeamSqlRow(aggDataType);
 
       while (accumulators.iterator().hasNext()) {
-        BeamSQLRow sa = accumulators.iterator().next();
+        BeamSqlRow sa = accumulators.iterator().next();
         for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
           BeamSqlExpression ex = aggElementExpressions.get(idx);
           String aggFnName = aggFunctions.get(idx);
@@ -575,8 +575,8 @@ public class BeamAggregationTransforms implements Serializable{
     }
 
     @Override
-    public BeamSQLRow extractOutput(BeamSQLRow accumulator) {
-      BeamSQLRow finalRecord = new BeamSQLRow(aggDataType);
+    public BeamSqlRow extractOutput(BeamSqlRow accumulator) {
+      BeamSqlRow finalRecord = new BeamSqlRow(aggDataType);
       for (int idx = 0; idx < aggElementExpressions.size(); ++idx) {
         BeamSqlExpression ex = aggElementExpressions.get(idx);
         String aggFnName = aggFunctions.get(idx);
