[ https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17467081#comment-17467081 ]
ChangjiGuo edited comment on FLINK-25475 at 12/31/21, 5:55 AM:
---------------------------------------------------------------

Are there any side effects if the StreamExecMiniBatchAssigner node is inserted in front of the StreamExecGroupAggregate node? As far as I know, the current implementation adds it after the source node or the watermark node.

was (Author: changjiguo):
Are there any side effects if the StreamExecMiniBatchAssigner node is added to the front of StreamExecGroupAggregate node? As far as I know, the current implementation is added in front of the source node or watermark node.

> When windowAgg and groupAgg are included at the same time, there is no
> assigner generated but MiniBatch optimization is still used.
> -----------------------------------------------------------------------
>
>                 Key: FLINK-25475
>                 URL: https://issues.apache.org/jira/browse/FLINK-25475
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: ChangjiGuo
>            Priority: Major
>         Attachments: image-2021-12-29-16-04-50-211.png,
> image-2021-12-29-16-05-15-519.png
>
>
> If the RelNode tree contains both a window aggregate (windowAgg) and a group
> aggregate (groupAgg), MiniBatchIntervalInferRule does not add a
> StreamExecMiniBatchAssigner node. However, MiniBatchGroupAggFunction,
> MiniBatchLocalGroupAggFunction or MiniBatchGlobalGroupAggFunction is still
> generated when the plan is translated into transformations, because the
> translation only checks whether mini-batch is enabled:
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
>     aggsHandler,
>     recordEqualiser,
>     accTypes,
>     inputRowType,
>     inputCountIndex,
>     generateUpdateBefore)
>   new KeyedMapBundleOperator(
>     aggFunction,
>     AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
>     tableConfig.getMinIdleStateRetentionTime,
>     tableConfig.getMaxIdleStateRetentionTime,
>     aggsHandler,
>     recordEqualiser,
>     accTypes,
>     inputCountIndex,
>     generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction)
>   operator
> }
> {code}
>
> For example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
>
> The WatermarkAssigner sends watermarks downstream. Because the bundle operator
> flushes on every watermark it receives, finishBundle is called far more often
> than the mini-batch configuration intends, which is not the expected behavior.

-- 
This message was sent by Atlassian Jira
(v8.20.1#820001)
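A small, self-contained Java model makes the finishBundle behavior described in the issue concrete. This is a sketch under stated assumptions and is not Flink's code. `SimpleBundleOperator` and `BundleDemo` are hypothetical names, and the "aggregate" is a plain per-key count emitted as a partial result per bundle. The model also assumes, as the issue describes, that every incoming watermark flushes the current bundle. It compares a stream that receives a watermark after every two records against one that only receives a final watermark.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of a mini-batch "map bundle" operator.
 * Not Flink's implementation: records are folded into a per-key buffer,
 * and the buffer is flushed (finishBundle) when a size trigger fires or
 * when any watermark arrives.
 */
class SimpleBundleOperator {
    private final int maxCount;                              // assumed size trigger
    private final Map<String, Long> bundle = new LinkedHashMap<>();
    private int numOfElements = 0;
    int flushes = 0;                                         // non-empty finishBundle calls
    final List<String> emitted = new ArrayList<>();          // partial results sent downstream

    SimpleBundleOperator(int maxCount) {
        this.maxCount = maxCount;
    }

    /** Folds one record into the bundle as a running count per key. */
    void processElement(String key) {
        bundle.merge(key, 1L, Long::sum);
        numOfElements++;
        if (numOfElements >= maxCount) {
            finishBundle();
        }
    }

    /** In this model every watermark, whatever its origin, flushes the bundle
     *  (forwarding the watermark itself is omitted). */
    void processWatermark(long watermark) {
        finishBundle();
    }

    /** Emits one partial result per buffered key, then clears the buffer. */
    private void finishBundle() {
        if (!bundle.isEmpty()) {
            flushes++;
            for (Map.Entry<String, Long> e : bundle.entrySet()) {
                emitted.add(e.getKey() + "=" + e.getValue());
            }
            bundle.clear();
        }
        numOfElements = 0;
    }
}

class BundleDemo {
    /** Feeds the keys, sending a watermark after every `watermarkEvery`
     *  records (0 means no intermediate watermarks), then a final watermark. */
    static SimpleBundleOperator run(String[] keys, int watermarkEvery) {
        SimpleBundleOperator op = new SimpleBundleOperator(1000);
        long ts = 0;
        for (int i = 0; i < keys.length; i++) {
            op.processElement(keys[i]);
            if (watermarkEvery > 0 && (i + 1) % watermarkEvery == 0) {
                op.processWatermark(ts++);
            }
        }
        op.processWatermark(Long.MAX_VALUE);   // final watermark flushes the remainder
        return op;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "a", "b", "a", "b", "a"};
        SimpleBundleOperator frequent = run(keys, 2);
        SimpleBundleOperator rare = run(keys, 0);
        System.out.println("watermark every 2 records: flushes=" + frequent.flushes
                + ", emitted=" + frequent.emitted);
        System.out.println("final watermark only: flushes=" + rare.flushes
                + ", emitted=" + rare.emitted);
    }
}
```

With a watermark after every two records, the model flushes three times and emits five partial results. With only the final watermark, it flushes once and emits two. That difference is the reduction mini-batching is meant to provide, and frequent unrelated watermarks erode it.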