[FLINK-5829] [table] Bump Calcite version to 1.12. This closes #3613.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ceec0a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ceec0a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ceec0a Branch: refs/heads/table-retraction Commit: 05ceec0ace45135bc1cc17934a0b8721f4f85a03 Parents: f1ff99f Author: Haohui Mai <whe...@apache.org> Authored: Tue Mar 28 11:50:51 2017 -0700 Committer: twalthr <twal...@apache.org> Committed: Mon Apr 3 17:26:28 2017 +0200 ---------------------------------------------------------------------- docs/dev/table_api.md | 18 --------- flink-libraries/flink-table/pom.xml | 2 +- .../flink/table/api/TableEnvironment.scala | 20 ++-------- .../flink/table/codegen/CodeGenerator.scala | 3 ++ .../flink/table/expressions/literals.scala | 4 +- .../table/plan/util/RexProgramExtractor.scala | 2 + .../api/java/batch/TableEnvironmentITCase.java | 39 -------------------- .../table/api/java/batch/TableSourceITCase.java | 2 +- .../flink/table/ExpressionReductionTest.scala | 16 ++++---- .../flink/table/TableEnvironmentTest.scala | 2 - .../apache/flink/table/TableSourceTest.scala | 4 +- .../api/scala/batch/TableSourceITCase.scala | 2 +- .../api/scala/stream/TableSourceITCase.scala | 2 +- .../catalog/ExternalCatalogSchemaTest.scala | 7 ++-- .../table/expressions/ScalarFunctionsTest.scala | 18 ++++----- .../expressions/utils/ExpressionTestBase.scala | 8 +++- 16 files changed, 43 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/docs/dev/table_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 117f32f..7c37aea 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -326,24 +326,6 @@ val batchTable = batchTableEnvironment.scan("mycsv") </div> </div> -### Unregister a Table - -A table can be unregistered using the following method. Subsequent SQL queries won't find the unregistered table name anymore. - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -tableEnvironment.unregisterTable("Customers"); -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -tableEnvironment.unregisterTable("Customers") -{% endhighlight %} -</div> -</div> - Registering external Catalogs -------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index 6bcddc2..49eb451 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -51,7 +51,7 @@ under the License. <dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> - <version>1.11.0</version> + <version>1.12.0</version> <exclusions> <exclusion> <groupId>org.apache.calcite.avatica</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index bb4c3ac..2ddad45 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -21,6 +21,7 @@ package org.apache.flink.table.api import _root_.java.lang.reflect.Modifier import _root_.java.util.concurrent.atomic.AtomicInteger +import com.google.common.collect.ImmutableList import org.apache.calcite.config.Lex import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan.RelOptPlanner.CannotPlanException @@ -231,7 +232,8 @@ abstract class TableEnvironment(val config: TableConfig) { val optProgram = Programs.ofRules(ruleSet) val output = try { - optProgram.run(getPlanner, input, targetTraits) + optProgram.run(getPlanner, input, targetTraits, + ImmutableList.of(), ImmutableList.of()) } catch { case e: CannotPlanException => throw new TableException( @@ -351,22 +353,6 @@ abstract class TableEnvironment(val config: TableConfig) { def registerTableSource(name: String, tableSource: TableSource[_]): Unit /** - * Unregisters a [[Table]] in the TableEnvironment's catalog. - * Unregistered tables cannot be referenced in SQL queries anymore. - * - * @param name The name under which the table is registered. - * @return true if table could be unregistered; false otherwise. - */ - def unregisterTable(name: String): Boolean = { - if (isRegistered(name)) { - internalSchema.tableMap.remove(name) - true - } else { - false - } - } - - /** * Replaces a registered Table with another Table under the same name. * We use this method to replace a [[org.apache.flink.table.plan.schema.DataStreamTable]] * with a [[org.apache.calcite.schema.TranslatableTable]]. http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 6658645..46b1dcf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -1130,6 +1130,9 @@ class CodeGenerator( override def visitSubQuery(subQuery: RexSubQuery): GeneratedExpression = throw new CodeGenException("Subqueries are not supported yet.") + override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): GeneratedExpression = + throw new CodeGenException("Pattern field references are not supported yet.") + // ---------------------------------------------------------------------------------------------- // generator helping methods // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala index 916fe73..053e7ed 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala @@ -31,6 +31,8 @@ import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} object Literal { + private[flink] val GMT = TimeZone.getTimeZone("GMT") + private[flink] def apply(l: Any): Literal = l match { case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO) @@ -103,7 +105,7 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre private def dateToCalendar: Calendar = { val date = value.asInstanceOf[java.util.Date] - val cal = Calendar.getInstance() + val cal = Calendar.getInstance(Literal.GMT) val t = date.getTime // according to Calcite's SqlFunctions.internalToXXX methods cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t)) http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala index a042f55..ba8713d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala @@ -189,6 +189,8 @@ class RexNodeToExpressionConverter( override def visitOver(over: RexOver): Option[Expression] = None + override def visitPatternFieldRef(fieldRef: RexPatternFieldRef): Option[Expression] = None + private def lookupFunction(name: String, operands: Seq[Expression]): Option[Expression] = { Try(functionCatalog.lookupFunction(name, operands)) match { case Success(expr) => Some(expr) http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index ebe79fa..d4db13e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -146,45 +146,6 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { } @Test(expected = TableException.class) - public void testTableUnregister() throws Exception { - final String tableName = "MyTable"; - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table t = tableEnv.fromDataSet(ds); - tableEnv.registerTable(tableName, t); - tableEnv.unregisterTable(tableName); - // Must fail. Table name is not register anymore. - tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7"); - } - - @Test - public void testTableRegisterNew() throws Exception { - final String tableName = "MyTable"; - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); - - DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); - Table t = tableEnv.fromDataSet(ds); - tableEnv.registerTable(tableName, t); - - tableEnv.unregisterTable(tableName); - - Table t2 = tableEnv.fromDataSet(ds).filter("f0 > 8"); - tableEnv.registerTable(tableName, t2); - - Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7"); - - DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + - "13,5\n" + "14,5\n" + "15,5\n" + - "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"; - compareResultAsText(results, expected); - } - - @Test(expected = TableException.class) public void testIllegalName() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java index becd870..a7ccb7e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java @@ -76,7 +76,7 @@ public class TableSourceITCase extends TableProgramsCollectionTestBase { tableEnv.registerTableSource("persons", csvTable); Table result = tableEnv - .sql("SELECT last, FLOOR(id), score * 2 FROM persons WHERE score < 20"); + .sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20"); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); List<Row> results = resultSet.collect(); http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala index b660243..314d863 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala @@ -57,7 +57,7 @@ class ExpressionReductionTest extends TableTestBase { "'STRING' AS EXPR$3", "'teststring' AS EXPR$4", "null AS EXPR$5", - "1990-10-24 23:00:01 AS EXPR$6", + "1990-10-24 23:00:01.123 AS EXPR$6", "19 AS EXPR$7", "false AS EXPR$8", "true AS EXPR$9", @@ -102,7 +102,7 @@ class ExpressionReductionTest extends TableTestBase { "'STRING' AS EXPR$3", "'teststring' AS EXPR$4", "null AS EXPR$5", - "1990-10-24 23:00:01 AS EXPR$6", + "1990-10-24 23:00:01.123 AS EXPR$6", "19 AS EXPR$7", "false AS EXPR$8", "true AS EXPR$9", @@ -159,7 +159,7 @@ class ExpressionReductionTest extends TableTestBase { "'b' AS _c1", "'STRING' AS _c2", "'teststring' AS _c3", - "1990-10-24 23:00:01 AS _c4", + "1990-10-24 23:00:01.123 AS _c4", "false AS _c5", "true AS _c6", "2E0 AS _c7", @@ -195,7 +195,7 @@ class ExpressionReductionTest extends TableTestBase { "'b' AS _c1", "'STRING' AS _c2", "'teststring' AS _c3", - "1990-10-24 23:00:01 AS _c4", + "1990-10-24 23:00:01.123 AS _c4", "false AS _c5", "true AS _c6", "2E0 AS _c7", @@ -255,7 +255,7 @@ class ExpressionReductionTest extends TableTestBase { "'STRING' AS EXPR$3", "'teststring' AS EXPR$4", "null AS EXPR$5", - "1990-10-24 23:00:01 AS EXPR$6", + "1990-10-24 23:00:01.123 AS EXPR$6", "19 AS EXPR$7", "false AS EXPR$8", "true AS EXPR$9", @@ -300,7 +300,7 @@ class ExpressionReductionTest extends TableTestBase { "'STRING' AS EXPR$3", "'teststring' AS EXPR$4", "null AS EXPR$5", - "1990-10-24 23:00:01 AS EXPR$6", + "1990-10-24 23:00:01.123 AS EXPR$6", "19 AS EXPR$7", "false AS EXPR$8", "true AS EXPR$9", @@ -357,7 +357,7 @@ class ExpressionReductionTest extends TableTestBase { "'b' AS _c1", "'STRING' AS _c2", "'teststring' AS _c3", - "1990-10-24 23:00:01 AS _c4", + "1990-10-24 23:00:01.123 AS _c4", "false AS _c5", "true AS _c6", "2E0 AS _c7", @@ -393,7 +393,7 @@ class ExpressionReductionTest extends TableTestBase { "'b' AS _c1", "'STRING' AS _c2", "'teststring' AS _c3", - "1990-10-24 23:00:01 AS _c4", + "1990-10-24 23:00:01.123 AS _c4", "false AS _c5", "true AS _c6", "2E0 AS _c7", http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index 767e83f..50fafbe 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -282,7 +282,6 @@ class TableEnvironmentTest extends TableTestBase { def testSqlWithoutRegisteringForBatchTables(): Unit = { val util = batchTestUtil() val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c) - util.tEnv.unregisterTable("tableName") val sqlTable = util.tEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12") @@ -321,7 +320,6 @@ class TableEnvironmentTest extends TableTestBase { def testSqlWithoutRegisteringForStreamTables(): Unit = { val util = streamTestUtil() val table = util.addTable[(Long, Int, String)]("tableName", 'a, 'b, 'c) - util.tEnv.unregisterTable("tableName") val sqlTable = util.tEnv.sql(s"SELECT a, b, c FROM $table WHERE b > 12") http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala index 97d4d59..1866e3c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableSourceTest.scala @@ -61,7 +61,7 @@ class TableSourceTest extends TableTestBase { util.tEnv.registerTableSource(tableName, tableSource) - val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName" + val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName" val expected = unaryNode( "DataSetCalc", @@ -245,7 +245,7 @@ class TableSourceTest extends TableTestBase { util.tEnv.registerTableSource(tableName, tableSource) - val sqlQuery = s"SELECT last, floor(id), score * 2 FROM $tableName" + val sqlQuery = s"SELECT `last`, floor(id), score * 2 FROM $tableName" val expected = unaryNode( "DataStreamCalc", http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala index 7e349cf..6ed2cf1 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableSourceITCase.scala @@ -46,7 +46,7 @@ class TableSourceITCase( tEnv.registerTableSource("csvTable", csvTable) val results = tEnv.sql( - "SELECT id, first, last, score FROM csvTable").collect() + "SELECT id, `first`, `last`, score FROM csvTable").collect() val expected = Seq( "1,Mike,Smith,12.3", http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala index 66711cb..9298266 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala @@ -45,7 +45,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { tEnv.registerTableSource("persons", csvTable) tEnv.sql( - "SELECT id, first, last, score FROM persons WHERE id < 4 ") + "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ") .toDataStream[Row] .addSink(new StreamITCase.StringSink) http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala index b780a3f..127c860 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/ExternalCatalogSchemaTest.scala @@ -25,13 +25,13 @@ import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.sql.validate.SqlMonikerType -import org.apache.commons.collections.CollectionUtils import org.apache.flink.table.calcite.{FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.plan.schema.TableSourceTable import org.apache.flink.table.sources.CsvTableSource import org.apache.flink.table.utils.CommonTestData -import org.junit.{Before, Test} import org.junit.Assert._ +import org.junit.{Before, Test} + import scala.collection.JavaConverters._ class ExternalCatalogSchemaTest { @@ -63,7 +63,8 @@ class ExternalCatalogSchemaTest { val subSchemas = allSchemaObjectNames.asScala .filter(_.getType.equals(SqlMonikerType.SCHEMA)) .map(_.getFullyQualifiedNames.asScala.toList).toSet - assertTrue(Set(List(schemaName, "db1"), List(schemaName, "db2")) == subSchemas) + assertTrue(Set(List(schemaName), List(schemaName, "db1"), + List(schemaName, "db2")) == subSchemas) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 03be995..9258c02 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -1014,16 +1014,14 @@ class ScalarFunctionsTest extends ExpressionTestBase { "(TIMESTAMP '2011-03-10 05:02:02', TIMESTAMP '2011-03-10 05:02:01')", "false") - // TODO enable once CALCITE-1435 is fixed - // comparison of timestamps based on milliseconds is buggy - //testAllApis( - // temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli, - // "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp), - // "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " + - // "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)", - // "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " + - // "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')", - // "false") + testAllApis( + temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli, + "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp), + "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " + + "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)", + "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " + + "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')", + "false") } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/05ceec0a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala index d8de554..9a6562a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ExpressionTestBase.scala @@ -21,6 +21,8 @@ package org.apache.flink.table.expressions.utils import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder} import java.util import java.util.concurrent.Future + +import com.google.common.collect.ImmutableList import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql2rel.RelDecorrelator @@ -194,7 +196,8 @@ abstract class ExpressionTestBase { // create DataSetCalc val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() - val dataSetCalc = optProgram.run(context._2.getPlanner, normalizedPlan, flinkOutputProps) + val dataSetCalc = optProgram.run(context._2.getPlanner, normalizedPlan, flinkOutputProps, + ImmutableList.of(), ImmutableList.of()) // extract RexNode val calcProgram = dataSetCalc @@ -217,7 +220,8 @@ abstract class ExpressionTestBase { // create DataSetCalc val decorPlan = RelDecorrelator.decorrelateQuery(converted) val flinkOutputProps = converted.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() - val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps) + val dataSetCalc = optProgram.run(context._2.getPlanner, decorPlan, flinkOutputProps, + ImmutableList.of(), ImmutableList.of()) // extract RexNode val calcProgram = dataSetCalc