Repository: beam
Updated Branches:
  refs/heads/DSL_SQL e68badd4d -> c0171593b


[BEAM-2247] Implement date functions in SQL DSL


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2bb57996
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2bb57996
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2bb57996

Branch: refs/heads/DSL_SQL
Commit: 2bb579964525da52220efd9dea8169e71f66c0d8
Parents: e68badd
Author: James Xu <xumingmi...@gmail.com>
Authored: Wed May 17 11:48:11 2017 +0800
Committer: Jean-Baptiste Onofré <jbono...@apache.org>
Committed: Tue Jun 13 11:01:37 2017 +0200

----------------------------------------------------------------------
 dsls/sql/pom.xml                                |   6 +
 .../dsls/sql/interpreter/BeamSqlFnExecutor.java |  48 +++++++-
 .../interpreter/operator/BeamSqlPrimitive.java  |   4 +
 .../operator/BeamSqlReinterpretExpression.java  |  55 +++++++++
 .../date/BeamSqlCurrentDateExpression.java      |  45 +++++++
 .../date/BeamSqlCurrentTimeExpression.java      |  47 ++++++++
 .../date/BeamSqlDateCeilExpression.java         |  55 +++++++++
 .../date/BeamSqlDateFloorExpression.java        |  55 +++++++++
 .../operator/date/BeamSqlExtractExpression.java | 111 +++++++++++++++++
 .../date/BeamSqlLocalTimeExpression.java        |  53 ++++++++
 .../date/BeamSqlLocalTimestampExpression.java   |  49 ++++++++
 .../interpreter/operator/date/package-info.java |  22 ++++
 .../apache/beam/dsls/sql/schema/BeamSqlRow.java |   2 +
 .../sql/interpreter/BeamSqlFnExecutorTest.java  |  83 +++++++++++++
 .../BeamSqlReinterpretExpressionTest.java       |  77 ++++++++++++
 .../date/BeamSqlCurrentDateExpressionTest.java  |  35 ++++++
 .../date/BeamSqlCurrentTimeExpressionTest.java  |  35 ++++++
 .../date/BeamSqlDateCeilExpressionTest.java     |  49 ++++++++
 .../date/BeamSqlDateExpressionTestBase.java     |  52 ++++++++
 .../date/BeamSqlDateFloorExpressionTest.java    |  50 ++++++++
 .../date/BeamSqlExtractExpressionTest.java      | 120 +++++++++++++++++++
 .../date/BeamSqlLocalTimeExpressionTest.java    |  40 +++++++
 .../BeamSqlLocalTimestampExpressionTest.java    |  40 +++++++
 23 files changed, 1128 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/pom.xml
----------------------------------------------------------------------
diff --git a/dsls/sql/pom.xml b/dsls/sql/pom.xml
index 39e32c4..e70c88c 100644
--- a/dsls/sql/pom.xml
+++ b/dsls/sql/pom.xml
@@ -35,6 +35,7 @@
     <timestamp>${maven.build.timestamp}</timestamp>
     <maven.build.timestamp.format>yyyy-MM-dd 
HH:mm</maven.build.timestamp.format>
     <calcite-version>1.12.0</calcite-version>
+    <avatica-version>1.9.0</avatica-version>
   </properties>
 
   <build>
