[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2022-01-25 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482183#comment-17482183
 ] 

ChangjiGuo commented on FLINK-25475:


Hi [~xuyangzhong], thanks for your reply!
The problem can be reproduced with SQL like this:
{code:sql}
SELECT b, SUM(cnt)
FROM (
  SELECT b,
    COUNT(a) AS cnt,
    HOP_START(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) AS w_start,
    HOP_END(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND) AS w_end
  FROM wmTable1
  GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '6' SECOND)
)
GROUP BY b
{code}


> When windowAgg and groupAgg are included at the same time, there is no 
> assigner generated but MiniBatch optimization is still used.
> ---
>
> Key: FLINK-25475
> URL: https://issues.apache.org/jira/browse/FLINK-25475
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.2
>Reporter: ChangjiGuo
>Priority: Major
> Attachments: image-2021-12-29-16-04-50-211.png, 
> image-2021-12-29-16-05-15-519.png
>
>
> If the RelNode tree contains both a window aggregate and a group aggregate, 
> MiniBatchIntervalInferRule does not add a StreamExecMiniBatchAssigner node, 
> but MiniBatchGroupAggFunction, MiniBatchLocalGroupAggFunction or 
> MiniBatchGlobalGroupAggFunction is still generated when the plan is 
> translated into transformations.
> The translation only checks whether mini-batch is enabled:
> {code:java}
> val operator = if (isMiniBatchEnabled) {
>   val aggFunction = new MiniBatchGroupAggFunction(
>     aggsHandler,
>     recordEqualiser,
>     accTypes,
>     inputRowType,
>     inputCountIndex,
>     generateUpdateBefore)
>   new KeyedMapBundleOperator(
>     aggFunction,
>     AggregateUtil.createMiniBatchTrigger(tableConfig))
> } else {
>   val aggFunction = new GroupAggFunction(
>     tableConfig.getMinIdleStateRetentionTime,
>     tableConfig.getMaxIdleStateRetentionTime,
>     aggsHandler,
>     recordEqualiser,
>     accTypes,
>     inputCountIndex,
>     generateUpdateBefore)
>   val operator = new KeyedProcessOperator[RowData, RowData, RowData](aggFunction)
>   operator
> } {code}
> for example:
> before:
> !image-2021-12-29-16-04-50-211.png!
> after:
> !image-2021-12-29-16-05-15-519.png!
> The WatermarkAssigner keeps sending watermarks downstream, and each watermark 
> triggers finishBundle, so bundles are flushed far more often than the 
> configured mini-batch interval, which is not the expected behavior.
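The flushing effect described above can be illustrated with a small, self-contained Java simulation. This is a hypothetical model, not Flink code: `BundleFlushSim` buffers partial sums per key and flushes either when a record-count limit is reached (standing in for the mini-batch size trigger) or whenever a watermark arrives. As far as we understand, Flink's bundle operators likewise call finishBundle when a watermark arrives, and the mini-batch assigner normally emits such watermarks only once per interval. With a watermark after every two records, 100 input records are emitted in 50 small flushes instead of one.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of a mini-batch (bundle) group aggregation.
 * It is NOT Flink's KeyedMapBundleOperator; it only mimics two flush causes:
 * a count trigger and an incoming watermark.
 */
public class BundleFlushSim {
    private final int maxCount;                                // assumed mini-batch size
    private final Map<String, Long> bundle = new HashMap<>();  // key -> partial SUM
    private int bufferedRecords = 0;
    private final List<Integer> flushSizes = new ArrayList<>(); // keys emitted per flush

    public BundleFlushSim(int maxCount) {
        this.maxCount = maxCount;
    }

    public void processElement(String key, long value) {
        bundle.merge(key, value, Long::sum); // pre-aggregate in memory
        bufferedRecords++;
        if (bufferedRecords >= maxCount) {
            finishBundle(); // count trigger fired
        }
    }

    // In this model every watermark forces a flush, however small the bundle is.
    public void processWatermark() {
        finishBundle();
    }

    private void finishBundle() {
        if (!bundle.isEmpty()) {
            flushSizes.add(bundle.size()); // one downstream update per buffered key
            bundle.clear();
        }
        bufferedRecords = 0;
    }

    public List<Integer> flushSizes() {
        return flushSizes;
    }

    public static void main(String[] args) {
        BundleFlushSim withWatermarks = new BundleFlushSim(1000); // watermark every 2 records
        BundleFlushSim countOnly = new BundleFlushSim(1000);      // no intermediate watermarks
        for (int i = 0; i < 100; i++) {
            String k = (i % 2 == 0) ? "a" : "b";
            withWatermarks.processElement(k, 1);
            countOnly.processElement(k, 1);
            if (i % 2 == 1) {
                withWatermarks.processWatermark();
            }
        }
        countOnly.processWatermark(); // single final flush
        System.out.println("flushes with watermarks: " + withWatermarks.flushSizes().size()); // 50
        System.out.println("flushes count-only: " + countOnly.flushSizes().size());            // 1
    }
}
{code}

The model deliberately ignores state, retraction and timers; it only shows why a stream of ordinary watermarks defeats the batching that the mini-batch operator relies on.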



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2022-01-16 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17476968#comment-17476968
 ] 

xuyang commented on FLINK-25475:


Hi [~ChangjiGuo], could you provide a test case to help locate the bug?






[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-30 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17467081#comment-17467081
 ] 

ChangjiGuo commented on FLINK-25475:


Would there be any side effects if the StreamExecMiniBatchAssigner node were 
inserted directly upstream of the StreamExecGroupAggregate node? As far as I 
know, the current implementation only inserts it right after the source node or 
the watermark assigner node.
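Besides moving the assigner, another option would be to make the translation step agree with the rule's decision, so that a mini-batch operator is chosen only when an assigner was actually inserted. The Java sketch below models that check. `MiniBatchMode`, `AggOperatorKind` and `choose` are hypothetical stand-ins (loosely modeled on the planner's mini-batch trait), not Flink APIs, and the sketch assumes the translation step can see the interval mode inferred by MiniBatchIntervalInferRule.

{code:java}
/**
 * Hypothetical sketch: choose the group-aggregate operator kind from both the
 * config flag and the mini-batch mode the planner inferred. Names are
 * illustrative stand-ins, not Flink classes.
 */
public class AggOperatorChoice {
    public enum MiniBatchMode { PROC_TIME, ROW_TIME, NONE }

    public enum AggOperatorKind { MINI_BATCH_GROUP_AGG, GROUP_AGG }

    public static AggOperatorKind choose(boolean miniBatchEnabled, MiniBatchMode inferredMode) {
        // Behavior described in the issue: only miniBatchEnabled is consulted.
        // Sketched guard: also require an inferred interval, i.e. that an
        // assigner exists upstream to emit the mini-batch watermarks.
        if (miniBatchEnabled && inferredMode != MiniBatchMode.NONE) {
            return AggOperatorKind.MINI_BATCH_GROUP_AGG;
        }
        return AggOperatorKind.GROUP_AGG;
    }

    public static void main(String[] args) {
        System.out.println(choose(true, MiniBatchMode.NONE));      // GROUP_AGG
        System.out.println(choose(true, MiniBatchMode.PROC_TIME)); // MINI_BATCH_GROUP_AGG
    }
}
{code}

Falling back to the plain operator trades away mini-batch throughput for correct flushing behavior; inserting the assigner instead would keep the optimization, which is why the placement question matters.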






[jira] [Commented] (FLINK-25475) When windowAgg and groupAgg are included at the same time, there is no assigner generated but MiniBatch optimization is still used.

2021-12-29 Thread ChangjiGuo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17466345#comment-17466345
 ] 

ChangjiGuo commented on FLINK-25475:


Hello [~jark], could you help me review this issue?



