[
https://issues.apache.org/jira/browse/FLINK-23919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
godfrey he closed FLINK-23919.
------------------------------
Resolution: Fixed
Fixed in 1.15.0: 003df215b482c246c48c147b63b56608c6557cba
Fixed in 1.14.1: 5b3e3a8fd1dec3a41a7ff41835dc11456ad6836b
Fixed in 1.13.4: 934aa94c8509149079e375879ff5d3d4b86e15ad
> PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for
> Window TVF
> --------------------------------------------------------------------------------------
>
> Key: FLINK-23919
> URL: https://issues.apache.org/jira/browse/FLINK-23919
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.2
> Reporter: Yuval Itzchakov
> Assignee: JING ZHANG
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0, 1.14.1, 1.13.4
>
> Attachments: image-2021-08-23-13-31-24-052.png
>
>
> Given the following Window TVF:
> {code:java}
> SELECT window_time,
>        MIN(alert_timestamp) AS start_time,
>        MAX(alert_timestamp) AS end_time
> FROM TABLE(
>     TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' MINUTE))
> WHERE service_source = 'source'
> GROUP BY window_start, window_end, window_time
> {code}
> Where the schema of alert_table is:
> {code:java}
> alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
> service_source: VARCHAR
> {code}
> Planning this query fails because the rule produces a RowType with
> duplicate field names:
> {code:java}
> Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule,
> args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start],
> win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS
> start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start,
> end('w$) AS window_end, rowtime('w$) AS window_time),
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None:
> 0.[NONE].[NONE](input=RelSubset#355,distribution=single),
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end,
> window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source,
> _UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")),
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp],
> size=[3 min]))]
> Error while applying rule
> PullUpWindowTableFunctionIntoWindowAggregateRule, args
> [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start],
> win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS
> start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start,
> end('w$) AS window_end, rowtime('w$) AS window_time),
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None:
> 0.[NONE].[NONE](input=RelSubset#355,distribution=single),
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end,
> window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source,
> _UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE")),
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None:
> 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp],
> size=[3 min]))]
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
> at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
> at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
> at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
> at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
> at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
> at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
> at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
> at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
> at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
>
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.RuntimeException: Error occurred while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
> at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
> at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
> at org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
> ... 31 more
> Caused by: org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [alert_timestamp]
> at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
> at org.apache.flink.table.types.logical.RowType.of(RowType.java:297)
> at org.apache.flink.table.types.logical.RowType.of(RowType.java:289)
> at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
> at org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
> at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
> at org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
> at java.base/java.util.HashMap.hash(HashMap.java:339)
> at java.base/java.util.HashMap.get(HashMap.java:552)
> at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
> at org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
> at org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
> {code}
> Looking at the code, it seems that when
> PullUpWindowTableFunctionIntoWindowAggregateRule builds the new Calc in
> WindowUtil.buildNewProgramWithoutWindowColumns, it adds the rowtime column
> from the input row to the new Calc without checking for name collisions.
> Also, TBH I'm not entirely sure yet why the rowtime column of the input
> table is added to the projected output row like that.
> !image-2021-08-23-13-31-24-052.png|width=887,height=163!
> [~jark] would appreciate your help with this.
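
The name collision described in the report can be illustrated with a small, self-contained sketch. It does not use any Flink or Calcite classes and is not the fix that was committed for this issue. The class `FieldNameCollisionSketch` and both helper methods are hypothetical names introduced here for illustration. The sketch assumes that the rebuilt Calc keeps the non-window projection (`CAST(alert_timestamp) AS alert_timestamp`) and then appends the input rowtime column under its original name, which is what the report suggests.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of Flink.
public class FieldNameCollisionSketch {

    /** Returns the names that occur more than once, in first-seen order. */
    static List<String> findDuplicates(List<String> names) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String name : names) {
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        return new ArrayList<>(duplicates);
    }

    /** Appends a field, adding a numeric suffix if the name is already taken. */
    static List<String> appendWithUniqueName(List<String> existing, String candidate) {
        Set<String> taken = new LinkedHashSet<>(existing);
        String name = candidate;
        int suffix = 0;
        while (taken.contains(name)) {
            name = candidate + "_" + suffix++;
        }
        List<String> result = new ArrayList<>(existing);
        result.add(name);
        return result;
    }

    public static void main(String[] args) {
        // Assumed projection of the rebuilt Calc once the window columns are dropped.
        List<String> projected = List.of("alert_timestamp");

        // Naive append of the input rowtime column, without a collision check.
        List<String> naive = new ArrayList<>(projected);
        naive.add("alert_timestamp");
        System.out.println("duplicates: " + findDuplicates(naive));

        // Collision-aware append: the second column gets a distinct name.
        System.out.println("unique: " + appendWithUniqueName(projected, "alert_timestamp"));
    }
}
```

With the naive append, the duplicate check reports `alert_timestamp`, matching the field named in the ValidationException above. With the collision-aware append, the second column is renamed, so a row type built from the resulting names would pass a uniqueness check.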