Repository: samza
Updated Branches:
  refs/heads/master 493053dff -> 25d943f1c


SAMZA-1820: Support for all the calcite timestamp functions

The DataContextImpl that we pass to Calcite supports only CURRENT_TIMESTAMP. 
This change adds support for the other timestamp functions as well, by also 
handling the LOCAL_TIMESTAMP, UTC_TIMESTAMP, and TIME_ZONE variables.
Timestamp functions like MONTH(DATE) need support for the EXTRACT function, so 
this change also adds EXTRACT to the operator table.

Author: Srinivasulu Punuru <[email protected]>

Reviewers: Aditya Toomula <[email protected]>

Closes #614 from srinipunuru/time.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/25d943f1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/25d943f1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/25d943f1

Branch: refs/heads/master
Commit: 25d943f1c560d41ec593a0a76262d7aa8ef1da56
Parents: 493053d
Author: Srinivasulu Punuru <[email protected]>
Authored: Thu Aug 23 17:04:29 2018 -0700
Committer: Srinivasulu Punuru <[email protected]>
Committed: Thu Aug 23 17:04:29 2018 -0700

----------------------------------------------------------------------
 .../samza/sql/planner/SamzaSqlOperatorTable.java    |  1 +
 .../samza/sql/translator/TranslatorContext.java     | 16 ++++++++++++----
 .../samza/test/samzasql/TestSamzaSqlEndToEnd.java   |  2 +-
 3 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/25d943f1/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
index b078f5b..9c16de8 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java
@@ -77,6 +77,7 @@ public class SamzaSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
   public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
   public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
   public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
+  public static final SqlFunction EXTRACT = SqlStdOperatorTable.EXTRACT;
   public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
   public static final SqlFunction LOCALTIMESTAMP = 
SqlStdOperatorTable.LOCALTIMESTAMP;
   public static final SqlFunction CURRENT_TIME = 
SqlStdOperatorTable.CURRENT_TIME;

http://git-wip-us.apache.org/repos/asf/samza/blob/25d943f1/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
----------------------------------------------------------------------
diff --git 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
index 7a25efb..fcebeef 100644
--- 
a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
+++ 
b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java
@@ -22,6 +22,7 @@ package org.apache.samza.sql.translator;
 import java.util.HashMap;
 import java.util.Map;
 
+import java.util.TimeZone;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.linq4j.QueryProvider;
@@ -77,11 +78,18 @@ public class TranslatorContext implements Cloneable {
 
     @Override
     public Object get(String name) {
-      if (name.equals(Variable.CURRENT_TIMESTAMP.camelName)) {
-        return System.currentTimeMillis();
+      TimeZone timeZone = TimeZone.getDefault();
+      long timeMs = System.currentTimeMillis();
+      long offsetMs = timeZone.getOffset(timeMs);
+      if (name.equals(Variable.LOCAL_TIMESTAMP.camelName)) {
+        return timeMs + offsetMs;
+      } else if (name.equals(Variable.UTC_TIMESTAMP.camelName) || 
name.equals(Variable.CURRENT_TIMESTAMP.camelName)) {
+        return timeMs;
+      } else if (name.equals(Variable.TIME_ZONE.camelName)) {
+        return timeZone;
+      } else {
+        throw new UnsupportedOperationException("Unsupported operation " + 
name);
       }
-
-      return null;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/25d943f1/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
index 4ee42ae..c211f03 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java
@@ -75,7 +75,7 @@ public class TestSamzaSqlEndToEnd extends 
AbstractIntegrationTestHarness {
 
     TestAvroSystemFactory.messages.clear();
     Map<String, String> staticConfigs = 
SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
-    String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as 
long_value from testavro.SIMPLE1";
+    String sql1 = "Insert into testavro.outputTopic select id, 
TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as 
long_value from testavro.SIMPLE1";
     List<String> sqlStmts = Arrays.asList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, 
JsonUtil.toJson(sqlStmts));
     SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new 
MapConfig(staticConfigs));

Reply via email to