[jira] [Closed] (FLINK-28681) The number of windows allocated by sliding windows is inaccurate, when the window length is irregular
[ https://issues.apache.org/jira/browse/FLINK-28681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping closed FLINK-28681.
-----------------------------
    Resolution: Fixed

> The number of windows allocated by sliding windows is inaccurate, when the window length is irregular
> ------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28681
>                 URL: https://issues.apache.org/jira/browse/FLINK-28681
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.15.1
>            Reporter: nyingping
>            Priority: Minor
>
> In assignWindows, the initial capacity of the list of windows is determined by '(int) (size / slide)'.
> {code:java}
> List windows = new ArrayList<>((int) (size / slide));
> {code}
> This estimate is inaccurate when the window size is not an integer multiple of the slide. For example, if the size is 10 and the slide is 3, '(int) (size / slide)' gives 3, but an element can be assigned to 4 windows.
>
> This does not affect functionality, but I think the following would be better:
> {code:java}
> int len = (int) Math.ceil((double) size / slide);
> List windows = new ArrayList<>(len);
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
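The arithmetic in the description can be checked with a small standalone sketch. It assumes a window offset of 0 and half-open windows [start, start + size). The class `SlidingWindowCapacitySketch` and its methods are hypothetical; they are only loosely modeled on a sliding window assigner and are not copied from Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink's SlidingWindowAssigner. Assumes offset 0. */
final class SlidingWindowCapacitySketch {

    /** Capacity estimate as in the current code: truncating integer division. */
    static int floorEstimate(long size, long slide) {
        return (int) (size / slide);
    }

    /** Capacity estimate as proposed in the issue: round up. */
    static int ceilEstimate(long size, long slide) {
        return (int) Math.ceil((double) size / slide);
    }

    /** Start timestamps of all windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>(ceilEstimate(size, slide));
        // Latest multiple of the slide that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 10;
        long slide = 3;
        System.out.println("floor estimate: " + floorEstimate(size, slide));
        System.out.println("ceil estimate:  " + ceilEstimate(size, slide));
        for (long ts = 0; ts < slide; ts++) {
            System.out.println("timestamp " + ts + " -> " + windowStarts(ts, size, slide));
        }
    }
}
```

With size 10 and slide 3, a timestamp of 0 falls into 4 windows while timestamps 1 and 2 fall into 3, so the rounded-up estimate is a tight upper bound and the truncated estimate can be one short.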
[jira] [Closed] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping closed FLINK-28504.
-----------------------------
    Resolution: Not A Bug

> Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-28504
>                 URL: https://issues.apache.org/jira/browse/FLINK-28504
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.5
>         Environment: flink 1.14
> kafka 2.4
>            Reporter: nyingping
>            Assignee: nyingping
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: image-2022-07-12-15-11-51-653.png, image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png
>
>
> I have a window Top-N test job; the code is as follows:
>
> {code:java}
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.RestOptions;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> // Expose the local web UI on port 8082.
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> StreamExecutionEnvironment streamExecutionEnvironment =
>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>
> StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);
>
> // Mark a source as idle after 10 seconds without records.
> st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");
> st.executeSql(
>         "CREATE TABLE test (\n"
>                 + " `key` STRING,\n"
>                 + " `time` TIMESTAMP(3),\n"
>                 + " `price` float,\n"
>                 + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
>                 + ") WITH (\n"
>                 + " 'connector' = 'kafka',\n"
>                 + " 'topic' = 'test',\n"
>                 + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
>                 + " 'properties.group.id' = 'windowGroup',\n"
>                 + " 'scan.startup.mode' = 'latest-offset',\n"
>                 + " 'format' = 'json'\n"
>                 + ")");
>
> // Top 3 keys by total price within each one-minute tumbling window.
> String sqlWindowTopN =
>         "select * from (" +
>         " select *, " +
>         " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
>         " from (" +
>         " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
>         " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
>         " ) group by window_start, window_end, key" +
>         " )" +
>         ") where rownum <= 3";
> st.executeSql(sqlWindowTopN).print();
> {code}
>
> The job runs, but no result is produced even after a long time.
> The watermark appears as follows in the UI:
>
> !image-2022-07-12-15-11-51-653.png|width=898,height=388!
> I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so the remaining source subtasks are idle. To align the watermarks for the entire job, I use the `table.exec.source.idle-timeout` configuration.
>
> As shown above, the system automatically split the window Top-N SQL into local-global aggregation tasks. In the local phase, the watermark did not advance as I expected.
>
> Manually setting the parallelism to 1 did what I expected.
> {code:java}
> streamExecutionEnvironment.setParallelism(1);
> {code}
> !image-2022-07-12-15-19-29-950.png|width=872,height=384!
>
> I can also manually configure the system not to split the aggregation into local-global phases. In that case, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.
> {code:java}
> st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
> {code}
> Result:
> !image-2022-07-12-15-20-06-919.png|width=866,height=357!
>
> To sum up, when the number of Kafka partitions differs from the Flink parallelism and some source subtasks are therefore idle, I expect `table.exec.source.idle-timeout` to keep the watermarks aligned, but here it seems to have no effect.
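As background for the report above: an operator with several inputs generally advances its event-time clock to the minimum watermark over the inputs that are not marked idle, which is why a single input that never reports a watermark can hold back the whole job. The following plain-Java sketch illustrates only that rule. The class and method names are hypothetical, and it does not model Flink's implementation or the planner's local-global aggregation.

```java
import java.util.OptionalLong;

/** Hypothetical illustration of "minimum watermark over non-idle inputs". */
final class WatermarkMinSketch {

    /**
     * Combines per-input watermarks, skipping inputs flagged as idle.
     * Returns empty when every input is idle.
     */
    static OptionalLong combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Inputs 0 and 2 never received data, so their watermark is still the initial value.
        long[] watermarks = {Long.MIN_VALUE, 5_000L, Long.MIN_VALUE};

        boolean[] noneIdle = {false, false, false};
        boolean[] emptyInputsIdle = {true, false, true};

        System.out.println("no idleness:    " + combinedWatermark(watermarks, noneIdle));
        System.out.println("idle excluded:  " + combinedWatermark(watermarks, emptyInputsIdle));
    }
}
```

In the first call the combined watermark stays at the initial value, so no window can fire. In the second, the idle inputs are ignored and the watermark follows the one active input.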
[jira] [Updated] (FLINK-28681) The number of windows allocated by sliding windows is inaccurate, when the window length is irregular
[ https://issues.apache.org/jira/browse/FLINK-28681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28681:
------------------------------
    Description: 
In assignWindows, the initial capacity of the list of windows is determined by '(int) (size / slide)'.
{code:java}
List windows = new ArrayList<>((int) (size / slide));
{code}
This estimate is inaccurate when the window size is not an integer multiple of the slide. For example, if the size is 10 and the slide is 3, '(int) (size / slide)' gives 3, but an element can be assigned to 4 windows.

This does not affect functionality, but I think the following would be better:
{code:java}
int len = (int) Math.ceil((double) size / slide);
List windows = new ArrayList<>(len);
{code}

  was:
In assignWindows, the initial capacity of the list of windows is determined by '(int) (size / slide)'.
{code:java}
List windows = new ArrayList<>((int) (size / slide));
{code}
This estimate is inaccurate when the window size is not an integer multiple of the slide. For example, if the size is 10 and the slide is 3, '(int) (size / slide)' gives 3, but an element can be assigned to 4 windows.

This does not affect functionality, but I think the following would be better:
{code:java}
int len = (int) Math.ceil((double) (size / slide));
List windows = new ArrayList<>(len);
{code}
[jira] [Created] (FLINK-28681) The number of windows allocated by sliding windows is inaccurate, when the window length is irregular
nyingping created FLINK-28681:
---------------------------------

             Summary: The number of windows allocated by sliding windows is inaccurate, when the window length is irregular
                 Key: FLINK-28681
                 URL: https://issues.apache.org/jira/browse/FLINK-28681
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Runtime
    Affects Versions: 1.15.1
            Reporter: nyingping


In assignWindows, the initial capacity of the list of windows is determined by '(int) (size / slide)'.
{code:java}
List windows = new ArrayList<>((int) (size / slide));
{code}
This estimate is inaccurate when the window size is not an integer multiple of the slide. For example, if the size is 10 and the slide is 3, '(int) (size / slide)' gives 3, but an element can be assigned to 4 windows.

This does not affect functionality, but I think the following would be better:
{code:java}
int len = (int) Math.ceil((double) (size / slide));
List windows = new ArrayList<>(len);
{code}
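The snippet in this first version of the description casts to double only after the integer division, whereas the later revision of the description casts `size` before dividing. A minimal sketch of why the placement matters, assuming `size` and `slide` are `long` values; the class and method names are hypothetical:

```java
/** Hypothetical sketch comparing the two cast placements from the issue history. */
final class CeilCastSketch {

    /** Cast applied to the already truncated quotient: ceil has nothing left to round. */
    static int castAfterDivision(long size, long slide) {
        return (int) Math.ceil((double) (size / slide));
    }

    /** Cast applied to size first, so the division is done in floating point. */
    static int castBeforeDivision(long size, long slide) {
        return (int) Math.ceil((double) size / slide);
    }

    public static void main(String[] args) {
        System.out.println("cast after division:  " + castAfterDivision(10, 3));
        System.out.println("cast before division: " + castBeforeDivision(10, 3));
    }
}
```

For size 10 and slide 3 the first form still yields 3, the same as plain truncation, and only the second form yields 4. When the size is an exact multiple of the slide, both forms agree.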
[jira] [Commented] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17567885#comment-17567885 ]

nyingping commented on FLINK-28504:
-----------------------------------

[~martijnvisser] Hi, could you assign this task to me? I'd love to work on it if the community thinks it's a bug.
[jira] [Comment Edited] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565350#comment-17565350 ]

nyingping edited comment on FLINK-28504 at 7/12/22 8:03 AM:
------------------------------------------------------------

[~martijnvisser] Yes, it works when set like this:
{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
{code}

was (Author: JIRAUSER282677):
    Yes, it works when set like this:
{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
{code}
[jira] [Commented] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565350#comment-17565350 ]

nyingping commented on FLINK-28504:
-----------------------------------

Yes, it works when set like this:
{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
{code}
[jira] [Updated] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28504:
------------------------------
    Affects Version/s: 1.14.5
                           (was: 1.14.2)
[jira] [Commented] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17565341#comment-17565341 ]

nyingping commented on FLINK-28504:
-----------------------------------

[~martijnvisser] Yes, I have tested it on version 1.14.5, and the problem remains.
[jira] [Updated] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28504:
------------------------------
    Environment: 
flink 1.14
kafka 2.4

  was:
flink 1.14.1
kafka 2.4
[jira] [Updated] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28504:
------------------------------
    Description: 
I have a window Top-N test job; the code is as follows:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Expose the local web UI on port 8082.
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as idle after 10 seconds without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");
st.executeSql(
        "CREATE TABLE test (\n"
                + " `key` STRING,\n"
                + " `time` TIMESTAMP(3),\n"
                + " `price` float,\n"
                + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
                + ") WITH (\n"
                + " 'connector' = 'kafka',\n"
                + " 'topic' = 'test',\n"
                + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
                + " 'properties.group.id' = 'windowGroup',\n"
                + " 'scan.startup.mode' = 'latest-offset',\n"
                + " 'format' = 'json'\n"
                + ")");

// Top 3 keys by total price within each one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        " ) group by window_start, window_end, key" +
        " )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
{code}

The job runs, but no result is produced even after a long time.
The watermark appears as follows in the UI:

!image-2022-07-12-15-11-51-653.png|width=898,height=388!
I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so the remaining source subtasks are idle. To align the watermarks for the entire job, I use the `table.exec.source.idle-timeout` configuration.

As shown above, the system automatically split the window Top-N SQL into local-global aggregation tasks. In the local phase, the watermark did not advance as I expected.

Manually setting the parallelism to 1 did what I expected.
{code:java}
streamExecutionEnvironment.setParallelism(1);
{code}
!image-2022-07-12-15-19-29-950.png|width=872,height=384!

I can also manually configure the system not to split the aggregation into local-global phases. In that case, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.
{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
{code}
Result:
!image-2022-07-12-15-20-06-919.png|width=866,height=357!

To sum up, when the number of Kafka partitions differs from the Flink parallelism and some source subtasks are therefore idle, I expect `table.exec.source.idle-timeout` to keep the watermarks aligned, but here it seems to have no effect.

  was:
I have a window Top-N test job; the code is as follows:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Expose the local web UI on port 8082.
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as idle after 10 seconds without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");
st.executeSql(
        "CREATE TABLE test (\n"
                + " `key` STRING,\n"
                + " `time` TIMESTAMP(3),\n"
                + " `price` float,\n"
                + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
                + ") WITH (\n"
                + " 'connector' = 'kafka',\n"
                + " 'topic' = 'test',\n"
                + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
                + " 'properties.group.id' = 'windowGroup',\n"
                + " 'scan.startup.mode' = 'latest-offset',\n"
                + " 'format' = 'json'\n"
                + ")");

String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
[jira] [Updated] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28504:
Description:

I have a window Top-N test task; the code is as follows:

{code:java}
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        " ) group by window_start, window_end, key" +
        " )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
{code}

After starting the job, no results are produced, even after a long time. The watermark appears as follows in the UI:

!image-2022-07-12-15-11-51-653.png|width=898,height=388!

I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so there are idle partitions. To align the watermarks for the entire task, I use the `table.exec.source.idle-timeout` configuration.

As shown above, I found that the system automatically splits the window Top-N SQL into local-global aggregation tasks. In the local phase, the watermark did not behave as I expected.

Manually setting the parallelism to 1 gives the behavior I expected.

{code:java}
streamExecutionEnvironment.setParallelism(1);
{code}

!image-2022-07-12-15-19-29-950.png|width=872,height=384!

I can also manually configure the system not to split the aggregation into local-global phases. With this setting, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.

{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
{code}

Result:

!image-2022-07-12-15-20-06-919.png|width=866,height=357!

To sum up: when the number of Kafka partitions and Flink's parallelism are the same, and idle partitions arise, I expect the `table.exec.source.idle-timeout` configuration to keep the watermarks aligned, but here it seems to fail.

was:

I have a window Top-N test task; the code is as follows:

{code:java}
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        "
[jira] [Updated] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28504:
Description:

I have a window Top-N test task; the code is as follows:

{code:java}
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        " ) group by window_start, window_end, key" +
        " )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
{code}

After starting the job, no results are produced, even after a long time. The watermark appears as follows in the UI:

!image-2022-07-12-15-11-51-653.png!

I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so there are idle partitions. To align the watermarks for the entire task, I use the `table.exec.source.idle-timeout` configuration.

As shown above, I found that the system automatically splits the window Top-N SQL into local-global aggregation tasks. In the local phase, the watermark did not behave as I expected.

Manually setting the parallelism to 1 gives the behavior I expected.

{code:java}
streamExecutionEnvironment.setParallelism(1);
{code}

!image-2022-07-12-15-19-29-950.png!

I can also manually configure the system not to split the aggregation into local-global phases. With this setting, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.

{code:java}
st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE");
{code}

Result:

!image-2022-07-12-15-20-06-919.png!

As mentioned above, I hope to be able to aggregate in two phases and keep the watermarks aligned at the same time.

was:

I have a window Top-N test task; the code is as follows:

`
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`),
[jira] [Updated] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
[ https://issues.apache.org/jira/browse/FLINK-28504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-28504:
Description:

I have a window Top-N test task; the code is as follows:

`
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        " ) group by window_start, window_end, key" +
        " )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
`

After starting the job, no results are produced, even after a long time. The watermark appears as follows in the UI:

!image-2022-07-12-15-11-51-653.png!

I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so there are idle partitions. To align the watermarks for the entire task, I use the `table.exec.source.idle-timeout` configuration.

As shown above, I found that the system automatically splits the window Top-N SQL into local-global aggregation tasks. In the local phase, the watermark did not behave as I expected.

Manually setting the parallelism to 1 gives the behavior I expected.

`streamExecutionEnvironment.setParallelism(1);`

!image-2022-07-12-15-19-29-950.png!

I can also manually configure the system not to split the aggregation into local-global phases. With this setting, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.

` st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE"); `

Result:

!image-2022-07-12-15-20-06-919.png!

As mentioned above, I hope to be able to aggregate in two phases and keep the watermarks aligned at the same time.

was:

I have a window Top-N test task; the code is as follows:

```
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        "
[jira] [Created] (FLINK-28504) Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
nyingping created FLINK-28504:

Summary: Local-Global aggregation causes watermark alignment (table.exec.source.idle-timeout) of idle partition invalid
Key: FLINK-28504
URL: https://issues.apache.org/jira/browse/FLINK-28504
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.14.2
Environment: flink 1.14.1, kafka 2.4
Reporter: nyingping
Attachments: image-2022-07-12-15-11-51-653.png, image-2022-07-12-15-19-29-950.png, image-2022-07-12-15-20-06-919.png

I have a window Top-N test task; the code is as follows:

```
Configuration configuration = new Configuration();
configuration.setInteger(RestOptions.PORT, 8082);
StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment st = StreamTableEnvironment.create(streamExecutionEnvironment);

// Mark a source as temporarily idle after 10s without records.
st.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "10s");

st.executeSql(
        "CREATE TABLE test (\n"
        + " `key` STRING,\n"
        + " `time` TIMESTAMP(3),\n"
        + " `price` float,\n"
        + " WATERMARK FOR `time` AS `time` - INTERVAL '10' SECOND"
        + ") WITH (\n"
        + " 'connector' = 'kafka',\n"
        + " 'topic' = 'test',\n"
        + " 'properties.bootstrap.servers' = 'testlocal:9092',\n"
        + " 'properties.group.id' = 'windowGroup',\n"
        + " 'scan.startup.mode' = 'latest-offset',\n"
        + " 'format' = 'json'\n"
        + ")");

// Window Top-N: the three keys with the highest total per one-minute tumbling window.
String sqlWindowTopN =
        "select * from (" +
        " select *, " +
        " ROW_NUMBER() over (partition by window_start, window_end order by total desc ) as rownum " +
        " from (" +
        " select key,window_start,window_end,count(key) as `count`,sum(price) total from table (" +
        " tumble(TABLE test, DESCRIPTOR(`time`), interval '1' minute)" +
        " ) group by window_start, window_end, key" +
        " )" +
        ") where rownum <= 3";
st.executeSql(sqlWindowTopN).print();
```

After starting the job, no results are produced, even after a long time. The watermark appears as follows in the UI:

!image-2022-07-12-15-11-51-653.png!

I didn't set the parallelism manually, so it defaults to 12. The Kafka source topic has only one partition, so there are idle partitions. To align the watermarks for the entire task, I use the `table.exec.source.idle-timeout` configuration.

As shown above, I found that the system automatically splits the window Top-N SQL into local-global aggregation tasks. In the local phase, the watermark did not behave as I expected.

Manually setting the parallelism to 1 gives the behavior I expected.

`streamExecutionEnvironment.setParallelism(1);`

!image-2022-07-12-15-19-29-950.png!

I can also manually configure the system not to split the aggregation into local-global phases. With this setting, the `table.exec.source.idle-timeout` configuration takes effect and the watermark is aligned.

` st.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "ONE_PHASE"); `

Result:

!image-2022-07-12-15-20-06-919.png!

As mentioned above, I hope to be able to aggregate in two phases and keep the watermarks aligned at the same time.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
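The effect described in FLINK-28504 can be pictured with a small, self-contained sketch. It is not Flink's implementation: the names `WatermarkSketch`, `Channel` and `combinedWatermark` are hypothetical, and the sketch only assumes the usual rule that a downstream operator's watermark is the minimum over those inputs that are not marked idle. Returning `Long.MIN_VALUE` when every input is idle is likewise a simplification chosen for this illustration.

```java
import java.util.Arrays;

/** Illustrative only; hypothetical names, not Flink's actual classes. */
final class WatermarkSketch {

    /** One input channel of a downstream operator, e.g. one source subtask. */
    static final class Channel {
        final long watermark; // last watermark received; Long.MIN_VALUE if none yet
        final boolean idle;   // true once the channel has been marked idle

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Combines the channel watermarks: the minimum over all non-idle channels.
     * Returns Long.MIN_VALUE when no active channel exists (assumption of this sketch).
     */
    static long combinedWatermark(Channel... channels) {
        return Arrays.stream(channels)
                .filter(c -> !c.idle)
                .mapToLong(c -> c.watermark)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        Channel active = new Channel(50_000L, false);
        Channel silent = new Channel(Long.MIN_VALUE, false);    // never emits, not marked idle
        Channel silentIdle = new Channel(Long.MIN_VALUE, true); // never emits, marked idle

        // A silent input that is not marked idle holds the combined watermark back.
        System.out.println(combinedWatermark(active, silent));
        // Once the silent input is marked idle, the active input alone decides.
        System.out.println(combinedWatermark(active, silentIdle));
    }
}
```

Under these assumptions, a window can only fire once every silent input has been excluded from the minimum, which is what the idle timeout is expected to achieve in the report above.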
[jira] [Closed] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nyingping closed FLINK-26086. - Resolution: Fixed > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Assignee: nyingping >Priority: Minor > Labels: pull-request-available, stale-assigned > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > [`@param shiftTimeZone the shit timezone of the > window`|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java#L96] > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nyingping updated FLINK-26086: -- Description: such as [`@param shiftTimeZone the shit timezone of the window`|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java#L96] !image-2022-02-11-16-38-12-674.png! was: such as `@param shiftTimeZone the shit timezone of the window` !image-2022-02-11-16-38-12-674.png! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > [`@param shiftTimeZone the shit timezone of the > window`|https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/buffers/WindowBuffer.java#L96] > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086 ] nyingping deleted comment on FLINK-26086: --- was (Author: JIRAUSER282677): Hi [~mapohl] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497108#comment-17497108 ] nyingping commented on FLINK-26086: --- Hi [~trohrmann] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495824#comment-17495824 ] nyingping edited comment on FLINK-26086 at 2/22/22, 7:22 AM: - Hi [~mapohl] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! was (Author: JIRAUSER282677): @[~mapohl] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490908#comment-17490908 ] nyingping edited comment on FLINK-26086 at 2/22/22, 6:46 AM: - Would someone to assign it to me,happy to fix it. thanks in advance. was (Author: JIRAUSER282677): Could someone to assign it to me,happy to fix it. thanks in advance. > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-25877) Update the copyright year in NOTICE files
[ https://issues.apache.org/jira/browse/FLINK-25877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490909#comment-17490909 ] nyingping edited comment on FLINK-25877 at 2/22/22, 6:46 AM: - Would someone to assign it to me,happy to fix it. thanks in advance. was (Author: JIRAUSER282677): Could someone to assign it to me,happy to fix it. thanks in advance. > Update the copyright year in NOTICE files > - > > Key: FLINK-25877 > URL: https://issues.apache.org/jira/browse/FLINK-25877 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > > * The current copyright year is 2014-2021 in NOTICE files. We should change > it to 2014-2022. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495824#comment-17495824 ] nyingping edited comment on FLINK-26086 at 2/22/22, 2:21 AM: - @[~mapohl] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! was (Author: JIRAUSER282677): [~mapohl] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495824#comment-17495824 ] nyingping edited comment on FLINK-26086 at 2/22/22, 2:21 AM: - [~mapohl] could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! was (Author: JIRAUSER282677): @Matthias Pohl could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495824#comment-17495824 ] nyingping commented on FLINK-26086: --- @Matthias Pohl could you assign this simple fix to me when you are free? if you think this fix is necessary. Thanks in advance! > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-25877) Update the copyright year in NOTICE files
[ https://issues.apache.org/jira/browse/FLINK-25877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490909#comment-17490909 ] nyingping edited comment on FLINK-25877 at 2/17/22, 6:28 AM: - Could someone to assign it to me,happy to fix it. thanks in advance. was (Author: JIRAUSER282677): Can someone assign it to me,happy to fix it. thanks in advance. > Update the copyright year in NOTICE files > - > > Key: FLINK-25877 > URL: https://issues.apache.org/jira/browse/FLINK-25877 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > > * The current copyright year is 2014-2021 in NOTICE files. We should change > it to 2014-2022. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490908#comment-17490908 ] nyingping edited comment on FLINK-26086 at 2/17/22, 6:28 AM: - Could someone to assign it to me,happy to fix it. thanks in advance. was (Author: JIRAUSER282677): Can someone assign it to me,happy to fix it. thanks in advance. > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (FLINK-26096) Add isolationLevel for JDBC Connector Options
[ https://issues.apache.org/jira/browse/FLINK-26096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] nyingping updated FLINK-26096: -- Summary: Add isolationLevel for JDBC Connector Options (was: add isolationLevel for jdbc Connector Options) > Add isolationLevel for JDBC Connector Options > - > > Key: FLINK-26096 > URL: https://issues.apache.org/jira/browse/FLINK-26096 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.14.3 >Reporter: nyingping >Priority: Minor > > The isolation level of JDBC transactions can be particularly useful in some > cases, and it would be nice to have a 'sink-isolation-level' option -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26096) add isolationLevel for jdbc Connector Options
[ https://issues.apache.org/jira/browse/FLINK-26096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17491260#comment-17491260 ] nyingping commented on FLINK-26096: --- I would be happy and honored to implement this feature if the community feels it is necessary. thanks in advance. > add isolationLevel for jdbc Connector Options > - > > Key: FLINK-26096 > URL: https://issues.apache.org/jira/browse/FLINK-26096 > Project: Flink > Issue Type: Improvement > Components: Connectors / JDBC >Affects Versions: 1.14.3 >Reporter: nyingping >Priority: Minor > > The isolation level of JDBC transactions can be particularly useful in some > cases, and it would be nice to have a 'sink-isolation-level' option -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26096) add isolationLevel for jdbc Connector Options
nyingping created FLINK-26096: - Summary: add isolationLevel for jdbc Connector Options Key: FLINK-26096 URL: https://issues.apache.org/jira/browse/FLINK-26096 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.14.3 Reporter: nyingping The isolation level of JDBC transactions can be particularly useful in some cases, and it would be nice to have a 'sink-isolation-level' option -- This message was sent by Atlassian Jira (v8.20.1#820001)
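A minimal sketch of how such an option value might be mapped onto JDBC, assuming the proposed 'sink-isolation-level' option takes the standard isolation level names. The class `IsolationLevelSketch` and the method `parseIsolationLevel` are hypothetical and not part of the Flink JDBC connector; only the `java.sql.Connection` constants and `Connection.setTransactionIsolation(int)` are existing JDBC API.

```java
import java.sql.Connection;
import java.util.Locale;

/** Illustrative only; hypothetical helper, not part of the Flink JDBC connector. */
final class IsolationLevelSketch {

    /**
     * Maps an option value such as "read-committed" or "READ_COMMITTED"
     * to the matching java.sql.Connection isolation constant.
     */
    static int parseIsolationLevel(String value) {
        // Normalize: trim, upper-case, and accept '-' as well as '_'.
        String normalized = value.trim().toUpperCase(Locale.ROOT).replace('-', '_');
        switch (normalized) {
            case "NONE":
                return Connection.TRANSACTION_NONE;
            case "READ_UNCOMMITTED":
                return Connection.TRANSACTION_READ_UNCOMMITTED;
            case "READ_COMMITTED":
                return Connection.TRANSACTION_READ_COMMITTED;
            case "REPEATABLE_READ":
                return Connection.TRANSACTION_REPEATABLE_READ;
            case "SERIALIZABLE":
                return Connection.TRANSACTION_SERIALIZABLE;
            default:
                throw new IllegalArgumentException("Unknown isolation level: " + value);
        }
    }

    public static void main(String[] args) {
        int level = parseIsolationLevel("read-committed");
        System.out.println(level == Connection.TRANSACTION_READ_COMMITTED);
    }
}
```

A sink could then apply the parsed value with `connection.setTransactionIsolation(level)` before writing; whether the target database accepts a given level depends on its JDBC driver.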
[jira] [Commented] (FLINK-25877) Update the copyright year in NOTICE files
[ https://issues.apache.org/jira/browse/FLINK-25877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490909#comment-17490909 ] nyingping commented on FLINK-25877: --- Can someone assign it to me,happy to fix it. thanks in advance. > Update the copyright year in NOTICE files > - > > Key: FLINK-25877 > URL: https://issues.apache.org/jira/browse/FLINK-25877 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > > * The current copyright year is 2014-2021 in NOTICE files. We should change > it to 2014-2022. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
[ https://issues.apache.org/jira/browse/FLINK-26086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490908#comment-17490908 ] nyingping commented on FLINK-26086: --- Can someone assign it to me,happy to fix it. thanks in advance. > fixed some causing ambiguities including the 'shit' comment > --- > > Key: FLINK-26086 > URL: https://issues.apache.org/jira/browse/FLINK-26086 > Project: Flink > Issue Type: Improvement >Reporter: nyingping >Priority: Minor > Labels: pull-request-available > Attachments: image-2022-02-11-16-38-12-674.png > > > such as > `@param shiftTimeZone the shit timezone of the window` > !image-2022-02-11-16-38-12-674.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-26086) fixed some causing ambiguities including the 'shit' comment
nyingping created FLINK-26086:

Summary: fixed some causing ambiguities including the 'shit' comment
Key: FLINK-26086
URL: https://issues.apache.org/jira/browse/FLINK-26086
Project: Flink
Issue Type: Improvement
Reporter: nyingping
Attachments: image-2022-02-11-16-38-12-674.png

such as
`@param shiftTimeZone the shit timezone of the window`
!image-2022-02-11-16-38-12-674.png!
[jira] [Updated] (FLINK-25877) Update the copyright year in NOTICE files
[ https://issues.apache.org/jira/browse/FLINK-25877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

nyingping updated FLINK-25877:

Description: * The current copyright year is 2014-2021 in NOTICE files. We should change it to 2014-2022.
(was: The current copyright year is 2014-2021 in NOTICE files. We should change it to 2014-2022.)

> Update the copyright year in NOTICE files
>
> Key: FLINK-25877
> URL: https://issues.apache.org/jira/browse/FLINK-25877
> Project: Flink
> Issue Type: Improvement
> Components: Documentation
> Reporter: nyingping
> Priority: Minor
> Labels: pull-request-available
>
> * The current copyright year is 2014-2021 in NOTICE files. We should change it to 2014-2022.
[jira] [Created] (FLINK-25877) Update the copyright year in NOTICE files
nyingping created FLINK-25877:

Summary: Update the copyright year in NOTICE files
Key: FLINK-25877
URL: https://issues.apache.org/jira/browse/FLINK-25877
Project: Flink
Issue Type: Improvement
Components: Documentation
Reporter: nyingping

The current copyright year is 2014-2021 in NOTICE files. We should change it to 2014-2022.
[jira] [Commented] (FLINK-24456) Support bounded offset in the Kafka table connector
[ https://issues.apache.org/jira/browse/FLINK-24456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17472395#comment-17472395 ]

nyingping commented on FLINK-24456:

Will this feature be released in version 1.15?

> Support bounded offset in the Kafka table connector
>
> Key: FLINK-24456
> URL: https://issues.apache.org/jira/browse/FLINK-24456
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Reporter: Haohui Mai
> Assignee: ZhuoYu Chen
> Priority: Minor
> Labels: pull-request-available
>
> The {{setBounded}} API in the DataStream connector of Kafka is particularly useful when writing tests. Unfortunately the table connector of Kafka lacks the same API.
> It would be good to have this API added.