This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7a76e0afd12b8c84c5e3a580e77fe6b215f150b
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Oct 31 15:27:47 2018 +0100

    [FLINK-8897] [table] Reintroduce materialization of time attributes in 
filters
---
 .../table/calcite/RelTimeIndicatorConverter.scala  | 82 +++++++++++++---------
 .../plan/rules/datastream/DataStreamJoinRule.scala |  2 +-
 .../datastream/DataStreamWindowJoinRule.scala      |  2 +-
 .../flink/table/runtime/join/WindowJoinUtil.scala  | 12 ++--
 .../flink/table/api/stream/sql/JoinTest.scala      | 72 ++++++++++---------
 .../flink/table/api/stream/table/JoinTest.scala    | 39 +++++-----
 .../table/plan/TimeIndicatorConversionTest.scala   |  2 +-
 .../runtime/stream/TimeAttributesITCase.scala      | 46 ++++++++++++
 8 files changed, 168 insertions(+), 89 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 41f0fc5..c1bcf14 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -23,6 +23,7 @@ import org.apache.calcite.rel.core._
 import org.apache.calcite.rel.logical._
 import org.apache.calcite.rel.{RelNode, RelShuttle}
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
 import org.apache.flink.table.api.{TableException, ValidationException}
@@ -100,13 +101,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(matchRel: LogicalMatch): RelNode = {
     // visit children and update inputs
     val input = matchRel.getInput.accept(this)
-
-    // check if input field contains time indicator type
-    // materialize field if no time indicator is present anymore
-    // if input field is already materialized, change to timestamp type
-    val materializer = new RexTimeIndicatorMaterializer(
-      rexBuilder,
-      input.getRowType.getFieldList.map(_.getType))
+    val materializer = createMaterializer(input)
 
     // update input expressions
     val patternDefs = 
matchRel.getPatternDefinitions.mapValues(_.accept(materializer))
@@ -180,23 +175,16 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(filter: LogicalFilter): RelNode = {
     // visit children and update inputs
     val input = filter.getInput.accept(this)
+    val materializer = createMaterializer(input)
 
-    // We do not materialize time indicators in conditions because they can be 
locally evaluated.
-    // Some conditions are evaluated by special operators (e.g., time window 
joins).
-    // Time indicators in remaining conditions are materialized by Calc before 
the code generation.
-    LogicalFilter.create(input, filter.getCondition)
+    val condition = filter.getCondition.accept(materializer)
+    LogicalFilter.create(input, condition)
   }
 
   override def visit(project: LogicalProject): RelNode = {
     // visit children and update inputs
     val input = project.getInput.accept(this)
-
-    // check if input field contains time indicator type
-    // materialize field if no time indicator is present anymore
-    // if input field is already materialized, change to timestamp type
-    val materializer = new RexTimeIndicatorMaterializer(
-      rexBuilder,
-      input.getRowType.getFieldList.map(_.getType))
+    val materializer = createMaterializer(input)
 
     val projects = project.getProjects.map(_.accept(materializer))
     val fieldNames = project.getRowType.getFieldNames
@@ -206,8 +194,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
   override def visit(join: LogicalJoin): RelNode = {
     val left = join.getLeft.accept(this)
     val right = join.getRight.accept(this)
+    val materializer = createMaterializer(left, right)
 
-    LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, 
join.getJoinType)
+    LogicalJoin.create(
+      left,
+      right,
+      join.getCondition.accept(materializer),
+      join.getVariablesSet,
+      join.getJoinType)
   }
 
   def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = {
@@ -229,19 +223,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
       case scan: LogicalTableFunctionScan =>
         // visit children and update inputs
         val scanInputs = scan.getInputs.map(_.accept(this))
-
-        // check if input field contains time indicator type
-        // materialize field if no time indicator is present anymore
-        // if input field is already materialized, change to timestamp type
-        val materializer = new RexTimeIndicatorMaterializer(
-          rexBuilder,
-          inputs.head.getRowType.getFieldList.map(_.getType))
-
-        val call = scan.getCall.accept(materializer)
+        val materializer = createMaterializer(inputs.head)
         LogicalTableFunctionScan.create(
           scan.getCluster,
           scanInputs,
-          call,
+          scan.getCall.accept(materializer),
           scan.getElementType,
           scan.getRowType,
           scan.getColumnMappings)
@@ -369,6 +355,15 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) 
extends RelShuttle {
 
     indicesToMaterialize.toSet
   }
