Yuval Itzchakov created FLINK-23919:
---------------------------------------
Summary: PullUpWindowTableFunctionIntoWindowAggregateRule
generates invalid Calc for Window TVF
Key: FLINK-23919
URL: https://issues.apache.org/jira/browse/FLINK-23919
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.13.2
Reporter: Yuval Itzchakov
Given the following Window TVF:
{code:java}
SELECT window_time,
       MIN(alert_timestamp) AS start_time,
       MAX(alert_timestamp) AS end_time
FROM TABLE(TUMBLE(TABLE alert_table, DESCRIPTOR(alert_timestamp), INTERVAL '3' MINUTE))
WHERE service_source = 'source'
GROUP BY window_start, window_end, window_time
{code}
Where the schema of alert_table is:
{code:java}
alert_timestamp: TIMESTAMP(3) ROWTIME INDICATOR
service_source: VARCHAR
{code}
Planning this query fails with the following error, because the rule produces an invalid RowType:
{code:java}
Error while applying rule PullUpWindowTableFunctionIntoWindowAggregateRule,
args [rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start],
win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time,
MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS
window_end, rowtime('w$) AS window_time),
rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None:
0.[NONE].[NONE](input=RelSubset#355,distribution=single),
rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end,
window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source,
_UTF-16LE'my source':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")),
rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp],
size=[3 min]))]
Error while applying rule
PullUpWindowTableFunctionIntoWindowAggregateRule, args
[rel#358:StreamPhysicalWindowAggregate.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#356,window=TUMBLE(win_start=[window_start],
win_end=[window_end], size=[3 min]),select=MIN(alert_timestamp) AS start_time,
MAX(alert_timestamp) AS end_time, start('w$) AS window_start, end('w$) AS
window_end, rowtime('w$) AS window_time),
rel#367:StreamPhysicalExchange.STREAM_PHYSICAL.single.None:
0.[NONE].[NONE](input=RelSubset#355,distribution=single),
rel#354:StreamPhysicalCalc.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#353,select=window_start, window_end,
window_time, CAST(alert_timestamp) AS alert_timestamp,where==(service_source,
_UTF-16LE'Microsoft Defender for Identity':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE")),
rel#352:StreamPhysicalWindowTableFunction.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE](input=RelSubset#351,window=TUMBLE(time_col=[alert_timestamp],
size=[3 min]))] at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
at
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:69)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
at scala.collection.Iterator.foreach(Iterator.scala:943) at
scala.collection.Iterator.foreach$(Iterator.scala:943) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at
scala.collection.IterableLike.foreach(IterableLike.scala:74) at
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at
scala.collection.AbstractIterable.foreach(Iterable.scala:56) at
scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at
scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Error occurred while applying rule
PullUpWindowTableFunctionIntoWindowAggregateRule at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:161)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:268)
at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:283)
at
org.apache.flink.table.planner.plan.rules.physical.stream.PullUpWindowTableFunctionIntoWindowAggregateRule.onMatch(PullUpWindowTableFunctionIntoWindowAggregateRule.scala:143)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
... 31 more
Caused by: org.apache.flink.table.api.ValidationException: Field
names must be unique. Found duplicates: [alert_timestamp] at
org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157) at
org.apache.flink.table.types.logical.RowType.of(RowType.java:297) at
org.apache.flink.table.types.logical.RowType.of(RowType.java:289) at
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:657)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList$lzycompute(StreamPhysicalWindowAggregate.scala:60)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.aggInfoList(StreamPhysicalWindowAggregate.scala:59)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWindowAggregate.explainTerms(StreamPhysicalWindowAggregate.scala:86)
at
org.apache.calcite.rel.AbstractRelNode.getDigestItems(AbstractRelNode.java:409)
at
org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:391)
at
org.apache.calcite.rel.AbstractRelNode$InnerRelDigest.hashCode(AbstractRelNode.java:443)
at java.base/java.util.HashMap.hash(HashMap.java:339) at
java.base/java.util.HashMap.get(HashMap.java:552) at
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1150)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.transformTo(VolcanoRuleCall.java:148)
{code}
Looking at the code, it seems that when
PullUpWindowTableFunctionIntoWindowAggregateRule builds the new Calc in
WindowUtil.buildNewProgramWithoutWindowColumns, it adds the rowtime column
from the input row to the new Calc without checking for name collisions.
Also, TBH I'm not entirely sure yet why the rowtime column of the input table
is added to the projected output row like that.
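To make the suspected collision concrete, here is a minimal, self-contained sketch in plain Java. It is not the Flink planner code: the class FieldNameDedup, its two helper methods, and the field list in main are hypothetical and only mirror the Calc projection in the error above (window_start, window_end, window_time, alert_timestamp) with the input rowtime column appended once more. It shows the kind of uniqueness check that rejects such a row, and one possible way to rename colliding fields before building it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Flink.
class FieldNameDedup {

    /** Returns the names that occur more than once, in first-seen order. */
    static List<String> findDuplicates(List<String> names) {
        Set<String> seen = new HashSet<>();
        List<String> duplicates = new ArrayList<>();
        for (String name : names) {
            // Set.add returns false when the name was already present.
            if (!seen.add(name) && !duplicates.contains(name)) {
                duplicates.add(name);
            }
        }
        return duplicates;
    }

    /** Keeps the first occurrence of each name and suffixes later ones until unique. */
    static List<String> uniquify(List<String> names) {
        Set<String> used = new HashSet<>();
        List<String> result = new ArrayList<>();
        for (String name : names) {
            String candidate = name;
            int suffix = 0;
            while (!used.add(candidate)) {
                candidate = name + "_" + suffix++;
            }
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed field list: the projected columns plus the re-added rowtime column.
        List<String> projected = List.of(
                "window_start", "window_end", "window_time",
                "alert_timestamp", "alert_timestamp");

        System.out.println("duplicates: " + findDuplicates(projected));
        System.out.println("uniquified: " + uniquify(projected));
    }
}
```

Whether renaming is the right fix, or whether the rowtime column should only be appended when the projection does not already contain it, depends on how the downstream window aggregate refers to that column. The sketch only demonstrates the collision itself.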
--
This message was sent by Atlassian Jira
(v8.3.4#803005)