[ 
https://issues.apache.org/jira/browse/FLINK-23919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-23919:
-----------------------------------
    Labels: pull-request-available  (was: )

> PullUpWindowTableFunctionIntoWindowAggregateRule generates invalid Calc for 
> Window TVF
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-23919
>                 URL: https://issues.apache.org/jira/browse/FLINK-23919
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.2
>            Reporter: Yuval Itzchakov
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2021-08-23-13-31-24-052.png
>
>
> Given the following Window TVF:
> {code:java}
> SELECT window_time, 
>        MIN(alert_timestamp) as start_time, 
>        MAX(alert_timestamp) as end_time 
> FROM TABLE(
>        TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' MINUTE))
> WHERE service_source = 'source' 
> GROUP BY window_start, window_end, window_time
> {code}
> Where the schema of alert_table is:
> {code:java}
> alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
> service_source: VARCHAR
> {code}
> Planning this query fails because the rule produces a RowType with 
> duplicate field names:
> {code:java}
> Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule, 
> args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS 
> start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, 
> end('w$) AS window_end, rowtime('w$) AS window_time), 
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
> 0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
> window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
> _UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")), 
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
> size=[3 min]))]
> Error while applying rule 
> PullUpWindowTableFunctionIntoWindowAggregateRule, args 
> [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS 
> start_time, MAX(alert_timestamp) AS end_time, start('w$) AS window_start, 
> end('w$) AS window_end, rowtime('w$) AS window_time), 
> rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None: 
> 0.[NONE].[NONE](input=RelSubset#355,distribution=single), 
> rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end, 
> window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source, 
> _UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE")), 
> rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None: 
> 0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp], 
> size=[3 min]))] at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>  at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) 
> at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>  at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at 
> scala.collection.Iterator.foreach(Iterator.scala:943) at 
> scala.collection.Iterator.foreach$(Iterator.scala:943) at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at 
> scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:73) at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:56) at 
> scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at 
> scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at 
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>  at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>  at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
>  at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
>  at 
> org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
>  
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.lang.RuntimeException: Error occurred while applying rule 
> PullUpWindowTableFunctionIntoWindowAggregateRule
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
>  at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268) 
> at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283) 
> at 
> org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>  ... 31 more
> Caused by: org.apache.flink.table.api.ValidationException: Field names must 
> be unique. Found duplicates: [alert_timestamp]
>  at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272) 
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) at 
> org.apache.flink.table.types.logical.RowType.of(RowType.java:297) at 
> org.apache.flink.table.types.logical.RowType.of(RowType.java:289) at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
>  at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
>  at 
> org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
>  at 
> org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391) 
> at 
> org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
>  at java.base/java.util.HashMap.hash(HashMap.java:339) at 
> java.base/java.util.HashMap.get(HashMap.java:552) at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
>  at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
>  at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
> {code}
> Looking at the code, it seems that when 
> PullUpWindowTableFunctionIntoWindowAggregateRule builds the new Calc in 
> WindowUtil.buildNewProgramWithoutWindowColumns, it adds the rowtime column 
> from the input row to the new Calc without checking for name collisions. 
> Here the Calc already projects CAST(alert_timestamp) AS alert_timestamp, so 
> the output row ends up with two fields named alert_timestamp. Also, I'm not 
> yet sure why the rowtime column of the input table is added to the projected 
> output row in the first place.
> !image-2021-08-23-13-31-24-052.png|width=887,height=163!  
> [~jark] would appreciate your help with this.
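
The collision described above can be reproduced outside the planner. The following is a minimal, self-contained sketch, not the Flink code and not necessarily how the rule was eventually fixed: it assumes that, once the window columns are dropped, the Calc projects only alert_timestamp and that the rule then appends the input rowtime column under the same name. The class and helper names (FieldNameCollisionSketch, findDuplicates, uniquify, appendTimeAttribute) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; none of these names exist in Flink.
class FieldNameCollisionSketch {

    /** Returns every name that occurs more than once, in order of first repeat. */
    static Set<String> findDuplicates(List<String> names) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new LinkedHashSet<>();
        for (String name : names) {
            if (!seen.add(name)) {
                duplicates.add(name);
            }
        }
        return duplicates;
    }

    /** Appends a numeric suffix to the candidate until it is not in the used set. */
    static String uniquify(String candidate, Set<String> used) {
        String name = candidate;
        int suffix = 0;
        while (used.contains(name)) {
            name = candidate + "_" + suffix++;
        }
        return name;
    }

    /** Adds the time attribute to a copy of the projection under a non-colliding name. */
    static List<String> appendTimeAttribute(List<String> projected, String timeAttribute) {
        List<String> result = new ArrayList<>(projected);
        result.add(uniquify(timeAttribute, new HashSet<>(projected)));
        return result;
    }

    public static void main(String[] args) {
        // Assumed projection of the Calc after the window columns are removed.
        List<String> projected = List.of("alert_timestamp");

        // Naive append without a collision check, as the report suspects.
        List<String> naive = new ArrayList<>(projected);
        naive.add("alert_timestamp");
        System.out.println(findDuplicates(naive)); // [alert_timestamp]

        // Append with a collision check.
        System.out.println(appendTimeAttribute(projected, "alert_timestamp"));
        // [alert_timestamp, alert_timestamp_0]
    }
}
```

The naive list reproduces the duplicate reported in the ValidationException, while the checked variant keeps field names unique. Whether renaming is the right remedy, or whether the rowtime column should not be appended at all when it is already projected, is left open here.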



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
