Repository: samza Updated Branches: refs/heads/master 493053dff -> 25d943f1c
SAMZA-1820: Support for all the calcite timestamp functions The DataContextImpl that we are passing to Calcite supports only current_timestamp. This change adds support for all the other timestamp functions. Timestamp functions like MONTH(DATE) need support for the EXTRACT function, so this change also adds EXTRACT to the operator table. Author: Srinivasulu Punuru <[email protected]> Reviewers: Aditya Toomula <[email protected]> Closes #614 from srinipunuru/time.1 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/25d943f1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/25d943f1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/25d943f1 Branch: refs/heads/master Commit: 25d943f1c560d41ec593a0a76262d7aa8ef1da56 Parents: 493053d Author: Srinivasulu Punuru <[email protected]> Authored: Thu Aug 23 17:04:29 2018 -0700 Committer: Srinivasulu Punuru <[email protected]> Committed: Thu Aug 23 17:04:29 2018 -0700 ---------------------------------------------------------------------- .../samza/sql/planner/SamzaSqlOperatorTable.java | 1 + .../samza/sql/translator/TranslatorContext.java | 16 ++++++++++++---- .../samza/test/samzasql/TestSamzaSqlEndToEnd.java | 2 +- 3 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/25d943f1/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java index b078f5b..9c16de8 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlOperatorTable.java @@ -77,6 +77,7 @@ public class SamzaSqlOperatorTable 
extends ReflectiveSqlOperatorTable { public static final SqlFunction MOD = SqlStdOperatorTable.MOD; public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR; public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL; + public static final SqlFunction EXTRACT = SqlStdOperatorTable.EXTRACT; public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME; public static final SqlFunction LOCALTIMESTAMP = SqlStdOperatorTable.LOCALTIMESTAMP; public static final SqlFunction CURRENT_TIME = SqlStdOperatorTable.CURRENT_TIME; http://git-wip-us.apache.org/repos/asf/samza/blob/25d943f1/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java index 7a25efb..fcebeef 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/translator/TranslatorContext.java @@ -22,6 +22,7 @@ package org.apache.samza.sql.translator; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import org.apache.calcite.DataContext; import org.apache.calcite.adapter.java.JavaTypeFactory; import org.apache.calcite.linq4j.QueryProvider; @@ -77,11 +78,18 @@ public class TranslatorContext implements Cloneable { @Override public Object get(String name) { - if (name.equals(Variable.CURRENT_TIMESTAMP.camelName)) { - return System.currentTimeMillis(); + TimeZone timeZone = TimeZone.getDefault(); + long timeMs = System.currentTimeMillis(); + long offsetMs = timeZone.getOffset(timeMs); + if (name.equals(Variable.LOCAL_TIMESTAMP.camelName)) { + return timeMs + offsetMs; + } else if (name.equals(Variable.UTC_TIMESTAMP.camelName) || name.equals(Variable.CURRENT_TIMESTAMP.camelName)) { + return timeMs; + } else if 
(name.equals(Variable.TIME_ZONE.camelName)) { + return timeZone; + } else { + throw new UnsupportedOperationException("Unsupported operation " + name); } - - return null; } } http://git-wip-us.apache.org/repos/asf/samza/blob/25d943f1/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 4ee42ae..c211f03 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -75,7 +75,7 @@ public class TestSamzaSqlEndToEnd extends AbstractIntegrationTestHarness { TestAvroSystemFactory.messages.clear(); Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages); - String sql1 = "Insert into testavro.outputTopic select id, CURRENT_TIME as long_value from testavro.SIMPLE1"; + String sql1 = "Insert into testavro.outputTopic select id, TIMESTAMPDIFF(HOUR, CURRENT_TIMESTAMP, LOCALTIMESTAMP) + MONTH(CURRENT_DATE) as long_value from testavro.SIMPLE1"; List<String> sqlStmts = Arrays.asList(sql1); staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts)); SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
