[ https://issues.apache.org/jira/browse/FLINK-38162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nic Townsend updated FLINK-38162:
---------------------------------
Description:
In the Flink docs for window join
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]),
the FULL JOIN example uses {{COALESCE}} to ensure that each row has a value for
{{window_start}} and {{window_end}}.
However, the example omits {{window_time}} from the result, and {{window_time}} is
needed for downstream re-windowing or cascading windows
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]).
An example use case might be: "I have a two-phase commit process, and I want
to know how many transactions in a window were only LEFT, how many were only
RIGHT, and how many were COMPLETE". This may well be a niche case, but the
problem is that adding {{window_time}} causes a series of different errors in
downstream operators.
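To make the use case concrete, here is a minimal plain-Java sketch (no Flink dependencies) of what the FULL window join with {{COALESCE}} is meant to compute: each joined row has a nullable left side and a nullable right side, the window bound is taken from whichever side is present, and the row is counted as LEFT, RIGHT or COMPLETE per window. The {{JoinedRow}} record, the {{Status}} enum and the {{classify}} method are hypothetical names chosen for illustration; this only models the query's semantics and is not Flink API.

{code:java}
import java.time.LocalDateTime;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class WindowJoinClassifier {

    enum Status { LEFT, RIGHT, COMPLETE }

    // Hypothetical model of one FULL OUTER window join row:
    // the side that found no match has a null window_start.
    record JoinedRow(LocalDateTime lWindowStart, LocalDateTime rWindowStart) {}

    // Mirrors COALESCE(L.window_start, R.window_start): the first non-null argument.
    static LocalDateTime coalesce(LocalDateTime a, LocalDateTime b) {
        return a != null ? a : b;
    }

    static Status status(JoinedRow row) {
        if (row.lWindowStart() != null && row.rWindowStart() != null) {
            return Status.COMPLETE;
        }
        return row.lWindowStart() != null ? Status.LEFT : Status.RIGHT;
    }

    // Counts LEFT / RIGHT / COMPLETE rows per coalesced window_start.
    static Map<LocalDateTime, Map<Status, Long>> classify(List<JoinedRow> rows) {
        Map<LocalDateTime, Map<Status, Long>> result = new TreeMap<>();
        for (JoinedRow row : rows) {
            LocalDateTime windowStart = Objects.requireNonNull(
                coalesce(row.lWindowStart(), row.rWindowStart()),
                "a FULL JOIN row always has at least one side");
            result.computeIfAbsent(windowStart, k -> new EnumMap<>(Status.class))
                  .merge(status(row), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime w1 = LocalDateTime.of(2024, 1, 1, 10, 0);
        LocalDateTime w2 = w1.plusMinutes(10);
        List<JoinedRow> rows = List.of(
            new JoinedRow(w1, w1),    // matched on both sides
            new JoinedRow(w1, null),  // only the left side arrived
            new JoinedRow(null, w2),  // only the right side arrived
            new JoinedRow(w2, w2));
        System.out.println(classify(rows));
    }
}
{code}

The final grouping step is the part that, in Flink SQL, would be done by a downstream (cascading) window aggregation over the join output, which is why {{window_time}} needs to survive the join.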
h4. Problem 1:
If you use {{COALESCE}} for {{window_time}}, {{DESCRIBE}} shows the column as a
{{ROWTIME}} attribute:
{code}
+-------------+------------------------+------+-----+--------+-----------+
| name        | type                   | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+-----------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
+-------------+------------------------+------+-----+--------+-----------+
{code}
However, if you apply a windowing TVF to the results of the join, Flink throws
an error stating that {{window_time}} is not a time attribute.
I assume this is due to the use of {{COALESCE}}. The time attributes documentation
([https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes])
says:
> As long as a time attribute is not modified, and is simply forwarded from one
> part of a query to another, it remains a valid time attribute. Time
> attributes behave like regular timestamps, and are accessible for
> calculations. When used in calculations, time attributes are materialized and
> act as standard timestamps. However, ordinary timestamps cannot be used in
> place of, or be converted to, time attributes.
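The quoted rule can be read as a simple predicate over expressions: a bare column reference to a time attribute stays a time attribute, while any function call (including {{COALESCE}}) produces an ordinary timestamp. Below is a toy Java model of that documented rule only; the {{Expr}}, {{ColumnRef}} and {{Call}} types are hypothetical and this is not how the Flink planner is implemented. In Problem 1, {{DESCRIBE}} still reports {{*ROWTIME*}} for the coalesced column, so the schema Flink shows and the rule modelled here disagree.

{code:java}
import java.util.List;

final class TimeAttributeRule {

    // Hypothetical, simplified expression tree: just enough to express the documented rule.
    sealed interface Expr permits ColumnRef, Call {}

    // A reference to an input column; rowtime says whether that column is a time attribute.
    record ColumnRef(String name, boolean rowtime) implements Expr {}

    // Any function call, e.g. COALESCE(a, b).
    record Call(String function, List<Expr> args) implements Expr {}

    // Documented rule: only an unmodified, forwarded time attribute remains a time attribute.
    // Any call, even one whose arguments are all time attributes, yields a plain timestamp.
    static boolean isTimeAttribute(Expr expr) {
        return expr instanceof ColumnRef ref && ref.rowtime();
    }

    public static void main(String[] args) {
        Expr left = new ColumnRef("L.window_time", true);
        Expr right = new ColumnRef("R.window_time", true);
        Expr coalesced = new Call("COALESCE", List.of(left, right));
        System.out.println("L.window_time is a time attribute: " + isTimeAttribute(left));
        System.out.println("COALESCE(...) is a time attribute: " + isTimeAttribute(coalesced));
    }
}
{code}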
h4. Problem 2:
If you use cascading windows and perform a window aggregation after the join,
you do not get a windowed aggregate; instead, the planner creates an unbounded
aggregate. As with Problem 1, my assumption, based on the cascading window
aggregation docs
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]),
is that the time attribute has not propagated, so the window does not cascade.
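To illustrate why the unbounded aggregate is a problem, here is a plain-Java sketch of what the two plans emit for the same input. It assumes rows already carry a {{window_start}} value (epoch milliseconds) and uses Flink's changelog short notation ({{+I}}, {{-U}}, {{+U}}): the windowed aggregate emits one insert-only row per window, whereas a regular group aggregate keyed on {{window_start}} as a plain column emits an update for every input row and keeps its state indefinitely (unless a state TTL is configured). The class and method names are hypothetical; this models the observable output only, not Flink's operators.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowedVsUnbounded {

    // Windowed aggregate: one final, insert-only row per window. Here it is emitted after
    // all input is consumed, standing in for the watermark passing window_end.
    static List<String> windowed(List<Long> windowStarts) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ws : windowStarts) {
            counts.merge(ws, 1L, Long::sum);
        }
        List<String> out = new ArrayList<>();
        counts.forEach((ws, n) -> out.add("+I(" + ws + ", " + n + ")"));
        return out;
    }

    // Unbounded group aggregate keyed on window_start as a plain column: every input row
    // produces either an insert or an update-before/update-after pair.
    static List<String> unbounded(List<Long> windowStarts) {
        Map<Long, Long> state = new TreeMap<>();
        List<String> out = new ArrayList<>();
        for (long ws : windowStarts) {
            Long prev = state.get(ws);
            if (prev == null) {
                out.add("+I(" + ws + ", 1)");
                state.put(ws, 1L);
            } else {
                out.add("-U(" + ws + ", " + prev + ")");
                out.add("+U(" + ws + ", " + (prev + 1) + ")");
                state.put(ws, prev + 1);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> rows = List.of(0L, 0L, 600_000L); // two rows in window 0, one in the next
        System.out.println("windowed:  " + windowed(rows));
        System.out.println("unbounded: " + unbounded(rows));
    }
}
{code}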
h4. Problem 3:
If you use cascading window aggregation and also use a statement set to
{{INSERT}} the results of the join and of the aggregate into separate sinks,
you get a Calcite error:
{code:java}
java.lang.RuntimeException: Error while applying rule ProjectToCalcRule, args [rel#1277:LogicalProject.NONE.any.None: 0.[NONE].[NONE](input=RelSubset#1276,exprs=[$2, $3, $4, $0, $1])]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
    at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:523)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:318)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeBlock(StreamCommonSubGraphBasedOptimizer.scala:143)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.$anonfun$optimizeSinkBlocks$2(StreamCommonSubGraphBasedOptimizer.scala:89)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.$anonfun$optimizeSinkBlocks$2$adapted(StreamCommonSubGraphBasedOptimizer.scala:89)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:89)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:320)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:534)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697)
    at org.apache.flink.table.api.internal.StatementSetImpl.explain(StatementSetImpl.java:103)
    at org.apache.flink.table.api.Explainable.explain(Explainable.java:40)
    at com.example.SQLSubmitter.twoInserts(SQLSubmitter.java:156)
    at com.example.SQLSubmitter.main(SQLSubmitter.java:134)
Caused by: java.lang.RuntimeException: Error occurred while applying rule ProjectToCalcRule
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:157)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:269)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:284)
    at org.apache.calcite.rel.rules.ProjectToCalcRule.onMatch(ProjectToCalcRule.java:72)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:223)
    ... 36 more
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
equiv rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) *ROWTIME* agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
Difference:
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) *ROWTIME*
    at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:592)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:613)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:144)
    ... 40 more
{code}
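The {{Type mismatch}} is Calcite refusing to register the output of {{ProjectToCalcRule}} as equivalent to the original expression, because the two row types differ. Compared field by field, only {{agg_window_time}} differs: one side has lost the {{*ROWTIME*}} marker ({{TIMESTAMP_LTZ}} is Flink's short name for {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}, so the spelling difference itself is cosmetic). Below is a toy Java sketch of that comparison that reproduces the {{Difference:}} line from the trace; the {{Field}} record and {{difference}} method are hypothetical, and comparing printed strings is a simplification of the real check on Calcite's type objects.

{code:java}
import java.util.ArrayList;
import java.util.List;

final class RowTypeDiff {

    // Hypothetical, simplified field: its name plus its type as printed in the error message.
    record Field(String name, String type) {}

    // Reports "name: relType -> equivType" for every position whose printed type differs.
    // Simplification: assumes both row types list the same fields in the same order.
    static List<String> difference(List<Field> rel, List<Field> equiv) {
        if (rel.size() != equiv.size()) {
            return List.of("field count: " + rel.size() + " -> " + equiv.size());
        }
        List<String> diffs = new ArrayList<>();
        for (int i = 0; i < rel.size(); i++) {
            Field a = rel.get(i);
            Field b = equiv.get(i);
            if (!a.type().equals(b.type())) {
                diffs.add(a.name() + ": " + a.type() + " -> " + b.type());
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        // The three timestamp fields from the trace; the two identical VARCHAR fields are omitted.
        List<Field> rel = List.of(
            new Field("agg_window_start", "TIMESTAMP(3)"),
            new Field("agg_window_end", "TIMESTAMP(3)"),
            new Field("agg_window_time", "TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)"));
        List<Field> equiv = List.of(
            new Field("agg_window_start", "TIMESTAMP(3)"),
            new Field("agg_window_end", "TIMESTAMP(3)"),
            new Field("agg_window_time", "TIMESTAMP_LTZ(3) *ROWTIME*"));
        // Prints: agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) *ROWTIME*
        difference(rel, equiv).forEach(System.out::println);
    }
}
{code}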
was:
In the Flink docs for window join
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer]),
the FULL JOIN example uses {{COALESCE}} to ensure that each row has a value for
{{window_start}} and {{window_end}}.
However, the example omits {{window_time}} from the result, and {{window_time}} is
needed for downstream re-windowing or cascading windows
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]).
An example use case might be: "I have a two-phase commit process, and I want
to know how many transactions in a window were only LEFT, how many were only
RIGHT, and how many were COMPLETE". This may well be a niche case, but the
problem is that adding {{window_time}} causes a series of different errors in
downstream operators.
h4. Problem 1:
If you use {{COALESCE}} for {{window_time}}, {{DESCRIBE}} shows the column as a
{{ROWTIME}} attribute:
{code}
+-------------+------------------------+------+-----+--------+-----------+
| name        | type                   | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+-----------+
| window_time | TIMESTAMP(3) *ROWTIME* | true |     |        |           |
+-------------+------------------------+------+-----+--------+-----------+
{code}
However, if you apply a windowing TVF to the results of the join, Flink throws
an error stating that {{window_time}} is not a time attribute.
I assume this is due to the use of {{COALESCE}}. The time attributes documentation
([https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/dev/table/concepts/time_attributes/#introduction-to-time-attributes])
says:
> As long as a time attribute is not modified, and is simply forwarded from one
> part of a query to another, it remains a valid time attribute. Time
> attributes behave like regular timestamps, and are accessible for
> calculations. When used in calculations, time attributes are materialized and
> act as standard timestamps. However, ordinary timestamps cannot be used in
> place of, or be converted to, time attributes.
h4. Problem 2:
If you use cascading windows and perform a window aggregation after the join,
you do not get a windowed aggregate; instead, the planner creates an unbounded
aggregate. As with Problem 1, my assumption, based on the cascading window
aggregation docs
([https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#cascading-window-aggregation]),
is that the time attribute has not propagated, so the window does not cascade.
h4. Problem 3:
{code:java}
Caused by: java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
equiv rowtype: RecordType(TIMESTAMP(3) agg_window_start, TIMESTAMP(3) agg_window_end, TIMESTAMP_LTZ(3) *ROWTIME* agg_window_time, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" l_version, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" r_version) NOT NULL
Difference:
agg_window_time: TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) -> TIMESTAMP_LTZ(3) *ROWTIME*
{code}
> Use of SQL functions with time attributes causes downstream temporal functions to fail
> --------------------------------------------------------------------------------------
>
> Key: FLINK-38162
> URL: https://issues.apache.org/jira/browse/FLINK-38162
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.20.0, 2.1.0
> Reporter: Nic Townsend
> Priority: Minor
> Attachments: SQLSubmitter.java
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)