+
+  private def createMaterializer(inputs: RelNode*): 
RexTimeIndicatorMaterializer = {
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
+    new RexTimeIndicatorMaterializer(
+      rexBuilder,
+      inputs.flatMap(_.getRowType.getFieldList.map(_.getType)))
+  }
 }
 
 object RelTimeIndicatorConverter {
@@ -412,11 +407,34 @@ object RelTimeIndicatorConverter {
     * @return The expression with materialized time indicators.
     */
   def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: 
RexBuilder): RexNode = {
+    // check if input field contains time indicator type
+    // materialize field if no time indicator is present anymore
+    // if input field is already materialized, change to timestamp type
     val materializer = new RexTimeIndicatorMaterializer(
-          rexBuilder,
-          rowType.getFieldList.map(_.getType))
+      rexBuilder,
+      rowType.getFieldList.map(_.getType))
+
+    expr.accept(materializer)
+  }
+
+  /**
+    * Checks if the given call is a materialization call for either proctime 
or rowtime.
+    */
+  def isMaterializationCall(call: RexCall): Boolean = {
+    val isProctimeCall: Boolean = {
+      call.getOperator == ProctimeSqlFunction &&
+        call.getOperands.size() == 1 &&
+        isProctimeIndicatorType(call.getOperands.get(0).getType)
+    }
+
+    val isRowtimeCall: Boolean = {
+      call.getOperator == SqlStdOperatorTable.CAST &&
+        call.getOperands.size() == 1 &&
+        isRowtimeIndicatorType(call.getOperands.get(0).getType) &&
+        call.getType.getSqlTypeName == SqlTypeName.TIMESTAMP
+    }
 
-        expr.accept(materializer)
+    isProctimeCall || isRowtimeCall
   }
 }
 
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
index 072acb3..f51c088 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala
@@ -73,7 +73,7 @@ class DataStreamJoinRule
     val remainingPredsAccessTime = remainingPreds.isDefined &&
       accessesTimeAttribute(remainingPreds.get, join.getRowType)
 
-    // Check that no event-time attributes are in the input because non-window 
join is unbounded
+    // Check that no event-time attributes are in the output because 
non-window join is unbounded
     // and we don't know how much to hold back watermarks.
     val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala
       .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
index 3dfae99..cd9c5a8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala
@@ -52,7 +52,7 @@ class DataStreamWindowJoinRule
       if (windowBounds.get.isEventTime) {
         true
       } else {
-        // Check that no event-time attributes are in the input because the 
processing time window
+        // Check that no event-time attributes are in the output because the 
processing time window
         // join does not correctly hold back watermarks.
         // We rely on projection pushdown to remove unused attributes before 
the join.
         !join.getRowType.getFieldList.asScala
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
index 18e26df..3e355e8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala
@@ -23,12 +23,14 @@ import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.SqlKind
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.{SqlKind, SqlOperatorTable}
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.{FlinkTypeFactory, 
RelTimeIndicatorConverter}
 import org.apache.flink.table.codegen.{ExpressionReducer, 
FunctionCodeGenerator, GeneratedFunction}
+import org.apache.flink.table.functions.sql.ProctimeSqlFunction
 import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType}
 import org.apache.flink.types.Row
 
@@ -380,13 +382,13 @@ object WindowJoinUtil {
       */
     def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = {
       expr match {
+        case c: RexCall if RelTimeIndicatorConverter.isMaterializationCall(c) 
=>
+          // replace with timestamp
+          rexBuilder.makeZeroLiteral(expr.getType)
         case c: RexCall =>
           // replace in call operands
           val newOps = 
c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava
           rexBuilder.makeCall(c.getType, c.getOperator, newOps)
-        case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) 
=>
-          // replace with timestamp
-          rexBuilder.makeZeroLiteral(expr.getType)
         case _ => expr
       }
     }
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
index 736f9a2..f435113 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala
@@ -21,6 +21,7 @@ import org.apache.calcite.rel.logical.LogicalJoin
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.calcite.RelTimeIndicatorConverter
 import org.apache.flink.table.expressions.Null
 import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.runtime.join.WindowJoinUtil
@@ -29,6 +30,9 @@ import org.apache.flink.table.utils.{StreamTableTestUtil, 
TableTestBase}
 import org.junit.Assert._
 import org.junit.Test
 
+/**
+  * Tests for both windowed and non-windowed joins.
+  */
 class JoinTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
   streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 
'proctime.proctime)
@@ -62,8 +66,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "proctime")
           ),
           term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, +(proctime0, 3600000)))"),
+            "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 
3600000)), " +
+              "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
           term("join", "a, proctime, a0, b, proctime0"),
           term("joinType", "InnerJoin")
         ),
