[ https://issues.apache.org/jira/browse/FLINK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nic Townsend updated FLINK-38162:
---------------------------------
    Description: 
In the Flink docs for window joins ([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]), {{COALESCE}} is used in a FULL JOIN to ensure each row has a value for {{window_start}} and {{window_end}}. However, the example omits {{window_time}} from the result, which is needed for downstream re-windowing or cascading windows ([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]).

An example use case might be: "I have a two-phase commit process, and I want to know how many transactions in a window were just LEFT, how many were just RIGHT, and how many were COMPLETE". The use case is possibly very niche; the problem is that adding {{window_time}} causes a series of different errors in downstream operators.

h4. Problem 1:

If you use {{COALESCE}} for {{window_time}}, a {{DESCRIBE}} will show the column as a {{ROWTIME}} attribute:
{code:java}
+-------------+------------------------+------+-----+--------+-----------+
| name        | type                   | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+-----------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
{code}
But if you apply a windowing TVF to the results of the join, Flink throws an error that {{window_time}} is not a time attribute.

This is (I assume) due to the use of {{COALESCE}}: [https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes] says:

> As long as a time attribute is not modified, and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps, and are accessible for calculations. When used in calculations, time attributes are materialized and act as standard timestamps. However, ordinary timestamps cannot be used in place of, or be converted to, time attributes.

This is reflected in the plan, where the inputs to {{COALESCE}} are {{CAST}} first:

{{COALESCE(CAST(window_time AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), CAST(window_time0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)))}}
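For illustration, a minimal sketch of the query shape that reproduces this. The table names ({{left_t}}, {{right_t}}), their {{rowtime}} attribute, and the {{version}} join key are hypothetical stand-ins rather than the exact SQL behind this report:
{code:sql}
-- Window join following the documented FULL OUTER JOIN pattern, but also
-- forwarding window_time via COALESCE for downstream re-windowing.
CREATE TEMPORARY VIEW joined AS
SELECT
    COALESCE(L.window_start, R.window_start) AS window_start,
    COALESCE(L.window_end,   R.window_end)   AS window_end,
    COALESCE(L.window_time,  R.window_time)  AS window_time,  -- materializes the rowtime
    L.version AS l_version,
    R.version AS r_version
FROM (
    SELECT * FROM TABLE(TUMBLE(TABLE left_t, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
) L
FULL JOIN (
    SELECT * FROM TABLE(TUMBLE(TABLE right_t, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))
) R
ON L.version = R.version
   AND L.window_start = R.window_start
   AND L.window_end   = R.window_end;

-- DESCRIBE joined shows window_time as TIMESTAMP(3) *ROWTIME*, yet applying a
-- windowing TVF to the view fails because window_time is not a time attribute:
SELECT *
FROM TABLE(TUMBLE(TABLE joined, DESCRIPTOR(window_time), INTERVAL '5' SECOND));
{code}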
h4. Problem 2:

If you try to use cascading windows and perform a window aggregation after the join, you do not get a windowed aggregate. Instead, the planner creates an unbounded aggregate. As with Problem 1, the assumption (based on [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]) is that the time attribute has not propagated, so the window does not cascade.

The physical plan shows that a {{GroupAggregate}} is created after the {{Calc}} from the {{WindowJoin}}. Also, the {{Calc}} performs a {{CAST}} of {{window_time}} to {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}:
{code}
+- Calc(select=[CAST(agg_window_start AS TIMESTAMP(9)) AS window_start, CAST(agg_window_end AS TIMESTAMP(9)) AS window_end, CAST(agg_window_time AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) AS window_time, CAST(l_version_count AS BIGINT) AS l_version_count, CAST(r_version_count AS BIGINT) AS r_version_count], changelogMode=[I,UA])
   +- GroupAggregate(groupBy=[agg_window_start, agg_window_end, agg_window_time], select=[agg_window_start, agg_window_end, agg_window_time, COUNT(l_version) AS l_version_count, COUNT(r_version) AS r_version_count], changelogMode=[I,UA])
      +- Exchange(distribution=[hash[agg_window_start, agg_window_end, agg_window_time]], changelogMode=[I])
         +- Calc(select=[COALESCE(window_start, window_start0) AS agg_window_start, COALESCE(window_end, window_end0) AS agg_window_end, COALESCE(CAST(window_time AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), CAST(window_time0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))) AS agg_window_time, version AS l_version, version0 AS r_version], changelogMode=[I])
            +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 s])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 s])], joinType=[FullOuterJoin], where=[AND(=(version, version0), =(window_time, window_time0))], select=[version, window_start, window_end, window_time, version0, window_start0, window_end0, window_time0], changelogMode=[I])
{code}
If I change to a LEFT JOIN, the physical plan changes: the {{Calc}} after the {{WindowJoin}} no longer has a {{COALESCE}} or {{CAST}}, and a {{LocalWindowAggregate}} is generated from the {{Calc}}:
{code}
+- Calc(select=[CAST(window_start AS TIMESTAMP(9)) AS window_start, CAST(window_end AS TIMESTAMP(9)) AS window_end, CAST(CAST(window_time AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL) AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) AS window_time, CAST(l_version_count AS BIGINT) AS l_version_count, CAST(r_version_count AS BIGINT) AS r_version_count], changelogMode=[I])
   +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[5 s])], select=[COUNT(count$0) AS l_version_count, COUNT(count$1) AS r_version_count, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time], changelogMode=[I])
      +- Exchange(distribution=[single], changelogMode=[I])
         +- LocalWindowAggregate(window=[TUMBLE(win_start=[agg_window_start], win_end=[agg_window_end], size=[5 s])], select=[COUNT(l_version) AS count$0, COUNT(r_version) AS count$1, slice_end('w$) AS $window_end], changelogMode=[I])
            +- Calc(select=[window_start AS agg_window_start, window_end AS agg_window_end, window_time AS agg_window_time, version AS l_version, version0 AS r_version], changelogMode=[I])
               +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 s])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 s])], joinType=[LeftOuterJoin], where=[AND(=(version, version0), =(window_time, window_time0))], select=[version, window_start, window_end, window_time, version0, window_start0, window_end0, window_time0], changelogMode=[I])
{code}
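For reference, a sketch of the cascading aggregation over the hypothetical {{joined}} view from Problem 1. Grouping by the propagated window columns should let the window cascade, but with the FULL JOIN it does not:
{code:sql}
-- Cascading window aggregation: group the join output by its window columns.
-- With the FULL JOIN + COALESCE above, this plans as an unbounded GroupAggregate;
-- with a LEFT JOIN it plans as Local/GlobalWindowAggregate as expected.
SELECT
    window_start,
    window_end,
    window_time,
    COUNT(l_version) AS l_version_count,
    COUNT(r_version) AS r_version_count
FROM joined
GROUP BY window_start, window_end, window_time;
{code}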
h4. Problem 3:

If you try to use cascading window aggregation, but also include a statement set to insert the results of the join and the aggregate into separate sinks, then you get a Calcite error:
{code:java}
java.lang.RuntimeException: Error while applying rule ProjectToCalcRule, args [rel#1277:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#1276,exprs=[$2, $3, $4, $0, $1])]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:318)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeBlock(StreamCommonSubGraphBasedOptimizer.scala:143)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.$anonfun$optimizeSinkBlocks$2(StreamCommonSubGraphBasedOptimizer.scala:89)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.$anonfun$optimizeSinkBlocks$2$adapted(StreamCommonSubGraphBasedOptimizer.scala:89)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:89)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:534)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697)
    at org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:103)
    at org.apache.flink.table.api.Explainable.explain(Explainable.java:40)
    at com.example.SQLSubmitter.twoInserts(SQLSubmitter.java:156)
    at com.example.SQLSubmitter.main(SQLSubmitter.java:134)
Caused by: java.lang.RuntimeException: Error occurred while applying rule ProjectToCalcRule
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:269)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:284)
    at org.apache.calcite.rel.rules.ProjectToCalcRule.onMatch(ProjectToCalcRule.java:72)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
    ... 36 more
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
equiv rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) *ROWTIME* agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
Difference:
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) *ROWTIME*
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
    ... 40 more
{code}
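The attached {{SQLSubmitter.java}} drives this through a Table API {{StatementSet}} (see the {{StatementSetImpl.explain}} frame above); a roughly equivalent SQL-client shape, with hypothetical sink tables {{join_sink}} and {{agg_sink}}, would be:
{code:sql}
-- Writing both the raw join output and the cascaded aggregate in one statement
-- set is what triggers the ProjectToCalcRule type mismatch during planning.
EXECUTE STATEMENT SET
BEGIN
    INSERT INTO join_sink
        SELECT window_start, window_end, window_time, l_version, r_version
        FROM joined;

    INSERT INTO agg_sink
        SELECT window_start, window_end, window_time,
               COUNT(l_version) AS l_version_count,
               COUNT(r_version) AS r_version_count
        FROM joined
        GROUP BY window_start, window_end, window_time;
END;
{code}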
h4. Expected behaviour:

A window join is created by an equijoin on {{window_start}} and {{window_end}}, which by definition doesn't include {{window_time}}. However, the equijoin means that both sides of the join will have the same value for {{window_end}}. Additionally, the window TVF will not return null. As such, when {{window_time}} is included in the output of the join via {{COALESCE}}, the value can never be null, so there is also no reason to remove the time attribute.
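To make the nullability argument concrete, a sketch of the case analysis over the hypothetical {{joined}} view from Problem 1: the window join pads only the non-matching side with NULLs, so at least one {{COALESCE}} input is always populated, and when both are populated they are equal by construction.
{code:sql}
-- Case analysis for COALESCE(L.window_time, R.window_time) under the FULL JOIN:
--   matched row    -> L.window_time = R.window_time (same window via the equijoin)
--   left-only row  -> R side is NULL-padded, L.window_time is non-null
--   right-only row -> L side is NULL-padded, R.window_time is non-null
-- The result is therefore always the (single) window's rowtime and never NULL:
SELECT window_time IS NULL AS window_time_is_null  -- always FALSE
FROM joined;
{code}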
> Use of SQL functions with time attributes causes downstream temporal
> functions to fail
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-38162
>                 URL: https://issues.apache.org/jira/browse/FLINK-38162
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 2.1.0
>            Reporter: Nic Townsend
>            Priority: Minor
>         Attachments: SQLSubmitter.java
--
This message was sent by Atlassian Jira
(v8.20.10#820010)