[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.
[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482183#comment-17482183 ]

ChangjiGuo commented on FLINK-25475:

Hi, [~xuyangzhong]. Thanks for your reply! The problem shows up with SQL like this:
{code:sql}
SELECT b, SUM(cnt)
FROM (
  SELECT b,
    COUNT(a) AS cnt,
    HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) AS w_start,
    HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) AS w_end
  FROM wmTable1
  GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
)
GROUP BY b
{code}

> When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.14.2
> Reporter: ChangjiGuo
> Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, image-2021-12-29-16-05-15-519.png
>
>
> If the RelNode tree contains both a window aggregate (windowAgg) and a group aggregate (groupAgg), MiniBatchIntervalInferRule
> does not add a StreamExecMiniBatchAssigner node, yet MiniBatchGroupAggFunction, MiniBatchLocalGroupAggFunction, or
> MiniBatchGlobalGroupAggFunction is still generated when the plan is translated into transformations.
> The translation only checks whether mini-batch is enabled:
> {code:scala}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
>     aggsHandler,
>     recordEqualiser,
>     accTypes,
>     inputRowType,
>     inputCountIndex,
>     generateUpdateBefore)
>   new KeyedMapBundleOperator(
>     aggFunction,
>     AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
>     tableConfig.getMinIdleStateRetentionTime,
>     tableConfig.getMaxIdleStateRetentionTime,
>     aggsHandler,
>     recordEqualiser,
>     accTypes,
>     inputCountIndex,
>     generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction)
>   operator
> }
> {code}
>
> For example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
>
> The WatermarkAssigner sends watermarks downstream, and since the mini-batch operator flushes its bundle whenever it
> receives a watermark, finishBundle is called far more frequently than expected.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
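The finishBundle behavior described in the issue can be illustrated with a toy model of a mini-batch bundle operator. Everything below (ToyBundleOperator, run, the count threshold) is hypothetical and is not Flink's API; the only assumption carried over from the issue is that the bundle is flushed both when a count trigger fires and whenever a watermark reaches the operator. With no watermarks, 1000 records and a trigger of 100 produce 10 flushes; with a watermark after every 2 records, the same input produces 500.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (hypothetical, not Flink's API) of a mini-batch "map bundle" operator. */
public class BundleFlushSketch {

    static final class ToyBundleOperator {
        private final int maxCount;                  // assumed count-based trigger threshold
        private final Map<String, Long> bundle = new HashMap<>();
        private int bufferedRecords = 0;
        int flushes = 0;                             // number of non-empty finishBundle calls

        ToyBundleOperator(int maxCount) {
            this.maxCount = maxCount;
        }

        void processElement(String key) {
            bundle.merge(key, 1L, Long::sum);        // pre-aggregate per key in memory
            bufferedRecords++;
            if (bufferedRecords >= maxCount) {
                finishBundle();                      // count trigger fired
            }
        }

        void processWatermark(long watermark) {
            // Assumption taken from the issue: every watermark ends the current bundle.
            finishBundle();
        }

        private void finishBundle() {
            if (!bundle.isEmpty()) {
                flushes++;                           // a real operator would emit results here
                bundle.clear();
            }
            bufferedRecords = 0;
        }
    }

    /** Feeds `records` records over 3 keys; emits a watermark every `wmEvery` records (0 = never). */
    static int run(int records, int wmEvery, int maxCount) {
        ToyBundleOperator op = new ToyBundleOperator(maxCount);
        for (int i = 1; i <= records; i++) {
            op.processElement("key-" + (i % 3));
            if (wmEvery > 0 && i % wmEvery == 0) {
                op.processWatermark(i);
            }
        }
        return op.flushes;
    }

    public static void main(String[] args) {
        System.out.println("no watermarks: " + run(1000, 0, 100) + " flushes");
        System.out.println("watermark every 2 records: " + run(1000, 2, 100) + " flushes");
    }
}
```

In the second run the count trigger never fires, because a watermark clears the bundle long before 100 records accumulate; the batch size is effectively dictated by the watermark rate rather than by the mini-batch settings.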
[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17476968#comment-17476968 ]

xuyang commented on FLINK-25475:

Hi, [~ChangjiGuo]. Could you provide a test case to make the bug easier to locate?
[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17467081#comment-17467081 ]

ChangjiGuo commented on FLINK-25475:

Would there be any side effects if a StreamExecMiniBatchAssigner node were added directly in front of the StreamExecGroupAggregate node? As far as I know, the current implementation only adds it right after the source node or the watermark assigner node.
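As a rough sketch of the placement being asked about, a linear plan listed from source to sink can be rewritten so that a MiniBatchAssigner sits directly in front of each GroupAggregate. The string-based plan and the insertBeforeGroupAgg helper are hypothetical; this is not how MiniBatchIntervalInferRule works, and it only shows where the node would go, not whether placing it there is safe.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignerPlacementSketch {

    // Hypothetical helper: walks a linear plan (source first, sink last) and puts a
    // MiniBatchAssigner directly in front of each GroupAggregate that lacks one.
    static List<String> insertBeforeGroupAgg(List<String> plan) {
        List<String> out = new ArrayList<>();
        for (String node : plan) {
            boolean prevIsAssigner =
                !out.isEmpty() && out.get(out.size() - 1).equals("MiniBatchAssigner");
            if (node.equals("GroupAggregate") && !prevIsAssigner) {
                out.add("MiniBatchAssigner");
            }
            out.add(node);
        }
        return out;
    }

    public static void main(String[] args) {
        // Simplified shape of the plan for the SQL example above (exchanges omitted).
        List<String> plan = List.of(
            "TableSourceScan", "WatermarkAssigner", "WindowAggregate", "GroupAggregate", "Sink");
        List<String> rewritten = insertBeforeGroupAgg(plan);
        System.out.println(rewritten);
        // Applying the rewrite a second time leaves the plan unchanged.
        System.out.println(insertBeforeGroupAgg(rewritten).equals(rewritten));
    }
}
```

Running it prints `[TableSourceScan, WatermarkAssigner, WindowAggregate, MiniBatchAssigner, GroupAggregate, Sink]` followed by `true`. Placing the assigner after the window aggregate means its batching would be driven by whatever the window aggregate emits, which is exactly the interaction the question about side effects would need to settle.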
[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17466345#comment-17466345 ]

ChangjiGuo commented on FLINK-25475:

Hello [~jark], could you help me review this issue?