This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit fa256b2a8983d899f71ba8bbda415db9a57f6c23 Author: Timo Walther <twal...@apache.org> AuthorDate: Wed Oct 31 15:27:47 2018 +0100 [FLINK-8897] [table] Reintroduce materialization of time attributes in filters --- .../table/calcite/RelTimeIndicatorConverter.scala | 82 +++++++++++++--------- .../plan/rules/datastream/DataStreamJoinRule.scala | 2 +- .../datastream/DataStreamWindowJoinRule.scala | 2 +- .../flink/table/runtime/join/WindowJoinUtil.scala | 12 ++-- .../flink/table/api/stream/sql/JoinTest.scala | 72 ++++++++++--------- .../flink/table/api/stream/table/JoinTest.scala | 39 +++++----- .../table/plan/TimeIndicatorConversionTest.scala | 2 +- .../runtime/stream/TimeAttributesITCase.scala | 46 ++++++++++++ 8 files changed, 168 insertions(+), 89 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala index 41f0fc5..c1bcf14 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala @@ -23,6 +23,7 @@ import org.apache.calcite.rel.core._ import org.apache.calcite.rel.logical._ import org.apache.calcite.rel.{RelNode, RelShuttle} import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo import org.apache.flink.table.api.{TableException, ValidationException} @@ -100,13 +101,7 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(matchRel: LogicalMatch): RelNode = { // visit children and update inputs val input = matchRel.getInput.accept(this) - - // check if input field contains time indicator type - // materialize field if no time indicator is present anymore - // if input field is already materialized, change to timestamp type - val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - input.getRowType.getFieldList.map(_.getType)) + val materializer = createMaterializer(input) // update input expressions val patternDefs = matchRel.getPatternDefinitions.mapValues(_.accept(materializer)) @@ -180,23 +175,16 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(filter: LogicalFilter): RelNode = { // visit children and update inputs val input = filter.getInput.accept(this) + val materializer = createMaterializer(input) - // We do not materialize time indicators in conditions because they can be locally evaluated. - // Some conditions are evaluated by special operators (e.g., time window joins). - // Time indicators in remaining conditions are materialized by Calc before the code generation. - LogicalFilter.create(input, filter.getCondition) + val condition = filter.getCondition.accept(materializer) + LogicalFilter.create(input, condition) } override def visit(project: LogicalProject): RelNode = { // visit children and update inputs val input = project.getInput.accept(this) - - // check if input field contains time indicator type - // materialize field if no time indicator is present anymore - // if input field is already materialized, change to timestamp type - val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - input.getRowType.getFieldList.map(_.getType)) + val materializer = createMaterializer(input) val projects = project.getProjects.map(_.accept(materializer)) val fieldNames = project.getRowType.getFieldNames @@ -206,8 +194,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { override def visit(join: LogicalJoin): RelNode = { val left = join.getLeft.accept(this) val right = join.getRight.accept(this) + val materializer = createMaterializer(left, right) - LogicalJoin.create(left, right, join.getCondition, join.getVariablesSet, join.getJoinType) + LogicalJoin.create( + left, + right, + join.getCondition.accept(materializer), + join.getVariablesSet, + join.getJoinType) } def visit(temporalJoin: LogicalTemporalTableJoin): RelNode = { @@ -229,19 +223,11 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { case scan: LogicalTableFunctionScan => // visit children and update inputs val scanInputs = scan.getInputs.map(_.accept(this)) - - // check if input field contains time indicator type - // materialize field if no time indicator is present anymore - // if input field is already materialized, change to timestamp type - val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - inputs.head.getRowType.getFieldList.map(_.getType)) - - val call = scan.getCall.accept(materializer) + val materializer = createMaterializer(inputs.head) LogicalTableFunctionScan.create( scan.getCluster, scanInputs, - call, + scan.getCall.accept(materializer), scan.getElementType, scan.getRowType, scan.getColumnMappings) @@ -369,6 +355,15 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { indicesToMaterialize.toSet } + + private def createMaterializer(inputs: RelNode*): RexTimeIndicatorMaterializer = { + // check if input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type + new RexTimeIndicatorMaterializer( + rexBuilder, + inputs.flatMap(_.getRowType.getFieldList.map(_.getType))) + } } object RelTimeIndicatorConverter { @@ -412,11 +407,34 @@ object RelTimeIndicatorConverter { * @return The expression with materialized time indicators. */ def convertExpression(expr: RexNode, rowType: RelDataType, rexBuilder: RexBuilder): RexNode = { + // check if input field contains time indicator type + // materialize field if no time indicator is present anymore + // if input field is already materialized, change to timestamp type val materializer = new RexTimeIndicatorMaterializer( - rexBuilder, - rowType.getFieldList.map(_.getType)) + rexBuilder, + rowType.getFieldList.map(_.getType)) + + expr.accept(materializer) + } + + /** + * Checks if the given call is a materialization call for either proctime or rowtime. + */ + def isMaterializationCall(call: RexCall): Boolean = { + val isProctimeCall: Boolean = { + call.getOperator == ProctimeSqlFunction && + call.getOperands.size() == 1 && + isProctimeIndicatorType(call.getOperands.get(0).getType) + } + + val isRowtimeCall: Boolean = { + call.getOperator == SqlStdOperatorTable.CAST && + call.getOperands.size() == 1 && + isRowtimeIndicatorType(call.getOperands.get(0).getType) && + call.getType.getSqlTypeName == SqlTypeName.TIMESTAMP + } - expr.accept(materializer) + isProctimeCall || isRowtimeCall } } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala index 072acb3..f51c088 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamJoinRule.scala @@ -73,7 +73,7 @@ class DataStreamJoinRule val remainingPredsAccessTime = remainingPreds.isDefined && accessesTimeAttribute(remainingPreds.get, join.getRowType) - // Check that no event-time attributes are in the input because non-window join is unbounded + // Check that no event-time attributes are in the output because non-window join is unbounded // and we don't know how much to hold back watermarks. val rowTimeAttrInOutput = join.getRowType.getFieldList.asScala .exists(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType)) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala index 3dfae99..cd9c5a8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamWindowJoinRule.scala @@ -52,7 +52,7 @@ class DataStreamWindowJoinRule if (windowBounds.get.isEventTime) { true } else { - // Check that no event-time attributes are in the input because the processing time window + // Check that no event-time attributes are in the output because the processing time window // join does not correctly hold back watermarks. // We rely on projection pushdown to remove unused attributes before the join. !join.getRowType.getFieldList.asScala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala index 18e26df..3e355e8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/WindowJoinUtil.scala @@ -23,12 +23,14 @@ import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.JoinRelType import org.apache.calcite.rex._ -import org.apache.calcite.sql.SqlKind +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.sql.{SqlKind, SqlOperatorTable} import org.apache.flink.api.common.functions.FlatJoinFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.table.api.TableConfig -import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.calcite.{FlinkTypeFactory, RelTimeIndicatorConverter} import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction} +import org.apache.flink.table.functions.sql.ProctimeSqlFunction import org.apache.flink.table.plan.schema.{RowSchema, TimeIndicatorRelDataType} import org.apache.flink.types.Row @@ -380,13 +382,13 @@ object WindowJoinUtil { */ def replaceTimeFieldWithLiteral(expr: RexNode): RexNode = { expr match { + case c: RexCall if RelTimeIndicatorConverter.isMaterializationCall(c) => + // replace with timestamp + rexBuilder.makeZeroLiteral(expr.getType) case c: RexCall => // replace in call operands val newOps = c.operands.asScala.map(replaceTimeFieldWithLiteral).asJava rexBuilder.makeCall(c.getType, c.getOperator, newOps) - case i: RexInputRef if FlinkTypeFactory.isTimeIndicatorType(i.getType) => - // replace with timestamp - rexBuilder.makeZeroLiteral(expr.getType) case _ => expr } } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala index 736f9a2..f435113 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/JoinTest.scala @@ -21,6 +21,7 @@ import org.apache.calcite.rel.logical.LogicalJoin import org.apache.flink.api.scala._ import org.apache.flink.table.api.Types import org.apache.flink.table.api.scala._ +import org.apache.flink.table.calcite.RelTimeIndicatorConverter import org.apache.flink.table.expressions.Null import org.apache.flink.table.plan.logical.TumblingGroupWindow import org.apache.flink.table.runtime.join.WindowJoinUtil @@ -29,6 +30,9 @@ import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} import org.junit.Assert._ import org.junit.Test +/** + * Tests for both windowed and non-windowed joins. + */ class JoinTest extends TableTestBase { private val streamUtil: StreamTableTestUtil = streamTestUtil() streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c.rowtime, 'proctime.proctime) @@ -62,8 +66,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "proctime") ), term("where", - "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + - "<=(proctime, +(proctime0, 3600000)))"), + "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " + + "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"), term("join", "a, proctime, a0, b, proctime0"), term("joinType", "InnerJoin") ), @@ -100,8 +104,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 10000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, c, a0, b, c0"), term("joinType", "InnerJoin") ), @@ -138,8 +142,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "proctime") ), term("where", - "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + - "<=(proctime, +(proctime0, 3600000)))"), + "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " + + "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"), term("join", "a, proctime, a0, b, proctime0"), term("joinType", "InnerJoin") ), @@ -176,8 +180,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 600000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, c, a0, b, c0"), term("joinType", "InnerJoin") ), @@ -208,7 +212,7 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "a", "b", "proctime") ), - term("where", "AND(=(a, a0), =(proctime, proctime0))"), + term("where", "AND(=(a, a0), =(PROCTIME(proctime), PROCTIME(proctime0)))"), term("join", "a", "proctime", "a0", "b", "proctime0"), term("joinType", "InnerJoin") ), @@ -238,7 +242,7 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "a", "b", "c") ), - term("where", "AND(=(a, a0), =(c, c0))"), + term("where", "AND(=(a, a0), =(CAST(c), CAST(c0)))"), term("join", "a", "c", "a0", "b", "c0"), term("joinType", "InnerJoin") ), @@ -280,8 +284,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "a", "c", "proctime", "12 AS nullField") ), - term("where", "AND(=(a, a0), =(nullField, nullField0), >=(proctime, " + - "-(proctime0, 5000)), <=(proctime, +(proctime0, 5000)))"), + term("where", "AND(=(a, a0), =(nullField, nullField0), >=(PROCTIME(proctime), " + + "-(PROCTIME(proctime0), 5000)), <=(PROCTIME(proctime), +(PROCTIME(proctime0), 5000)))"), term("join", "a", "c", "proctime", "nullField", "a0", "c0", "proctime0", "nullField0"), term("joinType", "InnerJoin") ), @@ -320,8 +324,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 600000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, b, c, a0, b0, c0"), term("joinType", "InnerJoin") ), @@ -365,8 +369,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 600000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 600000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, b, c, a0, b0, c0"), term("joinType", "InnerJoin") ), @@ -408,8 +412,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "proctime") ), term("where", - "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + - "<=(proctime, +(proctime0, 3600000)))"), + "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " + + "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"), term("join", "a, proctime, a0, b, proctime0"), term("joinType", "LeftOuterJoin") ), @@ -446,8 +450,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 10000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, c, a0, b, c0"), term("joinType", "LeftOuterJoin") ), @@ -485,8 +489,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "proctime") ), term("where", - "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + - "<=(proctime, +(proctime0, 3600000)))"), + "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " + + "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"), term("join", "a, proctime, a0, b, proctime0"), term("joinType", "RightOuterJoin") ), @@ -523,8 +527,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 10000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, c, a0, b, c0"), term("joinType", "RightOuterJoin") ), @@ -562,8 +566,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "proctime") ), term("where", - "AND(=(a, a0), >=(proctime, -(proctime0, 3600000)), " + - "<=(proctime, +(proctime0, 3600000)))"), + "AND(=(a, a0), >=(PROCTIME(proctime), -(PROCTIME(proctime0), 3600000)), " + + "<=(PROCTIME(proctime), +(PROCTIME(proctime0), 3600000)))"), term("join", "a, proctime, a0, b, proctime0"), term("joinType", "FullOuterJoin") ), @@ -600,8 +604,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 10000)), " + - "<=(c, +(c0, 3600000)))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)))"), term("join", "a, c, a0, b, c0"), term("joinType", "FullOuterJoin") ), @@ -640,8 +644,8 @@ class JoinTest extends TableTestBase { term("select", "a", "b", "c") ), term("where", - "AND(=(a, a0), >=(c, -(c0, 10000)), " + - "<=(c, +(c0, 3600000)), LIKE(b, b0))"), + "AND(=(a, a0), >=(CAST(c), -(CAST(c0), 10000)), " + + "<=(CAST(c), +(CAST(c0), 3600000)), LIKE(b, b0))"), term("join", "a, b, c, a0, b0, c0"), // Since we filter on attributes b and b0 after the join, the full outer join // will be automatically optimized to inner join. @@ -795,7 +799,9 @@ class JoinTest extends TableTestBase { "SELECT t1.a, t2.b FROM MyTable as t1 join MyTable2 as t2 on t1.a = t2.a and " + timeSql val resultTable = streamUtil.tableEnv.sqlQuery(query) - val relNode = resultTable.getRelNode + val relNode = RelTimeIndicatorConverter.convert( + resultTable.getRelNode, + streamUtil.tableEnv.getRelBuilder.getRexBuilder) val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] val (windowBounds, _) = WindowJoinUtil.extractWindowBoundsFromPredicate( @@ -1008,7 +1014,9 @@ class JoinTest extends TableTestBase { expectCondStr: String): Unit = { val resultTable = streamUtil.tableEnv.sqlQuery(query) - val relNode = resultTable.getRelNode + val relNode = RelTimeIndicatorConverter.convert( + resultTable.getRelNode, + streamUtil.tableEnv.getRelBuilder.getRexBuilder) val joinNode = relNode.getInput(0).asInstanceOf[LogicalJoin] val (_, remainCondition) = WindowJoinUtil.extractWindowBoundsFromPredicate( diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala index d7f5c71..138497c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/JoinTest.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.utils.TableTestUtil._ import org.junit.Test /** - * Currently only time-windowed joins can be processed in a streaming fashion. + * Tests for both windowed and non-windowed joins. */ class JoinTest extends TableTestBase { @@ -57,8 +57,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rrtime") ), - term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," + - " <(lrtime, +(rrtime, 3000)))"), + term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," + + " <(CAST(lrtime), +(CAST(rrtime), 3000)))"), term("join", "a", "lrtime", "d", "e", "rrtime"), term("joinType", "InnerJoin") ), @@ -92,7 +92,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rptime") ), - term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"), + term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " + + "<(PROCTIME(lptime), PROCTIME(rptime)))"), term("join", "a", "lptime", "d", "e", "rptime"), term("joinType", "InnerJoin") ), @@ -126,7 +127,7 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rptime") ), - term("where", "AND(=(a, d), =(lptime, rptime))"), + term("where", "AND(=(a, d), =(PROCTIME(lptime), PROCTIME(rptime)))"), term("join", "a", "lptime", "d", "e", "rptime"), term("joinType", "InnerJoin") ), @@ -153,7 +154,8 @@ class JoinTest extends TableTestBase { streamTableNode(0), streamTableNode(1), term("where", - "AND(=(a, d), >=(lrtime, -(rrtime, 300000)), <(lrtime, rrtime), >(lrtime, " + "f))"), + "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000)), " + + "<(CAST(lrtime), CAST(rrtime)), >(CAST(lrtime), f))"), term("join", "a", "b", "c", "lrtime", "d", "e", "f", "rrtime"), term("joinType", "InnerJoin") ) @@ -188,8 +190,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rrtime") ), - term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," + - " <(lrtime, +(rrtime, 3000)))"), + term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," + + " <(CAST(lrtime), +(CAST(rrtime), 3000)))"), term("join", "a", "lrtime", "d", "e", "rrtime"), term("joinType", "LeftOuterJoin") ), @@ -223,7 +225,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rptime") ), - term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"), + term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " + + "<(PROCTIME(lptime), PROCTIME(rptime)))"), term("join", "a", "lptime", "d", "e", "rptime"), term("joinType", "LeftOuterJoin") ), @@ -260,8 +263,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rrtime") ), - term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," + - " <(lrtime, +(rrtime, 3000)))"), + term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," + + " <(CAST(lrtime), +(CAST(rrtime), 3000)))"), term("join", "a", "lrtime", "d", "e", "rrtime"), term("joinType", "RightOuterJoin") ), @@ -295,7 +298,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rptime") ), - term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"), + term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " + + "<(PROCTIME(lptime), PROCTIME(rptime)))"), term("join", "a", "lptime", "d", "e", "rptime"), term("joinType", "RightOuterJoin") ), @@ -332,8 +336,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rrtime") ), - term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," + - " <(lrtime, +(rrtime, 3000)))"), + term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," + + " <(CAST(lrtime), +(CAST(rrtime), 3000)))"), term("join", "a", "lrtime", "d", "e", "rrtime"), term("joinType", "FullOuterJoin") ), @@ -367,7 +371,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rptime") ), - term("where", "AND(=(a, d), >=(lptime, -(rptime, 1000)), <(lptime, rptime))"), + term("where", "AND(=(a, d), >=(PROCTIME(lptime), -(PROCTIME(rptime), 1000)), " + + "<(PROCTIME(lptime), PROCTIME(rptime)))"), term("join", "a", "lptime", "d", "e", "rptime"), term("joinType", "FullOuterJoin") ), @@ -402,8 +407,8 @@ class JoinTest extends TableTestBase { streamTableNode(1), term("select", "d", "e", "rrtime") ), - term("where", "AND(=(a, d), >=(lrtime, -(rrtime, 300000))," + - " <(lrtime, +(rrtime, 3000)))"), + term("where", "AND(=(a, d), >=(CAST(lrtime), -(CAST(rrtime), 300000))," + + " <(CAST(lrtime), +(CAST(rrtime), 3000)))"), term("join", "a", "lrtime", "d", "e", "rrtime"), // Since we filter on attributes of the left table after the join, the left outer join // will be automatically optimized to inner join. diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala index 1706169..29dda21 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala @@ -85,7 +85,7 @@ class TimeIndicatorConversionTest extends TableTestBase { "DataStreamCalc", streamTableNode(0), term("select", "rowtime"), - term("where", ">(rowtime, 1990-12-02 12:11:11)") + term("where", ">(CAST(rowtime), 1990-12-02 12:11:11)") ) util.verifyTable(result, expected) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 304dbb3..1706fc8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.stream import java.lang.{Integer => JInt, Long => JLong} import java.math.BigDecimal +import java.sql.Timestamp import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo @@ -661,6 +662,51 @@ class TimeAttributesITCase extends AbstractTestBase { ) assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testMaterializedRowtimeFilter(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val data = new mutable.MutableList[(String, Timestamp, Int)] + data.+=(("ACME", new Timestamp(1000L), 12)) + data.+=(("ACME", new Timestamp(2000L), 17)) + data.+=(("ACME", new Timestamp(3000L), 13)) + data.+=(("ACME", new Timestamp(4000L), 11)) + + val t = env.fromCollection(data) + .assignAscendingTimestamps(e => e._2.toInstant.toEpochMilli) + .toTable(tEnv, 'symbol, 'tstamp.rowtime, 'price) + tEnv.registerTable("Ticker", t) + + val sqlQuery = + s""" + |SELECT * + |FROM ( + | SELECT symbol, SUM(price) as price, + | TUMBLE_ROWTIME(tstamp, interval '1' second) as rowTime, + | TUMBLE_START(tstamp, interval '1' second) as startTime, + | TUMBLE_END(tstamp, interval '1' second) as endTime + | FROM Ticker + | GROUP BY symbol, TUMBLE(tstamp, interval '1' second) + |) + |WHERE startTime < endTime + |""".stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List( + "ACME,12,1970-01-01 00:00:01.999,1970-01-01 00:00:01.0,1970-01-01 00:00:02.0", + "ACME,17,1970-01-01 00:00:02.999,1970-01-01 00:00:02.0,1970-01-01 00:00:03.0", + "ACME,13,1970-01-01 00:00:03.999,1970-01-01 00:00:03.0,1970-01-01 00:00:04.0", + "ACME,11,1970-01-01 00:00:04.999,1970-01-01 00:00:04.0,1970-01-01 00:00:05.0") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } object TimeAttributesITCase {