[ https://issues.apache.org/jira/browse/FLINK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nic Townsend updated FLINK-38162:
---------------------------------
    Description: 
In the Flink docs for window joins - 
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]
 - the example uses {{COALESCE}} in a FULL JOIN to ensure each row has a value 
for {{window_start}} and {{window_end}}.

However, the example omits {{window_time}} from the result - which is needed 
for downstream re-windowing, or cascading windows 
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]).

An example use case might be: "I have a two-phase commit process, and I want 
to know how many transactions in a window were just LEFT, how many were RIGHT, 
and how many were COMPLETE". This may be a niche use case - the problem is 
that adding {{window_time}} to the join output causes a series of different 
errors in downstream operators.
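
For reference, here is a minimal sketch of the shape of the join in question. 
The table names ({{left_txn}}, {{right_txn}}), the columns ({{version}}, 
{{ts}}) and the 5-second tumble are my illustration rather than the exact SQL 
from the attached {{SQLSubmitter.java}}:
{code:sql}
-- Hypothetical repro: window both sources with the same 5s tumble, FULL JOIN
-- them on the window bounds plus a business key, and forward window_time via
-- COALESCE in the same way the docs forward window_start/window_end.
CREATE TEMPORARY VIEW joined AS
SELECT
  COALESCE(L.window_start, R.window_start) AS window_start,
  COALESCE(L.window_end,   R.window_end)   AS window_end,
  COALESCE(L.window_time,  R.window_time)  AS window_time,  -- the problematic column
  L.version AS l_version,
  R.version AS r_version
FROM (
  SELECT * FROM TABLE(TUMBLE(TABLE left_txn, DESCRIPTOR(ts), INTERVAL '5' SECOND))
) L
FULL JOIN (
  SELECT * FROM TABLE(TUMBLE(TABLE right_txn, DESCRIPTOR(ts), INTERVAL '5' SECOND))
) R
ON L.version = R.version
AND L.window_start = R.window_start
AND L.window_end = R.window_end;
{code}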
h4. Problem 1:

If you use {{COALESCE}} for {{window_time}}, a {{DESCRIBE}} will show the 
column as being {{ROWTIME}}:
{code:java}
+-------------+------------------------+------+-----+--------+-----------+
|        name |                   type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+-----------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
+-------------+------------------------+------+-----+--------+-----------+
{code}
But if you apply a windowing TVF to the results of the join, Flink throws an 
error saying that {{window_time}} is not a time attribute.
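
Continuing the hypothetical {{joined}} view from the sketch above, the failing 
downstream TVF would look something like:
{code:sql}
-- Fails: after the COALESCE, window_time is no longer a valid time attribute,
-- so the windowing TVF rejects it as the time column.
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TABLE(
  TUMBLE(TABLE joined, DESCRIPTOR(window_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end;
{code}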

This is (I assume) due to the use of {{COALESCE}}: 
[https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes]
 says:

> As long as a time attribute is not modified, and is simply forwarded from one 
> part of a query to another, it remains a valid time attribute. Time 
> attributes behave like regular timestamps, and are accessible for 
> calculations. When used in calculations, time attributes are materialized and 
> act as standard timestamps. However, ordinary timestamps cannot be used in 
> place of, or be converted to, time attributes.

This is reflected in the plan, where the inputs to {{COALESCE}} are first 
{{CAST}}:
{{COALESCE(CAST(window_time AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), CAST(window_time0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)))}}

h4. Problem 2:

If you try to use cascading windows and perform a window aggregation after the 
join, you do not get a windowed aggregate; instead, the planner creates an 
unbounded aggregate. As with Problem 1, the assumption (based on 
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation])
 is that the time attribute has not propagated, so the window does not cascade.
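
A hedged sketch of the kind of follow-up aggregation meant here (the column 
names match the plans below, but the exact query may differ):
{code:sql}
-- Group on the window columns propagated by the join. With a valid time
-- attribute this should cascade into a window aggregate; downstream of the
-- FULL JOIN it instead plans as an unbounded GroupAggregate.
SELECT
  window_start, window_end, window_time,
  COUNT(l_version) AS l_version_count,
  COUNT(r_version) AS r_version_count
FROM joined
GROUP BY window_start, window_end, window_time;
{code}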

The physical plan shows that a {{GroupAggregate}} is created after the 
{{Calc}} from the {{WindowJoin}}. The {{Calc}} also performs a {{CAST}} of 
{{window_time}} to {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}.

{code}
+- Calc(select=[CAST(agg_window_start AS TIMESTAMP(9)) AS window_start, 
CAST(agg_window_end AS TIMESTAMP(9)) AS window_end, CAST(agg_window_time AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) AS window_time, CAST(l_version_count AS 
BIGINT) AS l_version_count, CAST(r_version_count AS BIGINT) AS 
r_version_count], changelogMode=[I,UA])
   +- GroupAggregate(groupBy=[agg_window_start, agg_window_end, 
agg_window_time], select=[agg_window_start, agg_window_end, agg_window_time, 
COUNT(l_version) AS l_version_count, COUNT(r_version) AS r_version_count], 
changelogMode=[I,UA])
      +- Exchange(distribution=[hash[agg_window_start, agg_window_end, 
agg_window_time]], changelogMode=[I])
         +- Calc(select=[COALESCE(window_start, window_start0) AS 
agg_window_start, COALESCE(window_end, window_end0) AS agg_window_end, 
COALESCE(CAST(window_time AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), 
CAST(window_time0 AS TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))) AS agg_window_time, 
version AS l_version, version0 AS r_version], changelogMode=[I])
            +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[5 s])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 
s])], joinType=[FullOuterJoin], where=[AND(=(version, version0), =(window_time, 
window_time0))], select=[version, window_start, window_end, window_time, 
version0, window_start0, window_end0, window_time0], changelogMode=[I])
{code}

If I change to a LEFT JOIN, the physical plan changes: the {{Calc}} after the 
{{WindowJoin}} no longer has a {{COALESCE}} or {{CAST}}, and a 
{{LocalWindowAggregate}} is generated from the {{Calc}}:

{code}
+- Calc(select=[CAST(window_start AS TIMESTAMP(9)) AS window_start, 
CAST(window_end AS TIMESTAMP(9)) AS window_end, CAST(CAST(window_time AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL) AS 
TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) AS window_time, CAST(l_version_count AS 
BIGINT) AS l_version_count, CAST(r_version_count AS BIGINT) AS 
r_version_count], changelogMode=[I])
   +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[5 s])], 
select=[COUNT(count$0) AS l_version_count, COUNT(count$1) AS r_version_count, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time], changelogMode=[I])
      +- Exchange(distribution=[single], changelogMode=[I])
         +- LocalWindowAggregate(window=[TUMBLE(win_start=[agg_window_start], 
win_end=[agg_window_end], size=[5 s])], select=[COUNT(l_version) AS count$0, 
COUNT(r_version) AS count$1, slice_end('w$) AS $window_end], changelogMode=[I])
            +- Calc(select=[window_start AS agg_window_start, window_end AS 
agg_window_end, window_time AS agg_window_time, version AS l_version, version0 
AS r_version], changelogMode=[I])
               +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[5 s])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[5 
s])], joinType=[LeftOuterJoin], where=[AND(=(version, version0), =(window_time, 
window_time0))], select=[version, window_start, window_end, window_time, 
version0, window_start0, window_end0, window_time0], changelogMode=[I])
{code}

h4. Problem 3:

If you try to use cascading window aggregation, but also include a statement 
set that {{INSERT}}s the results of the join and the aggregate into separate 
sinks, then you get a Calcite error.
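A hedged sketch of such a statement set - the sink names ({{join_sink}}, 
{{agg_sink}}) are illustrative:
{code:sql}
-- Two INSERTs submitted as one statement set: one materializes the join
-- output, the other the cascading aggregate over it.
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO join_sink SELECT * FROM joined;
  INSERT INTO agg_sink
    SELECT window_start, window_end, window_time,
           COUNT(l_version) AS l_version_count,
           COUNT(r_version) AS r_version_count
    FROM joined
    GROUP BY window_start, window_end, window_time;
END;
{code}
The resulting Calcite error: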
{code:java}
java.lang.RuntimeException: Error while applying rule ProjectToCalcRule, args 
[rel#1277:LogicalProject.NONE.any.None: 
0.[NONE].[NONE](input=RelSubset#1276,exprs=[$2, $3, $4, $0, $1])]
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
        at 
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:318)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at 
scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeBlock(StreamCommonSubGraphBasedOptimizer.scala:143)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.$anonfun$optimizeSinkBlocks$2(StreamCommonSubGraphBasedOptimizer.scala:89)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.$anonfun$optimizeSinkBlocks$2$adapted(StreamCommonSubGraphBasedOptimizer.scala:89)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:89)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:534)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697)
        at 
org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:103)
        at org.apache.flink.table.api.Explainable.explain(Explainable.java:40)
        at com.example.SQLSubmitter.twoInserts(SQLSubmitter.java:156)
        at com.example.SQLSubmitter.main(SQLSubmitter.java:134)
Caused by: java.lang.RuntimeException: Error occurred while applying rule 
ProjectToCalcRule
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
        at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:269)
        at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:284)
        at 
org.apache.calcite.rel.rules.ProjectToCalcRule.onMatch(ProjectToCalcRule.java:72)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
        ... 36 more
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) 
agg_window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" r_version) NOT NULL
equiv rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) 
agg_window_end, TIMESTAMP_LTZ(3) *ROWTIME* agg_window_time, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE" r_version) NOT NULL
Difference:
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) *ROWTIME*

        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
        ... 40 more
{code}

h4. Expected behaviour:

A window join is created by an equijoin on {{window_start}} and {{window_end}} 
- which by definition doesn't include {{window_time}}.

However, the equijoin means that both sides of the join will have the same 
value for {{window_end}}, and the window TVF never returns a null 
{{window_time}}. Since every output row of a FULL JOIN is populated from at 
least one side, when {{window_time}} is included in the output of the join via 
{{COALESCE}} the value can never be null - so there is also no reason to 
remove the time attribute.



> Use of SQL functions with time attributes causes downstream temporal 
> functions to fail
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-38162
>                 URL: https://issues.apache.org/jira/browse/FLINK-38162
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 2.1.0
>            Reporter: Nic Townsend
>            Priority: Minor
>         Attachments: SQLSubmitter.java
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