@@ -100,8 +104,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "c")
           ),
           term("where",
-            "AND(=(a, a0), >=(c, -(c0, 10000)), " +
-              "<=(c, +(c0, 3600000)))"),
+            "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
+              "<=(CAST(c), +(CAST(c0), 3600000)))"),
           term("join", "a, c, a0, b, c0"),
           term("joinType", "InnerJoin")
         ),
@@ -138,8 +142,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "proctime")
           ),
           term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, +(proctime0, 3600000)))"),
+            "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 
3600000)), " +
+              "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
           term("join", "a, proctime, a0, b, proctime0"),
           term("joinType", "InnerJoin")
         ),
@@ -176,8 +180,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "c")
           ),
           term("where",
-            "AND(=(a, a0), >=(c, -(c0, 600000)), " +
-              "<=(c, +(c0, 3600000)))"),
+            "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " +
+              "<=(CAST(c), +(CAST(c0), 3600000)))"),
           term("join", "a, c, a0, b, c0"),
           term("joinType", "InnerJoin")
         ),
@@ -208,7 +212,7 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "a", "b", "proctime")
           ),
-          term("where", "AND(=(a, a0), =(proctime, proctime0))"),
+          term("where", "AND(=(a, a0), =(PROCTIME(proctime), 
PROCTIME(proctime0)))"),
           term("join", "a", "proctime", "a0", "b", "proctime0"),
           term("joinType", "InnerJoin")
         ),
@@ -238,7 +242,7 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "a", "b", "c")
           ),
-          term("where", "AND(=(a, a0), =(c, c0))"),
+          term("where", "AND(=(a, a0), =(CAST(c), CAST(c0)))"),
           term("join", "a", "c", "a0", "b", "c0"),
           term("joinType", "InnerJoin")
         ),
@@ -280,8 +284,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "a", "c", "proctime", "12 AS nullField")
           ),
-          term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, 
" +
-            "-(proctime0, 5000)), <=(proctime, +(proctime0, 5000)))"),
+          term("where", "AND(=(a, a0), =(nullField, nullField0), 
>=(PROCTIME(proctime), " +
+            "-(PROCTIME(proctime0), 5000)), <=(PROCTIME(proctime), 
+(PROCTIME(proctime0), 5000)))"),
           term("join", "a", "c", "proctime", "nullField", "a0", "c0", 
"proctime0", "nullField0"),
           term("joinType", "InnerJoin")
         ),
@@ -320,8 +324,8 @@ class JoinTest extends TableTestBase {
               term("select", "a", "b", "c")
             ),
             term("where",
-              "AND(=(a, a0), >=(c, -(c0, 600000)), " +
-                "<=(c, +(c0, 3600000)))"),
+              "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " +
+                "<=(CAST(c), +(CAST(c0), 3600000)))"),
             term("join", "a, b, c, a0, b0, c0"),
             term("joinType", "InnerJoin")
           ),
@@ -365,8 +369,8 @@ class JoinTest extends TableTestBase {
               term("select", "a", "b", "c")
             ),
             term("where",
-              "AND(=(a, a0), >=(c, -(c0, 600000)), " +
-                "<=(c, +(c0, 3600000)))"),
+              "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " +
+                "<=(CAST(c), +(CAST(c0), 3600000)))"),
             term("join", "a, b, c, a0, b0, c0"),
             term("joinType", "InnerJoin")
           ),
@@ -408,8 +412,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "proctime")
           ),
           term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, +(proctime0, 3600000)))"),
+            "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 
3600000)), " +
+              "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
           term("join", "a, proctime, a0, b, proctime0"),
           term("joinType", "LeftOuterJoin")
         ),
@@ -446,8 +450,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "c")
           ),
           term("where",
-            "AND(=(a, a0), >=(c, -(c0, 10000)), " +
-              "<=(c, +(c0, 3600000)))"),
+            "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
+              "<=(CAST(c), +(CAST(c0), 3600000)))"),
           term("join", "a, c, a0, b, c0"),
           term("joinType", "LeftOuterJoin")
         ),
@@ -485,8 +489,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "proctime")
           ),
           term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, +(proctime0, 3600000)))"),
+            "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 
3600000)), " +
+              "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
           term("join", "a, proctime, a0, b, proctime0"),
           term("joinType", "RightOuterJoin")
         ),
@@ -523,8 +527,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "c")
           ),
           term("where",
-            "AND(=(a, a0), >=(c, -(c0, 10000)), " +
-              "<=(c, +(c0, 3600000)))"),
+            "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
+              "<=(CAST(c), +(CAST(c0), 3600000)))"),
           term("join", "a, c, a0, b, c0"),
           term("joinType", "RightOuterJoin")
         ),
