[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17583571#comment-17583571 ]

Piotr Nowojski commented on FLINK-28504:
----------------------------------------

{quote}
After giving this a second look, I think the current code behaviour is indeed
correct and I don't see any bug here.

Your idle source subtasks have never sent any watermark to some of the
downstream local aggregations. So those LocalWindowAggregate subtasks correctly
don't report any watermark, which is why the Low Watermark is not reported in
the web UI. Correctly, the low watermark is NaN for the task as a whole.

The current behaviour might be a bit confusing. Ideally, the web UI could
present the lowest non-NaN watermark, with a caveat/asterisk that some
subtasks are idle. But to me the current behaviour is also valid.
{quote}
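To make the distinction in the comment concrete, here is a hypothetical Java model of the task-level Low Watermark. It is not Flink's web UI code. Today the reported value behaves like a minimum over all subtasks, so a single subtask that never received a watermark turns the whole task into NaN. The alternative suggested in the comment reports the lowest real watermark plus an asterisk. `LowWatermarkModel`, the `NO_WATERMARK` sentinel (assumed here to be `Long.MIN_VALUE`) and the example values are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical model of how a task-level "Low Watermark" could be summarised
 * from per-subtask watermarks. Not Flink's actual web UI implementation.
 */
public class LowWatermarkModel {

    /** Assumed sentinel for "this subtask has never received a watermark". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Current behaviour as described: one subtask without a watermark makes the task NaN. */
    static String currentLowWatermark(long[] subtaskWatermarks) {
        long min = Arrays.stream(subtaskWatermarks).min().orElse(NO_WATERMARK);
        return min == NO_WATERMARK ? "NaN" : Long.toString(min);
    }

    /** Suggested behaviour: lowest real watermark, with an asterisk if some subtasks have none. */
    static String suggestedLowWatermark(long[] subtaskWatermarks) {
        long missing = Arrays.stream(subtaskWatermarks).filter(w -> w == NO_WATERMARK).count();
        OptionalLong lowest = Arrays.stream(subtaskWatermarks).filter(w -> w != NO_WATERMARK).min();
        if (!lowest.isPresent()) {
            return "NaN"; // no subtask has a watermark at all
        }
        if (missing == 0) {
            return Long.toString(lowest.getAsLong());
        }
        return lowest.getAsLong() + "* (" + missing + " of " + subtaskWatermarks.length
                + " subtasks have no watermark)";
    }

    public static void main(String[] args) {
        // 12 local-aggregation subtasks; only subtask 0 ever receives a watermark (hypothetical value).
        long[] local = new long[12];
        Arrays.fill(local, NO_WATERMARK);
        local[0] = 60_000L;

        System.out.println(currentLowWatermark(local));   // NaN
        System.out.println(suggestedLowWatermark(local)); // 60000* (11 of 12 subtasks have no watermark)
    }
}
```

Both summaries are "correct" in the sense of the comment; the second one only trades strictness for a more informative display.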

> Local-Global aggregation causes watermark alignment 
> (table.exec.source.idle-timeout) of idle partition invalid
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28504
>                 URL: https://issues.apache.org/jira/browse/FLINK-28504
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.5
>         Environment: flink 1.14
> kafka 2.4
>            Reporter: nyingping
>            Assignee: nyingping
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-07-12-15-11-51-653.png, 
> image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png
>
>
> I have a window Top-N test job; the code is as follows:
>  
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> // Local environment with the REST endpoint / web UI on port 8082
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> StreamExecutionEnvironment streamExecutionEnvironment =
>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);
>
> // Mark source subtasks that receive no records for 10 s as idle
> st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");
> st.executeSql(
>         "CREATE TABLE test (\n"
>                 + "  `key` STRING,\n"
>                 + "  `time` TIMESTAMP(3),\n"
>                 + "  `price` float,\n"
>                 + "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
>                 + ") WITH (\n"
>                 + "  'connector' = 'kafka',\n"
>                 + "  'topic' = 'test',\n"
>                 + "  'properties.bootstrap.servers' = 'testlocal:9092',\n"
>                 + "  'properties.group.id' = 'windowGroup',\n"
>                 + "  'scan.startup.mode' = 'latest-offset',\n"
>                 + "  'format' = 'json'\n"
>                 + ")");
>
> // Window Top-N: top 3 keys by total price per 1-minute tumbling window
> String sqlWindowTopN =
>         "select * from (" +
>         "  select *, " +
>         "   ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
>         "     from (" +
>         "       select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
>         "           tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
>         "        ) group by window_start, window_end, key" +
>         "   )" +
>         ") where rownum <= 3";
> st.executeSql(sqlWindowTopN).print(); {code}
>  
>  
> After running it for a long time, I still get no results.
> The watermark appears as follows in the web UI:
>  
> !image-2022-07-12-15-11-51-653.png|width=898,height=388!
> I didn't set the parallelism manually, so it defaults to 12. The Kafka source
> topic has only one partition, so 11 of the 12 source subtasks have no partition
> to read and stay idle. To keep the watermarks of the whole job aligned, I use
> the `table.exec.source.idle-timeout` configuration.
>  
> As shown above, the planner automatically split the window Top-N SQL into
> local-global aggregation tasks. In the local phase, the watermark did not
> behave as I expected.
>  
> Manually setting the parallelism to 1 did what I expected.
> {code:java}
> streamExecutionEnvironment.setParallelism(1); {code}
> !image-2022-07-12-15-19-29-950.png|width=872,height=384!
>  
> I can also configure the planner not to split the aggregation into local and
> global phases. In that case the `table.exec.source.idle-timeout`
> configuration takes effect and the watermark is aligned.
> {code:java}
> st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE"); {code}
> Result:
> !image-2022-07-12-15-20-06-919.png|width=866,height=357!
>  
> To sum up: when the number of Kafka partitions is smaller than the Flink
> source parallelism and some source subtasks are left idle, I expect the
> `table.exec.source.idle-timeout` configuration to keep the watermarks aligned,
> but here it seems to have no effect.
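A simplified model consistent with the comment above may help explain why the reported setups behave differently. It assumes, without verifying it against the Flink planner or runtime, that each local-aggregate subtask is fed only by the source subtask in front of it (forward connection), whereas with `ONE_PHASE` every window-aggregate subtask reads from all source subtasks through a hash exchange. A subtask's watermark is modeled as the minimum over its non-idle inputs, an idea similar in spirit to Flink's `StatusWatermarkValve` but not its code. `IdleChannelModel`, `combine` and all values are hypothetical.

```java
import java.util.Arrays;

/**
 * Hypothetical model: a downstream subtask's watermark is the minimum over its
 * non-idle input channels. Not Flink's actual StatusWatermarkValve implementation.
 */
public class IdleChannelModel {

    /** Assumed sentinel for "no watermark". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Minimum over non-idle channels, or NO_WATERMARK if every input channel is idle. */
    static long combine(long[] channelWatermarks, boolean[] channelIdle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        int parallelism = 12;
        long[] sourceWatermarks = new long[parallelism];
        boolean[] sourceIdle = new boolean[parallelism];
        // Start with every source subtask idle and without a watermark...
        Arrays.fill(sourceWatermarks, NO_WATERMARK);
        Arrays.fill(sourceIdle, true);
        // ...then make subtask 0, which reads the only partition, active.
        sourceWatermarks[0] = 60_000L;
        sourceIdle[0] = false;

        // ONE_PHASE (assumed): each window-aggregate subtask reads from ALL source subtasks.
        long onePhase = combine(sourceWatermarks, sourceIdle);

        // Local phase (assumed): local-aggregate subtask i reads only from source subtask i.
        int withWatermark = 0;
        for (int i = 0; i < parallelism; i++) {
            long local = combine(new long[] {sourceWatermarks[i]}, new boolean[] {sourceIdle[i]});
            if (local != NO_WATERMARK) {
                withWatermark++;
            }
        }

        System.out.println("ONE_PHASE window agg watermark: " + onePhase);           // 60000
        System.out.println("local agg subtasks with a watermark: " + withWatermark); // 1
    }
}
```

Under these assumptions, only one of the twelve local-aggregate subtasks ever gets a watermark, so a minimum over all subtasks is NaN, while every single-phase aggregate subtask has the active source among its inputs and its watermark advances.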



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