@@ -165,6 +166,11 @@
       <version>${calcite-version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.calcite.avatica</groupId>
+      <artifactId>avatica-core</artifactId>
+      <version>${avatica-version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.hamcrest</groupId>
       <artifactId>hamcrest-all</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
index 1d1dfc1..524b177 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.beam.dsls.sql.interpreter;
 
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.List;
 
 import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
@@ -35,6 +36,7 @@ import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlReinterpretExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression;
@@ -44,6 +46,13 @@ import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpr
 import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimeExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimestampExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlAbsExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.math.BeamSqlSqrtExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
@@ -103,13 +112,20 @@ public class BeamSqlFnExecutor implements 
BeamSqlExpressionExecutor {
   static BeamSqlExpression buildExpression(RexNode rexNode) {
     if (rexNode instanceof RexLiteral) {
       RexLiteral node = (RexLiteral) rexNode;
-      // NlsString is not serializable, we need to convert
-      // it to string explicitly.
-      if (SqlTypeName.CHAR_TYPES.contains(node.getTypeName())
+      SqlTypeName type = node.getTypeName();
+      Object value = node.getValue();
+
+      if (SqlTypeName.CHAR_TYPES.contains(type)
           && node.getValue() instanceof NlsString) {
-        return BeamSqlPrimitive.of(node.getTypeName(), ((NlsString) 
node.getValue()).getValue());
+        // NlsString is not serializable, we need to convert
+        // it to string explicitly.
+        return BeamSqlPrimitive.of(type, ((NlsString) value).getValue());
+      } else if (type == SqlTypeName.DATE && value instanceof Calendar) {
+        // does this actually make sense?
+        // Calcite actually treat Calendar as the java type of Date Literal
+        return BeamSqlPrimitive.of(type, ((Calendar) value).getTime());
       } else {
-        return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
+        return BeamSqlPrimitive.of(type, value);
       }
     } else if (rexNode instanceof RexInputRef) {
       RexInputRef node = (RexInputRef) rexNode;
@@ -148,6 +164,7 @@ public class BeamSqlFnExecutor implements 
BeamSqlExpressionExecutor {
         case "*":
           return new BeamSqlMultiplyExpression(subExps);
         case "/":
+        case "/INT":
           return new BeamSqlDivideExpression(subExps);
         case "MOD":
           return new BeamSqlModExpression(subExps);
@@ -178,6 +195,27 @@ public class BeamSqlFnExecutor implements 
BeamSqlExpressionExecutor {
         case "INITCAP":
           return new BeamSqlInitCapExpression(subExps);
 
+        // date functions
+        case "REINTERPRET":
+          return new BeamSqlReinterpretExpression(subExps, 
node.type.getSqlTypeName());
+        case "CEIL":
+          return new BeamSqlDateCeilExpression(subExps);
+        case "FLOOR":
+          return new BeamSqlDateFloorExpression(subExps);
+        case "EXTRACT_DATE":
+        case "EXTRACT":
+          return new BeamSqlExtractExpression(subExps);
+        case "LOCALTIME":
+          return new BeamSqlLocalTimeExpression(subExps);
+        case "LOCALTIMESTAMP":
+          return new BeamSqlLocalTimestampExpression(subExps);
+        case "CURRENT_TIME":
+        case "CURRENT_TIMESTAMP":
+          return new BeamSqlCurrentTimeExpression();
+        case "CURRENT_DATE":
+          return new BeamSqlCurrentDateExpression();
+
+
         case "CASE":
           return new BeamSqlCaseExpression(subExps);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
index d1fd886..a0b3a55 100644
--- 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java
@@ -130,11 +130,15 @@ public class BeamSqlPrimitive<T> extends 
BeamSqlExpression{
     case TIME:
       return value instanceof GregorianCalendar;
     case TIMESTAMP:
+    case DATE:
       return value instanceof Date;
     case INTERVAL_HOUR:
       return value instanceof BigDecimal;
     case INTERVAL_MINUTE:
       return value instanceof BigDecimal;
+    case SYMBOL:
+      // for SYMBOL, it supports anything...
+      return true;
     default:
       throw new BeamSqlUnsupportedException(outputType.name());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
new file mode 100644
index 0000000..783466c
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for REINTERPRET.
+ *
+ * <p>Currently only converting from {@link SqlTypeName#DATETIME_TYPES}
+ * to {@code BIGINT} is supported.
+ */
+public class BeamSqlReinterpretExpression extends BeamSqlExpression {
+  public BeamSqlReinterpretExpression(List<BeamSqlExpression> operands, 
SqlTypeName outputType) {
+    super(operands, outputType);
+  }
+
+  @Override public boolean accept() {
+    return getOperands().size() == 1
+        && outputType == SqlTypeName.BIGINT
+        && SqlTypeName.DATETIME_TYPES.contains(opType(0));
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    if (opType(0) == SqlTypeName.TIME) {
+      GregorianCalendar date = opValueEvaluated(0, inputRecord);
+      return BeamSqlPrimitive.of(outputType, date.getTimeInMillis());
+
+    } else {
+      Date date = opValueEvaluated(0, inputRecord);
+      return BeamSqlPrimitive.of(outputType, date.getTime());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
new file mode 100644
index 0000000..2f83140
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpression.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Collections;
+import java.util.Date;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CURRENT_DATE and LOCALTIME.
+ *
+ * <p>Returns the current date in the session time zone, in a value of 
datatype DATE.
+ */
+public class BeamSqlCurrentDateExpression extends BeamSqlExpression {
+  public BeamSqlCurrentDateExpression() {
+    super(Collections.<BeamSqlExpression>emptyList(), SqlTypeName.DATE);
+  }
+  @Override public boolean accept() {
+    return getOperands().size() == 0;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
new file mode 100644
index 0000000..2e7458b
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpression.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Collections;
+import java.util.Date;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CURRENT_TIME and CURRENT_TIMESTAMP.
+ *
+ * <p>Returns the current time in the session time zone, in a value of datatype
+ * TIMESTAMP WITH TIME ZONE.
+ */
+public class BeamSqlCurrentTimeExpression extends BeamSqlExpression {
+  public BeamSqlCurrentTimeExpression() {
+    super(Collections.<BeamSqlExpression>emptyList(), SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    // CURRENT_TIME has no param.
+    return true;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
new file mode 100644
index 0000000..68f1aa9
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for CEIL(date).
+ *
+ * <p>NOTE: only support CEIL for {@link TimeUnitRange#YEAR} and {@link 
TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateCeilExpression extends BeamSqlExpression {
+  public BeamSqlDateCeilExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    Date date = opValueEvaluated(0, inputRecord);
+    long time = date.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long newTime = DateTimeUtils.unixTimestampCeil(unit, time);
+    Date newDate = new Date(newTime);
+
+    return BeamSqlPrimitive.of(outputType, newDate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
new file mode 100644
index 0000000..4d446e3
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpression.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for FLOOR(date).
+ *
+ * <p>NOTE: only support FLOOR for {@link TimeUnitRange#YEAR} and {@link 
TimeUnitRange#MONTH}.
+ */
+public class BeamSqlDateFloorExpression extends BeamSqlExpression {
+  public BeamSqlDateFloorExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.DATE);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.SYMBOL;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    Date date = opValueEvaluated(0, inputRecord);
+    long time = date.getTime();
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(1)).getValue();
+
+    long newTime = DateTimeUtils.unixTimestampFloor(unit, time);
+    Date newDate = new Date(newTime);
+
+    return BeamSqlPrimitive.of(outputType, newDate);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
new file mode 100644
index 0000000..347a201
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpression.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for EXTRACT.
+ *
+ * <p>The following date functions also implicitly converted to {@code 
EXTRACT}:
+ * <ul>
+ *   <li>YEAR(date) =&gt; EXTRACT(YEAR FROM date)</li>
+ *   <li>MONTH(date) =&gt; EXTRACT(MONTH FROM date)</li>
+ *   <li>DAY(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>QUARTER(date) =&gt; EXTRACT(QUARTER FROM date)</li>
+ *   <li>WEEK(date) =&gt; EXTRACT(WEEK FROM date)</li>
+ *   <li>DAYOFYEAR(date) =&gt; EXTRACT(DOY FROM date)</li>
+ *   <li>DAYOFMONTH(date) =&gt; EXTRACT(DAY FROM date)</li>
+ *   <li>DAYOFWEEK(date) =&gt; EXTRACT(DOW FROM date)</li>
+ *   <li>HOUR(date) =&gt; EXTRACT(HOUR FROM date)</li>
+ *   <li>MINUTE(date) =&gt; EXTRACT(MINUTE FROM date)</li>
+ *   <li>SECOND(date) =&gt; EXTRACT(SECOND FROM date)</li>
+ * </ul>
+ */
+public class BeamSqlExtractExpression extends BeamSqlExpression {
+  private static final Map<TimeUnitRange, Integer> typeMapping = new 
HashMap<>();
+  static {
+    typeMapping.put(TimeUnitRange.HOUR, Calendar.HOUR_OF_DAY);
+    typeMapping.put(TimeUnitRange.MINUTE, Calendar.MINUTE);
+    typeMapping.put(TimeUnitRange.SECOND, Calendar.SECOND);
+    typeMapping.put(TimeUnitRange.DOW, Calendar.DAY_OF_WEEK);
+    typeMapping.put(TimeUnitRange.DOY, Calendar.DAY_OF_YEAR);
+    typeMapping.put(TimeUnitRange.WEEK, Calendar.WEEK_OF_YEAR);
+  }
+
+  public BeamSqlExtractExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.INTEGER);
+  }
+  @Override public boolean accept() {
+    return operands.size() == 2
+        && opType(1) == SqlTypeName.BIGINT;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    Long time = opValueEvaluated(1, inputRecord);
+
+    TimeUnitRange unit = ((BeamSqlPrimitive<TimeUnitRange>) op(0)).getValue();
+
+    switch (unit) {
+      case YEAR:
+      case MONTH:
+      case DAY:
+        Long timeByDay = time / 1000 / 3600 / 24;
+        Long extracted = DateTimeUtils.unixDateExtract(
+            unit,
+            timeByDay
+        );
+        return BeamSqlPrimitive.of(outputType, extracted.intValue());
+
+      case HOUR:
+      case MINUTE:
+      case SECOND:
+      case DOY:
+      case DOW:
+      case WEEK:
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(new Date(time));
+        return BeamSqlPrimitive.of(outputType, 
calendar.get(typeMapping.get(unit)));
+
+      case QUARTER:
+        calendar = Calendar.getInstance();
+        calendar.setTime(new Date(time));
+        int ret = calendar.get(Calendar.MONTH) / 3;
+        if (ret * 3 < calendar.get(Calendar.MONTH)) {
+          ret += 1;
+        }
+        return BeamSqlPrimitive.of(outputType, ret);
+
+      default:
+        throw new BeamSqlUnsupportedException("Extract for time unit: " + unit 
+ " not supported!");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpression.java
new file mode 100644
index 0000000..09b223c
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpression.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIME and LOCALTIME(precison).
+ *
+ * <p>Returns the current date and time in the session time zone in a value of 
datatype TIME, with
+ * precision digits of precision.
+ *
+ * <p>NOTE: for simplicity, we will ignore the {@code precision} param.
+ */
+public class BeamSqlLocalTimeExpression extends BeamSqlExpression {
+  public BeamSqlLocalTimeExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIME);
+  }
+  @Override public boolean accept() {
+    int opCount = getOperands().size();
+    return opCount <= 1;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    GregorianCalendar ret = new GregorianCalendar(TimeZone.getDefault());
+    ret.setTime(new Date());
+    return BeamSqlPrimitive.of(outputType, ret);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpression.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpression.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpression.java
new file mode 100644
index 0000000..fdf65c2
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.beam.dsls.sql.schema.BeamSqlRow;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * {@code BeamSqlExpression} for LOCALTIMESTAMP and LOCALTIMESTAMP(precision).
+ *
+ * <p>Returns the current date and time in the session time zone in a value of 
datatype TIMESTAMP,
+ * with precision digits of precision.
+ *
+ * <p>NOTE: for simplicity, we will ignore the {@code precision} param.
+ */
+public class BeamSqlLocalTimestampExpression extends BeamSqlExpression {
+  public BeamSqlLocalTimestampExpression(List<BeamSqlExpression> operands) {
+    super(operands, SqlTypeName.TIMESTAMP);
+  }
+  @Override public boolean accept() {
+    int opCount = getOperands().size();
+    return opCount <= 1;
+  }
+
+  @Override public BeamSqlPrimitive evaluate(BeamSqlRow inputRecord) {
+    return BeamSqlPrimitive.of(outputType, new Date());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
new file mode 100644
index 0000000..d3cc98f
--- /dev/null
+++ 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/date/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * date functions.
+ */
+package org.apache.beam.dsls.sql.interpreter.operator.date;

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java 
b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
index 0f82733..f885aaf 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/schema/BeamSqlRow.java
@@ -24,6 +24,7 @@ import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.beam.dsls.sql.exception.InvalidFieldException;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -141,6 +142,7 @@ public class BeamSqlRow implements Serializable {
         }
         break;
       case TIMESTAMP:
+      case DATE:
         if (!(fieldValue instanceof Date)) {
           throw new InvalidFieldException(
               String.format("[%s] doesn't match type [%s]", fieldValue, 
fieldType));

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
index 017c6ca..46d8326 100644
--- 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/BeamSqlFnExecutorTest.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
 import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
 
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
 import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlCaseExpression;
@@ -34,6 +37,13 @@ import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMinusExpr
 import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentDateExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlCurrentTimeExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateCeilExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlDateFloorExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlExtractExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimeExpression;
+import 
org.apache.beam.dsls.sql.interpreter.operator.date.BeamSqlLocalTimestampExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
 import 
org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
@@ -46,6 +56,7 @@ import 
org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpressi
 import org.apache.beam.dsls.sql.rel.BeamFilterRel;
 import org.apache.beam.dsls.sql.rel.BeamProjectRel;
 import org.apache.beam.dsls.sql.rel.BeamRelNode;
+import org.apache.calcite.avatica.util.TimeUnitRange;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.sql.SqlOperator;
@@ -140,6 +151,7 @@ public class BeamSqlFnExecutorTest extends 
BeamSqlFnExecutorTestBase {
     assertTrue(exp.getClass().equals(clazz));
   }
 
+  @Test
   public void testBuildExpression_string()  {
     RexNode rexNode;
     BeamSqlExpression exp;
@@ -265,4 +277,75 @@ public class BeamSqlFnExecutorTest extends 
BeamSqlFnExecutorTestBase {
     exp = BeamSqlFnExecutor.buildExpression(rexNode);
     assertTrue(exp instanceof BeamSqlCaseExpression);
   }
+
+  @Test
+  public void testBuildExpression_date() {
+    RexNode rexNode;
+    BeamSqlExpression exp;
+    Calendar calendar = Calendar.getInstance();
+    calendar.setTimeZone(TimeZone.getTimeZone("GMT"));
+    calendar.setTime(new Date());
+
+    // CEIL
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CEIL,
+        Arrays.asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeFlag(TimeUnitRange.MONTH)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlDateCeilExpression);
+
+    // FLOOR
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.FLOOR,
+        Arrays.asList(
+            rexBuilder.makeDateLiteral(calendar),
+            rexBuilder.makeFlag(TimeUnitRange.MONTH)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlDateFloorExpression);
+
+    // EXTRACT == EXTRACT_DATE?
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.EXTRACT,
+        Arrays.asList(
+            rexBuilder.makeFlag(TimeUnitRange.MONTH),
+            rexBuilder.makeDateLiteral(calendar)
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlExtractExpression);
+
+    // CURRENT_TIME
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_TIME,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCurrentTimeExpression);
+
+    // CURRENT_DATE
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.CURRENT_DATE,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlCurrentDateExpression);
+
+    // LOCALTIME
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIME,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlLocalTimeExpression);
+
+    // LOCALTIMESTAMP
+    rexNode = rexBuilder.makeCall(SqlStdOperatorTable.LOCALTIMESTAMP,
+        Arrays.<RexNode>asList(
+        )
+    );
+    exp = BeamSqlFnExecutor.buildExpression(rexNode);
+    assertTrue(exp instanceof BeamSqlLocalTimestampExpression);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
new file mode 100644
index 0000000..897a351
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlReinterpretExpressionTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlReinterpretExpression}.
+ */
+public class BeamSqlReinterpretExpressionTest extends 
BeamSqlFnExecutorTestBase {
+
+  @Test public void accept() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, new Date()));
+    assertTrue(new BeamSqlReinterpretExpression(operands, 
SqlTypeName.BIGINT).accept());
+
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIMESTAMP, new Date()));
+    assertTrue(new BeamSqlReinterpretExpression(operands, 
SqlTypeName.BIGINT).accept());
+
+    operands.clear();
+    GregorianCalendar calendar = new GregorianCalendar();
+    calendar.setTime(new Date());
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
+    assertTrue(new BeamSqlReinterpretExpression(operands, 
SqlTypeName.BIGINT).accept());
+
+    // currently only support reinterpret DATE
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.VARCHAR, "hello"));
+    assertFalse(new BeamSqlReinterpretExpression(operands, 
SqlTypeName.BIGINT).accept());
+
+    // currently only support convert to BIGINT
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.TIME, calendar));
+    assertFalse(new BeamSqlReinterpretExpression(operands, 
SqlTypeName.TINYINT).accept());
+  }
+
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+
+    Date d = new Date();
+    d.setTime(1000);
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE, d));
+    assertEquals(1000L, new BeamSqlReinterpretExpression(operands, 
SqlTypeName.BIGINT)
+        .evaluate(record).getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
new file mode 100644
index 0000000..951fc8d
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentDateExpressionTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCurrentDateExpression.
+ */
+public class BeamSqlCurrentDateExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    assertEquals(SqlTypeName.DATE,
+        new BeamSqlCurrentDateExpression().evaluate(record).getOutputType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
new file mode 100644
index 0000000..8edf5fa
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlCurrentTimeExpressionTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlCurrentTimeExpression.
+ */
+public class BeamSqlCurrentTimeExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    assertEquals(SqlTypeName.TIMESTAMP,
+        new BeamSqlCurrentTimeExpression().evaluate(record).getOutputType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
new file mode 100644
index 0000000..8fc2178
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateCeilExpressionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlDateCeilExpression}.
+ */
+public class BeamSqlDateCeilExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE,
+        str2DateTime("2017-05-22 09:10:11")));
+    // YEAR
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+    assertEquals(str2DateTime("2018-01-01 00:00:00"),
+        new BeamSqlDateCeilExpression(operands).evaluate(record).getDate());
+
+    operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.MONTH));
+    assertEquals(str2DateTime("2017-06-01 00:00:00"),
+        new BeamSqlDateCeilExpression(operands).evaluate(record).getDate());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
new file mode 100644
index 0000000..bc906df
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateExpressionTestBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+
+import org.apache.beam.dsls.sql.interpreter.BeamSqlFnExecutorTestBase;
+
+/**
+ * Base class for all date related expression test.
+ */
+public class BeamSqlDateExpressionTestBase extends BeamSqlFnExecutorTestBase {
+  protected long str2LongTime(String dateStr) {
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    try {
+      Date date = format.parse(dateStr);
+      return date.getTime();
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected Date str2DateTime(String dateStr) {
+    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    try {
+      format.setTimeZone(TimeZone.getTimeZone("GMT"));
+      Date date = format.parse(dateStr);
+      return date;
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
new file mode 100644
index 0000000..3207d34
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlDateFloorExpressionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlDateFloorExpression}.
+ */
+public class BeamSqlDateFloorExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.DATE,
+        str2DateTime("2017-05-22 09:10:11")));
+    // YEAR
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+    assertEquals(str2DateTime("2017-01-01 00:00:00"),
+        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+    // MONTH
+    operands.set(1, BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.MONTH));
+    assertEquals(str2DateTime("2017-05-01 00:00:00"),
+        new BeamSqlDateFloorExpression(operands).evaluate(record).getDate());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
new file mode 100644
index 0000000..dc52d5a
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlExtractExpressionTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
+import org.apache.calcite.avatica.util.TimeUnitRange;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for {@code BeamSqlExtractExpression}.
+ */
+public class BeamSqlExtractExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test public void evaluate() throws Exception {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    long time = str2LongTime("2017-05-22 16:17:18");
+
+    // YEAR
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.YEAR));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(2017,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // MONTH
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.MONTH));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(5,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // DAY
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DAY));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(22,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // HOUR
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.HOUR));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(16,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // MINUTE
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.MINUTE));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(17,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // SECOND
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.SECOND));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(18,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // DAY_OF_WEEK
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOW));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(2,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // DAY_OF_YEAR
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.DOY));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(142,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // WEEK
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, TimeUnitRange.WEEK));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(21,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+    // QUARTER
+    operands.clear();
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.SYMBOL, 
TimeUnitRange.QUARTER));
+    operands.add(BeamSqlPrimitive.of(SqlTypeName.BIGINT,
+        time));
+    assertEquals(2,
+        new BeamSqlExtractExpression(operands).evaluate(record).getValue());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpressionTest.java
new file mode 100644
index 0000000..bae0b5c
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimeExpressionTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlLocalTimeExpression.
+ */
+public class BeamSqlLocalTimeExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    assertEquals(SqlTypeName.TIME,
+        new 
BeamSqlLocalTimeExpression(operands).evaluate(record).getOutputType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2bb57996/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpressionTest.java
----------------------------------------------------------------------
diff --git 
a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpressionTest.java
 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpressionTest.java
new file mode 100644
index 0000000..5a794de
--- /dev/null
+++ 
b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/date/BeamSqlLocalTimestampExpressionTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.dsls.sql.interpreter.operator.date;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.junit.Test;
+
+/**
+ * Test for BeamSqlLocalTimestampExpression.
+ */
+public class BeamSqlLocalTimestampExpressionTest extends 
BeamSqlDateExpressionTestBase {
+  @Test
+  public void test() {
+    List<BeamSqlExpression> operands = new ArrayList<>();
+    assertEquals(SqlTypeName.TIMESTAMP,
+        new 
BeamSqlLocalTimestampExpression(operands).evaluate(record).getOutputType());
+  }
+}

Reply via email to