[ 
https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nyingping updated FLINK-28504:
------------------------------
    Description: 
I have a window Top-N test job. The code is as follows:

 
{code:java}
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source subtask as idle after 10 seconds without data.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
                + "  `key` STRING,\n"
                + "  `time` TIMESTAMP(3),\n"
                + "  `price` float,\n"
                + "  WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'test',\n"
                + "  'properties.bootstrap.servers' = 'testlocal:9092',\n"
                + "  'properties.group.id' = 'windowGroup',\n"
                + "  'scan.startup.mode' = 'latest-offset',\n"
                + "  'format' = 'json'\n"
                + ")");

// Top 3 keys by total price within each 1-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        "  select *, " +
        "   ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        "     from (" +
        "       select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        "           tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        "        ) group by window_start, window_end, key" +
        "   )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
{code}
 

 

After running the job, no results are printed, even after a long time.

The watermark appears in the UI as follows:

 

!image-2022-07-12-15-11-51-653.png|width=898,height=388!

I didn't set the parallelism manually, so it defaults to 12. The Kafka source 
topic has only one partition, so the other source subtasks are idle. To keep the 
watermarks aligned across the entire job, I set the 
`table.exec.source.idle-timeout` configuration.
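
The behavior I expect from the idle timeout can be illustrated with a small, self-contained sketch. It is a simplified model and not Flink's actual implementation: the class `WatermarkAlignmentSketch`, the `InputChannel` type, and the `combinedWatermark` method are hypothetical names introduced only for this illustration. The assumption it encodes is that an operator's watermark is the minimum over its inputs, and that inputs marked idle are left out of that minimum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

public final class WatermarkAlignmentSketch {

    /** Hypothetical model of one input channel feeding an operator. */
    static final class InputChannel {
        final long watermark; // last watermark received on this channel
        final boolean idle;   // true once the channel has been marked idle

        InputChannel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Combined watermark of an operator: the minimum watermark over all
     * channels that are not idle. Empty if every channel is idle.
     */
    static OptionalLong combinedWatermark(List<InputChannel> channels) {
        return channels.stream()
                .filter(channel -> !channel.idle)
                .mapToLong(channel -> channel.watermark)
                .min();
    }

    public static void main(String[] args) {
        // One subtask reads the single Kafka partition; the other 11 receive
        // no data and never emit a watermark (modelled as Long.MIN_VALUE).
        List<InputChannel> withIdleMarking = new ArrayList<>();
        List<InputChannel> withoutIdleMarking = new ArrayList<>();
        withIdleMarking.add(new InputChannel(50_000L, false));
        withoutIdleMarking.add(new InputChannel(50_000L, false));
        for (int i = 0; i < 11; i++) {
            withIdleMarking.add(new InputChannel(Long.MIN_VALUE, true));
            withoutIdleMarking.add(new InputChannel(Long.MIN_VALUE, false));
        }

        // Idle channels are ignored, so the watermark can advance.
        System.out.println(combinedWatermark(withIdleMarking).getAsLong());
        // Without idle marking, the minimum stays at Long.MIN_VALUE.
        System.out.println(combinedWatermark(withoutIdleMarking).getAsLong());
    }
}
```

In this model, the job in this report corresponds to the first case: one active channel and eleven idle ones, so the downstream watermark should follow the active channel once the idle timeout has elapsed.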

 

As shown above, the system automatically split the window Top-N SQL into 
local-global aggregation tasks. In the local phase, the watermark did not 
behave as I expected.

 

Manually setting the parallelism to 1 produced the behavior I expected.
{code:java}
streamExecutionEnvironment.setParallelism(1); {code}
!image-2022-07-12-15-19-29-950.png|width=872,height=384!

 

I can also manually configure the optimizer not to split the aggregation into 
local-global phases. With that setting, the `table.exec.source.idle-timeout` 
configuration takes effect and the watermark is aligned.
{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE"); {code}
result:

!image-2022-07-12-15-20-06-919.png|width=866,height=357!

 

To sum up: when the Flink parallelism differs from the number of Kafka 
partitions, so that some source subtasks are idle, I expect the 
`table.exec.source.idle-timeout` configuration to keep the watermarks aligned, 
but with local-global aggregation it seems to have no effect.


> Local-Global aggregation causes watermark alignment 
> (table.exec.source.idle-timeout) of idle partition invalid
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28504
>                 URL: https://issues.apache.org/jira/browse/FLINK-28504
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.2
>         Environment: flink 1.14.1
> kafka 2.4
>            Reporter: nyingping
>            Priority: Major
>         Attachments: image-2022-07-12-15-11-51-653.png, 
> image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