@@ -562,8 +566,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "proctime")
           ),
           term("where",
-            "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " +
-              "<=(proctime, +(proctime0, 3600000)))"),
+            "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 
3600000)), " +
+              "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"),
           term("join", "a, proctime, a0, b, proctime0"),
           term("joinType", "FullOuterJoin")
         ),
@@ -600,8 +604,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "c")
           ),
           term("where",
-            "AND(=(a, a0), >=(c, -(c0, 10000)), " +
-              "<=(c, +(c0, 3600000)))"),
+            "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
+              "<=(CAST(c), +(CAST(c0), 3600000)))"),
           term("join", "a, c, a0, b, c0"),
           term("joinType", "FullOuterJoin")
         ),
@@ -640,8 +644,8 @@ class JoinTest extends TableTestBase {
             term("select", "a", "b", "c")
           ),
           term("where",
-            "AND(=(a, a0), >=(c, -(c0, 10000)), " +
-              "<=(c, +(c0, 3600000)), LIKE(b, b0))"),
+            "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " +
+              "<=(CAST(c), +(CAST(c0), 3600000)), LIKE(b, b0))"),
           term("join", "a, b, c, a0, b0, c0"),
           // Since we filter on attributes b and b0 after the join, the full 
outer join
           // will be automatically optimized to inner join.
@@ -795,7 +799,9 @@ class JoinTest extends TableTestBase {
       "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a 
and " + timeSql
 
     val resultTable = streamUtil.tableEnv.sqlQuery(query)
-    val relNode = resultTable.getRelNode
+    val relNode = RelTimeIndicatorConverter.convert(
+      resultTable.getRelNode,
+      streamUtil.tableEnv.getRelBuilder.getRexBuilder)
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val (windowBounds, _) =
       WindowJoinUtil.extractWindowBoundsFromPredicate(
@@ -1008,7 +1014,9 @@ class JoinTest extends TableTestBase {
       expectCondStr: String): Unit = {
 
     val resultTable = streamUtil.tableEnv.sqlQuery(query)
-    val relNode = resultTable.getRelNode
+    val relNode = RelTimeIndicatorConverter.convert(
+      resultTable.getRelNode,
+      streamUtil.tableEnv.getRelBuilder.getRexBuilder)
     val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin]
     val (_, remainCondition) =
       WindowJoinUtil.extractWindowBoundsFromPredicate(
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
index d7f5c71..138497c 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.utils.TableTestUtil._
 import org.junit.Test
 
 /**
-  * Currently only time-windowed joins can be processed in a streaming fashion.
+  * Tests for both windowed and non-windowed joins.
   */
 class JoinTest extends TableTestBase {
 
@@ -57,8 +57,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rrtime")
           ),
-          term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
-            " <(lrtime, +(rrtime, 3000)))"),
+          term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 
300000))," +
+            " <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
           term("join", "a", "lrtime", "d", "e", "rrtime"),
           term("joinType", "InnerJoin")
         ),
@@ -92,7 +92,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rptime")
           ),
-          term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, 
rptime))"),
+          term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 
1000)), " +
+            "<(PROCTIME(lptime), PROCTIME(rptime)))"),
           term("join", "a", "lptime", "d", "e", "rptime"),
           term("joinType", "InnerJoin")
         ),
@@ -126,7 +127,7 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rptime")
           ),
-          term("where", "AND(=(a, d), =(lptime, rptime))"),
+          term("where", "AND(=(a, d), =(PROCTIME(lptime), PROCTIME(rptime)))"),
           term("join", "a", "lptime", "d", "e", "rptime"),
           term("joinType", "InnerJoin")
         ),
@@ -153,7 +154,8 @@ class JoinTest extends TableTestBase {
         streamTableNode(0),
         streamTableNode(1),
         term("where",
-          "AND(=(a, d), >=(lrtime, -(rrtime, 300000)), <(lrtime, rrtime), 
>(lrtime, " + "f))"),
+          "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000)), " +
+            "<(CAST(lrtime), CAST(rrtime)), >(CAST(lrtime), f))"),
         term("join", "a", "b", "c", "lrtime", "d", "e", "f", "rrtime"),
         term("joinType", "InnerJoin")
       )
@@ -188,8 +190,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rrtime")
           ),
-          term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
-            " <(lrtime, +(rrtime, 3000)))"),
+          term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 
300000))," +
+            " <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
           term("join", "a", "lrtime", "d", "e", "rrtime"),
           term("joinType", "LeftOuterJoin")
         ),
@@ -223,7 +225,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rptime")
           ),
-          term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, 
rptime))"),
+          term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 
1000)), " +
+            "<(PROCTIME(lptime), PROCTIME(rptime)))"),
           term("join", "a", "lptime", "d", "e", "rptime"),
           term("joinType", "LeftOuterJoin")
         ),
@@ -260,8 +263,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rrtime")
           ),
-          term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
-            " <(lrtime, +(rrtime, 3000)))"),
+          term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 
300000))," +
+            " <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
           term("join", "a", "lrtime", "d", "e", "rrtime"),
           term("joinType", "RightOuterJoin")
         ),
@@ -295,7 +298,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rptime")
           ),
-          term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, 
rptime))"),
+          term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 
1000)), " +
+            "<(PROCTIME(lptime), PROCTIME(rptime)))"),
           term("join", "a", "lptime", "d", "e", "rptime"),
           term("joinType", "RightOuterJoin")
         ),
@@ -332,8 +336,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rrtime")
           ),
-          term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
-            " <(lrtime, +(rrtime, 3000)))"),
+          term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 
300000))," +
+            " <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
           term("join", "a", "lrtime", "d", "e", "rrtime"),
           term("joinType", "FullOuterJoin")
         ),
@@ -367,7 +371,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rptime")
           ),
-          term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, 
rptime))"),
+          term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 
1000)), " +
+            "<(PROCTIME(lptime), PROCTIME(rptime)))"),
           term("join", "a", "lptime", "d", "e", "rptime"),
           term("joinType", "FullOuterJoin")
         ),
@@ -402,8 +407,8 @@ class JoinTest extends TableTestBase {
             streamTableNode(1),
             term("select", "d", "e", "rrtime")
           ),
-          term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," +
-            " <(lrtime, +(rrtime, 3000)))"),
+          term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 
300000))," +
+            " <(CAST(lrtime), +(CAST(rrtime), 3000)))"),
           term("join", "a", "lrtime", "d", "e", "rrtime"),
           // Since we filter on attributes of the left table after the join, 
the left outer join
           // will be automatically optimized to inner join.
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 1706169..29dda21 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
       "DataStreamCalc",
       streamTableNode(0),
       term("select", "rowtime"),
-      term("where", ">(rowtime, 1990-12-02 12:11:11)")
+      term("where", ">(CAST(rowtime), 1990-12-02 12:11:11)")
     )
 
     util.verifyTable(result, expected)
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
index 304dbb3..1706fc8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.stream
 
 import java.lang.{Integer => JInt, Long => JLong}
 import java.math.BigDecimal
+import java.sql.Timestamp
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
@@ -661,6 +662,51 @@ class TimeAttributesITCase extends AbstractTestBase {
     )
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testMaterializedRowtimeFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(String, Timestamp, Int)]
+    data.+=(("ACME", new Timestamp(1000L), 12))
+    data.+=(("ACME", new Timestamp(2000L), 17))
+    data.+=(("ACME", new Timestamp(3000L), 13))
+    data.+=(("ACME", new Timestamp(4000L), 11))
+
+    val t = env.fromCollection(data)
+      .assignAscendingTimestamps(e => e._2.toInstant.toEpochMilli)
+      .toTable(tEnv, 'symbol, 'tstamp.rowtime, 'price)
+    tEnv.registerTable("Ticker", t)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM (
+         |   SELECT symbol, SUM(price) as price,
+         |     TUMBLE_ROWTIME(tstamp, interval '1' second) as rowTime,
+         |     TUMBLE_START(tstamp, interval '1' second) as startTime,
+         |     TUMBLE_END(tstamp, interval '1' second) as endTime
+         |   FROM Ticker
+         |   GROUP BY symbol, TUMBLE(tstamp, interval '1' second)
+         |)
+         |WHERE startTime < endTime
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List(
+      "ACME,12,1970-01-01 00:00:01.999,1970-01-01 00:00:01.0,1970-01-01 
00:00:02.0",
+      "ACME,17,1970-01-01 00:00:02.999,1970-01-01 00:00:02.0,1970-01-01 
00:00:03.0",
+      "ACME,13,1970-01-01 00:00:03.999,1970-01-01 00:00:03.0,1970-01-01 
00:00:04.0",
+      "ACME,11,1970-01-01 00:00:04.999,1970-01-01 00:00:04.0,1970-01-01 
00:00:05.0")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object TimeAttributesITCase {

Reply via email to