[jira] [Commented] (FLINK-35905) Flink physical operator replacement support
[ https://issues.apache.org/jira/browse/FLINK-35905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869845#comment-17869845 ]

xuyang commented on FLINK-35905:
--------------------------------

Yes, [~au_miner], you're right: Flink does not yet support a plugin system for custom operators and rules; it is only a future plan. For more details, you can refer to the comments in [org.apache.flink.table.module.Module|https://github.com/apache/flink/blob/82b628d4730eef32b2f7a022e3b73cb18f950e6e/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java#L31] and [FLIP-68|https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Pluggable+Modules].

> Flink physical operator replacement support
> -------------------------------------------
>
>                 Key: FLINK-35905
>                 URL: https://issues.apache.org/jira/browse/FLINK-35905
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala, Table SQL / API
>    Affects Versions: 1.15.0
>        Environment: Flink 1.15 and so on
>           Reporter: Wang Qilong
>           Priority: Major
>
> Does FlinkSQL provide any SPI implementations that support custom physical
> operators, such as customizing a new StreamExecFileSourceScan and supporting
> rule injection for converting logical operators to physical operators?


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
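As a rough illustration of the idea behind FLIP-68's pluggable modules only (all names below are invented for this sketch; this is NOT Flink's real API), the core mechanism is an ordered list of modules that are consulted in load order when resolving a named object, the way Flink's Module list resolves functions:

```java
// Hypothetical sketch of FLIP-68-style module resolution. Invented names;
// Flink's real extension point is org.apache.flink.table.module.Module.
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class MiniModuleSystem {

    /** A module contributes named objects (Flink's Module contributes functions). */
    interface MiniModule {
        Optional<String> lookup(String name);
    }

    private final List<MiniModule> modules = new ArrayList<>();

    /** Modules are consulted in the order they were loaded, as in FLIP-68. */
    void loadModule(MiniModule module) {
        modules.add(module);
    }

    /** Resolution returns the first match in load order. */
    Optional<String> resolve(String name) {
        for (MiniModule module : modules) {
            Optional<String> hit = module.lookup(name);
            if (hit.isPresent()) {
                return hit;
            }
        }
        return Optional.empty();
    }

    /** Convenience factory: a module that defines exactly one named object. */
    static MiniModule single(String name, String value) {
        return queried -> {
            if (queried.equals(name)) {
                return Optional.of(value);
            }
            return Optional.empty();
        };
    }
}
```

With two modules both defining "CONCAT", the earlier-loaded one wins, which is exactly why module load order matters in FLIP-68.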
[jira] [Commented] (FLINK-35905) Flink physical operator replacement support
[ https://issues.apache.org/jira/browse/FLINK-35905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869486#comment-17869486 ]

xuyang commented on FLINK-35905:
--------------------------------

Furthermore, I think this is not a bug but at most an improvement, right?

> Flink physical operator replacement support
> -------------------------------------------
>
>                 Key: FLINK-35905
>                 URL: https://issues.apache.org/jira/browse/FLINK-35905
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala, Table SQL / API
>    Affects Versions: 1.15.0
>        Environment: Flink 1.15 and so on
>           Reporter: Wang Qilong
>           Priority: Major
>
> I have been studying the FlinkSQL source code recently and have learned about
> the execution process of FlinkSQL, which has led to a question:
> Does FlinkSQL provide any SPI implementations that support custom physical
> operators, such as customizing a FileSourceScanExec in the execPhysicalPlan
> layer?
[jira] [Commented] (FLINK-35905) Flink physical operator replacement support
[ https://issues.apache.org/jira/browse/FLINK-35905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869485#comment-17869485 ]

xuyang commented on FLINK-35905:
--------------------------------

The short answer: no. What is the reason and background for customizing the exec node? Take FileSourceScanExec as an example: it is just an ExecTableSourceScan whose connector parameter is file.

> Flink physical operator replacement support
> -------------------------------------------
>
>                 Key: FLINK-35905
>                 URL: https://issues.apache.org/jira/browse/FLINK-35905
[jira] [Comment Edited] (FLINK-35885) proctime aggregate window triggered by watermark
[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868659#comment-17868659 ]

xuyang edited comment on FLINK-35885 at 7/25/24 12:24 PM:
----------------------------------------------------------

I think this bug needs to be fixed, although it only affects the case where the watermark is larger than the proctime. I will propose a PR to fix it in the next few days.

was (Author: xuyangzhong):
I think this bug needs to be fixed, although it only affects the case where the watermark is larger than the proctime. Not only window agg is affected; window operators such as window rank also have this problem. I will propose a PR to fix it in the next few days.

> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
>                 Key: FLINK-35885
>                 URL: https://issues.apache.org/jira/browse/FLINK-35885
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.13.6, 1.17.2
>        Environment: flink 1.13.6 with blink or flink 1.17.2
>           Reporter: Baozhu Zhao
>           Priority: Major
>
> We have discovered an unexpected case where abnormal data with a count of 0
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows:
> {code:sql}
> CREATE TABLE s1 (
>     id INT,
>     event_time TIMESTAMP(3),
>     name STRING,
>     proc_time AS PROCTIME(),
>     WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> ) WITH (
>     'connector' = 'my-source'
> );
>
> SELECT *
> FROM (
>     SELECT
>         name,
>         COUNT(id) AS total_count,
>         window_start,
>         window_end
>     FROM TABLE (
>         TUMBLE (TABLE s1, DESCRIPTOR (proc_time), INTERVAL '30' SECONDS)
>     )
>     GROUP BY window_start, window_end, name
> )
> WHERE total_count = 0;
> {code}
> For detailed test code, please refer to
> https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java
>
> The root cause is that
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
> supports advancing progress by watermark. When the watermark suddenly exceeds
> the next window end timestamp, a result with count 0 will appear.
> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
>     if (mark.getTimestamp() > currentWatermark) {
>         windowProcessor.advanceProgress(mark.getTimestamp());
>         super.processWatermark(mark);
>     } else {
>         super.processWatermark(new Watermark(currentWatermark));
>     }
> }
> {code}
[jira] [Commented] (FLINK-35885) proctime aggregate window triggered by watermark
[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868659#comment-17868659 ]

xuyang commented on FLINK-35885:
--------------------------------

I think this bug needs to be fixed, although it only affects the case where the watermark is larger than the proctime. Not only window agg is affected; window operators such as window rank also have this problem. I will propose a PR to fix it in the next few days.

> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
>                 Key: FLINK-35885
>                 URL: https://issues.apache.org/jira/browse/FLINK-35885
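To make the failure mode concrete, here is a self-contained toy model (invented classes, not Flink's) of the processWatermark behaviour quoted in the issue description: because an advancing watermark also advances window progress, a watermark that jumps past a proctime window's end fires that window before the processing-time timer does, producing a count-0 result:

```java
// Toy model of a tumbling proctime window operator whose progress is advanced
// by BOTH the processing-time timer and incoming watermarks (the bug).
// Invented names; this is not Flink's SlicingWindowOperator.
import java.util.ArrayList;
import java.util.List;

class ProctimeWindowModel {
    static final long WINDOW_SIZE = 30_000; // 30s tumbling proctime window

    long currentWatermark = Long.MIN_VALUE;
    long windowEnd = WINDOW_SIZE;           // end of the current window
    long count = 0;                         // records in the current window
    final List<Long> firedCounts = new ArrayList<>();

    void processElement() {
        count++;
    }

    // Mirrors the quoted code: an advancing watermark also advances progress,
    // even though the window is keyed on processing time.
    void processWatermark(long mark) {
        if (mark > currentWatermark) {
            advanceProgress(mark);
            currentWatermark = mark;
        }
    }

    void onProcessingTimeTimer(long proctime) {
        advanceProgress(proctime);
    }

    void advanceProgress(long ts) {
        while (ts >= windowEnd) {
            firedCounts.add(count); // window fires with whatever it has so far
            count = 0;
            windowEnd += WINDOW_SIZE;
        }
    }
}
```

Driving the model with an event-time watermark of 40s while wall-clock time is still inside the first 30s window fires the window [0, 30s) immediately with count 0, which matches the reported symptom.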
[jira] [Commented] (FLINK-35885) proctime aggregate window triggered by watermark
[ https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868658#comment-17868658 ]

xuyang commented on FLINK-35885:
--------------------------------

The bug is caused by the fact that both the advancement of the watermark and the proctime timer trigger the advance of the window buffer. When the window buffer advances, it records the time of the last advancement, so that when a new advancement point arrives that is smaller than this time, it does not need to flush the data again but can instead retrieve it directly from state.

The bug occurs when a watermark (wt) that is larger than the proctime (pt) arrives first. This causes the window buffer to advance and flush the data in the buffer to state. When the proctime timer fires, the window buffer assumes that the data for this time point (pt) has already been flushed to state because pt < wt. However, in reality, the data in state is incomplete (or possibly even empty).

> proctime aggregate window triggered by watermark
> ------------------------------------------------
>
>                 Key: FLINK-35885
>                 URL: https://issues.apache.org/jira/browse/FLINK-35885
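The flush race described in the comment above can be sketched with a toy buffer model (invented names, not Flink's classes): a flush triggered by a large watermark records that flush time, and a later advancement with a smaller timestamp (the proctime timer) skips flushing because it assumes state is already complete up to that time:

```java
// Toy model of a window buffer that tracks the time of its last flush and
// skips flushing for smaller advancement points. Invented names; this is not
// Flink's WindowBuffer implementation.
class WindowBufferModel {
    long lastFlushTime = Long.MIN_VALUE;
    long inBuffer = 0;  // records held in the in-memory buffer
    long inState = 0;   // records already flushed to state

    void addRecord() {
        inBuffer++;
    }

    void advance(long ts) {
        if (ts > lastFlushTime) {
            // flush the buffer to state and remember this advancement point
            inState += inBuffer;
            inBuffer = 0;
            lastFlushTime = ts;
        }
        // else: assume everything up to ts is already in state. But records
        // that arrived AFTER the earlier (larger) flush are still buffered,
        // so a reader of the state sees incomplete (possibly empty) data.
    }
}
```

Replaying the scenario from the comment (watermark wt=16 flushes first, a record arrives, then the proctime timer at pt=12 advances) leaves the record stranded in the buffer while state reads as empty.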
[jira] [Commented] (FLINK-35861) The dependency of kafka sql connector seems wrong
[ https://issues.apache.org/jira/browse/FLINK-35861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866923#comment-17866923 ]

xuyang commented on FLINK-35861:
--------------------------------

I'm not sure whether other connectors have this bug in their documentation. cc [~loserwang1024].

> The dependency of kafka sql connector seems wrong
> -------------------------------------------------
>
>                 Key: FLINK-35861
>                 URL: https://issues.apache.org/jira/browse/FLINK-35861
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Documentation
>    Affects Versions: 1.18.1, 1.19.1
>           Reporter: xuyang
>           Priority: Minor
>
> The dependency should be `flink-sql-connector-kafka` instead of
> `flink-connector-kafka`, right?
[jira] [Created] (FLINK-35861) The dependency of kafka sql connector seems wrong
xuyang created FLINK-35861:
------------------------------

             Summary: The dependency of kafka sql connector seems wrong
                 Key: FLINK-35861
                 URL: https://issues.apache.org/jira/browse/FLINK-35861
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Documentation
    Affects Versions: 1.19.1, 1.18.1
            Reporter: xuyang


The dependency should be `flink-sql-connector-kafka` instead of `flink-connector-kafka`, right?
[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.
[ https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866220#comment-17866220 ]

xuyang commented on FLINK-35826:
--------------------------------

I believe this is a more general problem, namely "how does an operator determine whether a piece of data should be considered late". Currently we determine it at the operator level by comparing against the minimum of multiple input watermarks, but in a distributed system the watermarks from multiple upstream operators, or from multiple parallel subtasks of one upstream operator, might be delayed. One speculative possibility: could the discarding of late data be unified in WatermarkAssigner?

> [SQL] Sliding window may produce unstable calculations when processing
> changelog data.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-35826
>                 URL: https://issues.apache.org/jira/browse/FLINK-35826
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0
>        Environment: flink with release-1.20
>           Reporter: Yuan Kui
>           Assignee: xuyang
>           Priority: Major
>         Attachments: image-2024-07-12-14-27-58-061.png
>
> Calculation results may be unstable when using a sliding window to process
> changelog data. Repeating the execution 10 times, the test results were
> partly successful and partly failed:
> !image-2024-07-12-14-27-58-061.png!
> See the documentation and code for more details.
> https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing
> code:
> [[BUG] Reproduce the issue of unstable sliding window calculation results · yuchengxin/flink@c003e45|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]
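The operator-level rule mentioned in the comment (comparing against the minimum of the input watermarks) can be sketched as follows; this is a simplified model, not Flink's implementation, and it shows how a single delayed channel changes which records count as late:

```java
// Simplified model of a multi-input operator's combined watermark: the
// minimum across input channels. A record is treated as late when its
// timestamp is not larger than the combined watermark. Invented names.
class CombinedWatermark {
    private final long[] channelWatermarks;

    CombinedWatermark(int channels) {
        channelWatermarks = new long[channels];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Watermarks per channel only ever advance. */
    void advance(int channel, long watermark) {
        channelWatermarks[channel] =
                Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's combined watermark is the minimum over all channels. */
    long combined() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    boolean isLate(long timestamp) {
        return timestamp <= combined();
    }
}
```

With both channels at watermark 16, a record with timestamp 13 is late; if one channel lags at 12, the same record is no longer late, which is exactly the nondeterminism described in this issue.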
[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.
[ https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866217#comment-17866217 ]

xuyang commented on FLINK-35826:
--------------------------------

cc [~catyee] [~yunta]

> [SQL] Sliding window may produce unstable calculations when processing
> changelog data.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-35826
>                 URL: https://issues.apache.org/jira/browse/FLINK-35826
[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.
[ https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866216#comment-17866216 ]

xuyang commented on FLINK-35826:
--------------------------------

Hi, let me try to explain the cause of this problem. I have reproduced it with the following source data and query: https://github.com/xuyangzhong/flink/tree/window_bug

{code:java}
// source data
// ("+I", 1, 1, "2024-03-13T10:12", "1"),
// ("+I", 2, 3, "2024-03-13T10:16", "1"),
// ("+I", 3, 2, "2024-03-13T10:13", "1")

// schema
CREATE TABLE MyTable3 (
    id int,
    amount int,
    create_time timestamp(3),
    pt string,
    proc_time AS PROCTIME(),
    WATERMARK FOR `create_time` AS `create_time` - INTERVAL '0' MINUTES,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (...)

// query
select
    pt,
    HOP_START(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS w_start,
    HOP_END(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS w_end,
    sum(amount) as count_age
from MyTable3
group by HOP(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES), pt;

// expected result:
// "1,2024-03-13T10:05,2024-03-13T10:15,1",
// "1,2024-03-13T10:10,2024-03-13T10:20,6",
// "1,2024-03-13T10:15,2024-03-13T10:25,3"

// but sometimes wrong:
// "1,2024-03-13T10:05,2024-03-13T10:15,3",
// "1,2024-03-13T10:10,2024-03-13T10:20,6",
// "1,2024-03-13T10:15,2024-03-13T10:25,3"
{code}

{color:#FF0000}*Conclusion:*{color} The bug is caused by the record `("+I", 3, 2, "2024-03-13T10:13", "1")` sometimes not being treated as a late record, because of the watermark `create_time - INTERVAL '0' MINUTES`.

*Detailed cause:*

The plan:
{code:java}
parallelism is 4 (t1 means subtask-1)

source assigner (t1) -> changelogNormalize (t1) -> group window agg (t1)
                     -> changelogNormalize (t2) -> group window agg (t2)
                     -> changelogNormalize (t3) -> group window agg (t3)
                     -> changelogNormalize (t4) -> group window agg (t4)
{code}

Data across changelogNormalize (records are simplified as <id, amount>, and watermarks are simplified to minutes):

| |t1|t2|t3|t4|
|data|<1,1>|-|-|-|
|watermark|12|12|12|12|
|data|-|<2, 3>|-|-|
|watermark|16|16{color:#FF0000}[1]{color}|16|16|
|data|-|-|<3, 2>|-|
|watermark|Long.MAX|Long.MAX|Long.MAX|Long.MAX|

[1] If the processing of this watermark in ChangelogNormalize is blocked for a little while, <3, 2> will not be treated as a late record, because its timestamp 13 is still larger than the watermark 12 currently held by GroupWindowAgg.

The wrong data processing in GroupWindowAgg:

|data or watermark|window's acc|output|
|<1, 1> // t1|[10, 15): 1|-|
|Watermark 12 // t1, t2, t3, t4|[10, 15): 1|-|
|<2, 3> // t2|[10, 15): 1 [15, 20): 3|-|
|Watermark 16 // t1, t3, t4|[10, 15): 1 [15, 20): 3|-|
|<3, 2> // t3|[10, 15): 3 [15, 20): 3|-|
|{color:#FF0000}Watermark 16 // t2 (blocked){color}|[10, 15): 3 [15, 20): 3|[5, 15): 3|
|Watermark Long.MAX // t1, t2, t3, t4| |[10, 20): 6 [15, 25): 3|

> [SQL] Sliding window may produce unstable calculations when processing
> changelog data.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-35826
>                 URL: https://issues.apache.org/jira/browse/FLINK-35826
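For reference, the window assignment in the repro above is deterministic; a small Flink-independent sketch of HOP assignment (slide 5, size 10, in minutes) shows why the record at minute 13 belongs to both [10:05, 10:15) and [10:10, 10:20), so whether its amount reaches [10:05, 10:15) depends only on whether that window has already been fired by the watermark:

```java
// Sliding (HOP) window assignment: each timestamp belongs to size/slide
// windows. Standalone sketch, not Flink's window assigner.
import java.util.ArrayList;
import java.util.List;

class HopWindows {

    /** Returns the [start, end) pairs of all sliding windows containing ts. */
    static List<long[]> assign(long ts, long slide, long size) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }
}
```

For ts=13 (10:13 in minutes past 10:00), slide=5, size=10, the assigned windows are [10, 20) and [5, 15), matching the two affected sums in the tables above.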
[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.
[ https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865412#comment-17865412 ]

xuyang commented on FLINK-35826:
--------------------------------

[~martijnvisser] Thanks for the ping. I'll mark it and try to resolve it in the next few days.

> [SQL] Sliding window may produce unstable calculations when processing
> changelog data.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-35826
>                 URL: https://issues.apache.org/jira/browse/FLINK-35826
[jira] [Commented] (FLINK-35792) Sorting by proctime does not work in rank
[ https://issues.apache.org/jira/browse/FLINK-35792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865375#comment-17865375 ]

xuyang commented on FLINK-35792:
--------------------------------

I'll try to fix it.

> Sorting by proctime does not work in rank
> -----------------------------------------
>
>                 Key: FLINK-35792
>                 URL: https://issues.apache.org/jira/browse/FLINK-35792
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.20.0, 1.19.1
>           Reporter: xuyang
>           Priority: Major
>
> Take the following sql as an example:
> {code:java}
> @Test
> def test(): Unit = {
>   val sql =
>     """
>       |SELECT *
>       |FROM (
>       |  SELECT a, b, c,
>       |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as rank_num
>       |  FROM MyTable)
>       |WHERE rank_num = 1
>     """.stripMargin
>   // This rank can't be converted into Deduplicate because it also uses `b`
>   // as order key.
>   util.verifyExecPlan(sql)
> }
> {code}
> The rank node will not materialize the `proctime` in
> `RelTimeIndicatorConverter`, thus the order key `proctime` is always null.
[jira] [Created] (FLINK-35829) StreamPhysicalWindowTableFunction doesn't always require watermark
xuyang created FLINK-35829:
------------------------------

             Summary: StreamPhysicalWindowTableFunction doesn't always require watermark
                 Key: FLINK-35829
                 URL: https://issues.apache.org/jira/browse/FLINK-35829
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.19.1, 1.20.0
            Reporter: xuyang
[jira] [Commented] (FLINK-35782) Flink connector jdbc works wrong when using sql gateway
[ https://issues.apache.org/jira/browse/FLINK-35782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864865#comment-17864865 ]

xuyang commented on FLINK-35782:
--------------------------------

Hi [~caiyi], SqlGateway does not provide a JDBC driver by default. Can you confirm whether the corresponding drivers are included in the packages of these two connectors? If not, you can add the driver to SqlGateway with `add jar ...` and try again.

> Flink connector jdbc works wrong when using sql gateway
> -------------------------------------------------------
>
>                 Key: FLINK-35782
>                 URL: https://issues.apache.org/jira/browse/FLINK-35782
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC, Table SQL / Gateway
>    Affects Versions: 1.18.1, 1.19.1
>           Reporter: Yi Cai
>           Priority: Major
>
> When using sql client to submit jobs to sql gateway, this causes "no suitable
> driver found for x".
>
> script:
> add jar 's3://flink/lib/flink-connector-jdbc-3.1.2-1.19.jar';
> add jar 's3://flink/lib/mysql-connector-j-8.0.33.jar';
> CREATE CATALOG xxx WITH(
> ...
> );
> select ...
[jira] [Created] (FLINK-35792) Sorting by proctime does not work in rank
xuyang created FLINK-35792:
------------------------------

             Summary: Sorting by proctime does not work in rank
                 Key: FLINK-35792
                 URL: https://issues.apache.org/jira/browse/FLINK-35792
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.0, 1.20.0
            Reporter: xuyang


Take the following sql as an example:

{code:java}
@Test
def test(): Unit = {
  val sql =
    """
      |SELECT *
      |FROM (
      |  SELECT a, b, c,
      |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as rank_num
      |  FROM MyTable)
      |WHERE rank_num = 1
    """.stripMargin
  // This rank can't be converted into Deduplicate because it also uses `b`
  // as order key.
  util.verifyExecPlan(sql)
}
{code}

The rank node will not materialize the `proctime` in `RelTimeIndicatorConverter`, thus the order key `proctime` is always null.
[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860506#comment-17860506 ]

xuyang edited comment on FLINK-20539 at 6/27/24 2:47 PM:
---------------------------------------------------------

1. It looks like the original fix didn't fix this cleanly for the Table API. Let me briefly explain why the latest query failed: the type of the *cast* in the first *sqlQuery* is the wrong {*}FULLY_QUALIFIED{*}; when the Calcite tree is stored in Flink as a temporary table through {*}createTemporaryView{*}, it is converted to Flink's own type {*}RowType{*}; when *sqlQuery* is executed again, the Flink *RowType* is converted back to Calcite's *Row* with {*}PEEK_FIELDS_NO_EXPAND{*}, which is no longer consistent with the type in the original Calcite tree. (Row with FULLY_QUALIFIED in Calcite -> RowType in Flink -> Row with PEEK_FIELDS_NO_EXPAND in Calcite)

_In more detail:_ after executing sqlQuery, the *Row* type of the *CAST* in the query has become the wrong FULLY_QUALIFIED. When executing {*}createTemporaryView{*}, we put the Calcite tree into {*}PlannerQueryOperation{*} and convert the *FULLY_QUALIFIED* *ROW* into Flink's *LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) as the *ResolvedSchema*, and store it in the catalog manager as a temporary table (i.e. {*}t1{*}). When executing *sqlQuery* again, we need to convert the ResolvedSchema of the *t1* table into a type that Calcite can recognize ({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this time, the cast type becomes {*}PEEK_FIELDS_NO_EXPAND{*}. The difference between the type in the Calcite tree ({*}FULLY_QUALIFIED{*}) and the type of the *t1* table after the Flink conversion ({*}PEEK_FIELDS_NO_EXPAND{*}) causes this bug.

2. By the way, I tried the following query and found that there was no error, but there was a slight problem with the plan (although the corresponding ITCase did not report an error):

{code:java}
@Test
def test(): Unit = {
  util.addTable(
    s"""
       |create table t1(
       |  a int,
       |  b varchar
       |) with (
       |  'connector' = 'datagen'
       |)
     """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual wrong plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])

// expected correct plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
{code}

Now I have determined the cause of the problem and how to fix it; I am adding some cases and will create a PR later. Due to the inconsistency of row types between Calcite and Flink, I cannot enumerate all possible error cases. If queries hit the same error in the future, anyone can link to this Jira and I will solve it then.
[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860506#comment-17860506 ] xuyang commented on FLINK-20539: 1. It looks like the original fix didn't fix it cleanly on the table api. Let me try to explain why the latest query failed briefly: The type of the *cast* in the first *sqlQuery* is the wrong {*}FULLY_QUALIFIED{*}; when the calcite tree stores in Flink as a temporary table through {*}createTemporaryView{*}, it is converted to flink's own type {*}RowType{*}; when executing *sqlQuery* again, flink *RowType* is converted to *PEEK_FIELDS_NO_EXPAND* when it is converted to calcite's {*}Row{*}, which is no longer consistent with the type of the original calcite tree. ({*}Row with {color:#FF}*FULLY_QUALIFIED in calcite*{color} -> RowType in flink -> Row with{*} {color:#FF}*PEEK_FIELDS_NO_EXPAND in calcite*{color}{*}{*}) _Too detailed to read:_ After executing sqlQuery, the *Row* type about *CAST* in the query statement has become the wrong FULLY_QUALIFIED. However, when executing {*}createTemporaryView{*}, we put the calcite tree into {*}PlannerQueryOperation{*}, and also convert the *FULLY_QUALIFIED* *ROW* into the *LogicalType* type in flink ({_}FlinkTypeFactory#toLogicalType{_}) as the *ResolvedSchema* in Flink, and store it in the catalog manager as a temporary table (i.e. {*}t1{*}). When executing *sqlQuery* again, we need to convert the ResolvedSchema of the *t1* table into a type that can be recognized by calcite ({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this time, the cast type becomes {*}PEEK_FIELDS_NO_EXPAND{*}. The difference between the type in the calcite tree ({*}FULLY_QUALIFIED{*}) and the type of the *t1* table after flink conversion ({*}PEEK_FIELDS_NO_EXPAND{*}) caused this bug. 2. By the way, I tried the following query and found that there was no error, but there was a slight problem with the plan. 
(Although a similar ITCase did not report an error) {code:java}
@Test
def test(): Unit = {
  util.addTable(s"""
                   |create table t1(
                   |  a int,
                   |  b varchar,
                   |  c as row(a, b)
                   |) with (
                   |  'connector' = 'datagen'
                   |)
                   """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual wrong plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])

// expected correct plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b]){code} Now I have identified the cause of the problem and how to fix it; I am adding some cases and will create a pr later. Due to the inconsistency of row types between Calcite and Flink, I cannot enumerate all possible error cases in advance. If a query hits the same error in the future, anyone can link it to this jira and I will fix it then.
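The FULLY_QUALIFIED -> RowType -> PEEK_FIELDS_NO_EXPAND round trip described above can be modeled in a few lines of plain Java. This is a toy sketch with hypothetical stand-in types (no Calcite or Flink dependencies); the real conversions happen in FlinkTypeFactory, as named in the comment:

```java
import java.util.List;

public class StructKindRoundTrip {
    // Toy stand-ins for Calcite's StructKind and Flink's RowType.
    enum StructKind { FULLY_QUALIFIED, PEEK_FIELDS_NO_EXPAND }
    record CalciteRow(StructKind kind, List<String> fields) {}
    record FlinkRowType(List<String> fields) {} // no struct kind: it is dropped here

    // Analogue of FlinkTypeFactory#toLogicalType: the struct kind is not preserved.
    static FlinkRowType toLogicalType(CalciteRow row) {
        return new FlinkRowType(row.fields());
    }

    // Analogue of FlinkTypeFactory#createFieldTypeFromLogicalType: the row is always
    // rebuilt with PEEK_FIELDS_NO_EXPAND, whatever the original kind was.
    static CalciteRow toCalciteType(FlinkRowType row) {
        return new CalciteRow(StructKind.PEEK_FIELDS_NO_EXPAND, row.fields());
    }

    public static void main(String[] args) {
        CalciteRow original = new CalciteRow(StructKind.FULLY_QUALIFIED, List.of("a_val", "b_val"));
        CalciteRow roundTripped = toCalciteType(toLogicalType(original));
        // The field list survives the round trip, but the struct kind does not;
        // this is exactly the type inconsistency described above.
        System.out.println(original.kind() + " -> " + roundTripped.kind());
        // prints "FULLY_QUALIFIED -> PEEK_FIELDS_NO_EXPAND"
    }
}
```

The point of the model: the catalog round trip through the temporary table is lossy in one attribute (the struct kind), which is why the replayed query no longer matches the original calcite tree.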
> Type mismatch when using ROW in computed column > --- > > Key: FLINK-20539 > URL: https://issues.apache.org/jira/browse/FLINK-20539 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: xuyang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.19.0, 1.18.2 > > > The following SQL: > {code} > env.executeSql( > "CREATE TABLE Orders (\n" > + "order_number BIGINT,\n" > + "priceINT,\n" > + "first_name STRING,\n" > + "last_nameSTRING,\n" > + "buyer_name AS ROW(first_name, last_name)\n" > + ") WITH (\n" > + " 'connector' = 'datagen'\n" > + ")"); > env.executeSql("SELECT * FROM Orders").print(); > {code} > Fails with: > {code} > Exception in thread "main" java.lang.AssertionError: Conversion to relational > algebra failed to preserve datatypes: > validated type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT > NULL buyer_name) NOT NULL > converted type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name,
[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column
[ https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859708#comment-17859708 ] xuyang commented on FLINK-20539: I'll take a look soon. > Type mismatch when using ROW in computed column > --- > > Key: FLINK-20539 > URL: https://issues.apache.org/jira/browse/FLINK-20539 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: Timo Walther >Assignee: xuyang >Priority: Major > Labels: auto-unassigned, pull-request-available > Fix For: 1.19.0, 1.18.2 > > > The following SQL: > {code} > env.executeSql( > "CREATE TABLE Orders (\n" > + "order_number BIGINT,\n" > + "priceINT,\n" > + "first_name STRING,\n" > + "last_nameSTRING,\n" > + "buyer_name AS ROW(first_name, last_name)\n" > + ") WITH (\n" > + " 'connector' = 'datagen'\n" > + ")"); > env.executeSql("SELECT * FROM Orders").print(); > {code} > Fails with: > {code} > Exception in thread "main" java.lang.AssertionError: Conversion to relational > algebra failed to preserve datatypes: > validated type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET > "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT > NULL buyer_name) NOT NULL > converted type: > RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER > SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" > last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, > VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT > NULL > rel: > LogicalProject(order_number=[$0], price=[$1], first_name=[$2], > last_name=[$3], buyer_name=[ROW($2, $3)]) > LogicalTableScan(table=[[default_catalog, default_database, Orders]]) > at > 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467) > at > org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35498) Unexpected argument name conflict error when do extract method params from udf
[ https://issues.apache.org/jira/browse/FLINK-35498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852333#comment-17852333 ] xuyang commented on FLINK-35498: Copied from the available pr: Currently, we depend on asm to extract method parameter names. We implement a custom MethodVisitor to visit the local variable table in the {{.class}} file, and then take the first {{N}} local variable names as the method parameter names. However, if one local variable spans multiple blocks, the first {{N}} local variable names can contain duplicates, which are then wrongly extracted and crash the conflict validation (the new logic added in 1.19). This pr instead uses the slot index in the {{.class}} file to extract the method parameter names, because method parameter names always sit at the head of the slot index list ([Chapter 3.6. Receiving Arguments|https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html]). Take the test function {{ExtractionUtilsTest#MultiLocalVariableBlocksWithoutInitializationClass}} as an example: * Before the fix: # parameter names extracted via asm: {{[localVariable, localVariable, localVariable, this, generic, genericFuture, listOfGenericFuture, array, localVariable]}} # take the first 4 after offset 1 (expecting {{this}} at index 0): {{[localVariable, localVariable, this, generic]}} This happens because the local variable table contains multiple {{localVariable}} entries with different lifecycles in different blocks. * After the fix: # parameter names extracted via asm: {{[this, generic, genericFuture, listOfGenericFuture, array, localVariable]}} # take the first 4 after offset 1 (expecting {{this}} at index 0): {{[generic, genericFuture, listOfGenericFuture, array]}} That is what we expect. Furthermore, if the local variable has been initialized before the {{if}} statement, its lifecycle spans all the {{if}} blocks, and there is no need to create a new local variable {{localVariable}} per block.
Otherwise, a {{localVariable}} declared inside one block is not visible in the other blocks even though it is declared first, so the jvm creates a separate {{localVariable}} entry per block and adds each of them to the local variable table (you can see in the local variable table that although these {{localVariable}} entries share the same slot, their start and end labels differ). > Unexpected argument name conflict error when do extract method params from udf > -- > > Key: FLINK-35498 > URL: https://issues.apache.org/jira/browse/FLINK-35498 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0, 1.20.0 >Reporter: lincoln lee >Assignee: xuyang >Priority: Major > Labels: pull-request-available > Attachments: image-2024-06-02-23-09-17-768.png > > > Follow the steps to reproduce the error: > test case: > {code:java} > util.addTemporarySystemFunction("myudf", new TestXyz) > util.tableEnv.explainSql("select myudf(f1, f2) from t") > {code} > > udf: TestXyz > {code:java} > public class TestXyz extends ScalarFunction { > public String eval(String s1, String s2) { > // will not fail if add initialization > String localV1; > if (s1 == null) { > if (s2 != null) { > localV1 = s2; > } else { > localV1 = s2 + s1; > } > } else { > if ("xx".equals(s2)) { > localV1 = s1.length() >= s2.length() ? s1 : s2; > } else { > localV1 = s1; > } > } > if (s1 == null) { > return s2 + localV1; > } > if (s2 == null) { > return s1; > } > return s1.length() >= s2.length() ? 
s1 + localV1 : s2; > } > } > {code} > > error stack: > {code:java} > Caused by: org.apache.flink.table.api.ValidationException: Unable to extract > a type inference from method: > public java.lang.String > org.apache.flink.table.planner.runtime.utils.TestXyz.eval(java.lang.String,java.lang.String) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.BaseMappingExtractor.extractResultMappings(BaseMappingExtractor.java:154) > at > org.apache.flink.table.types.extraction.BaseMappingExtractor.extractOutputMapping(BaseMappingExtractor.java:100) > ... 53 more > Caused by: org.apache.flink.table.api.ValidationException: Argument name > conflict, there are at least two argument names that are the same. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357) > at > org.apache.flink.table.types.extraction.FunctionSignatureTemplate.of(FunctionSignatureTemplate.java:73) > at >
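The slot-index extraction described in the comment above can be sketched as a small, self-contained simulation. The real implementation walks the bytecode with ASM's MethodVisitor#visitLocalVariable; here the local variable table is modeled as plain records, and the class and field names are hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SlotIndexParamNames {
    // One row of a method's LocalVariableTable: variable name and slot index.
    // A block-scoped variable can appear several times with the same slot but
    // different start/end labels; only the name and slot matter here.
    record LocalVar(String name, int slot) {}

    /**
     * Slot-index based extraction: for an instance method the parameters always
     * occupy the lowest slots right after `this` (slot 0), so order the table by
     * slot, keep one entry per slot, skip `this`, and take the first N names.
     */
    static List<String> parameterNames(List<LocalVar> table, int paramCount) {
        Map<Integer, String> bySlot = new TreeMap<>();
        for (LocalVar v : table) {
            bySlot.putIfAbsent(v.slot(), v.name()); // duplicate blocks share one slot
        }
        return bySlot.values().stream()
                .skip(1)            // drop `this` at slot 0
                .limit(paramCount)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Table in .class order, mimicking the example in the comment: the
        // block-scoped `localVariable` entries come first despite their high slot.
        List<LocalVar> table = List.of(
                new LocalVar("localVariable", 5),
                new LocalVar("localVariable", 5),
                new LocalVar("localVariable", 5),
                new LocalVar("this", 0),
                new LocalVar("generic", 1),
                new LocalVar("genericFuture", 2),
                new LocalVar("listOfGenericFuture", 3),
                new LocalVar("array", 4));
        // prints [generic, genericFuture, listOfGenericFuture, array]
        System.out.println(parameterNames(table, 4));
    }
}
```

Note how taking the first N names by table position would return `localVariable` three times, while ordering by slot recovers the true parameter names, matching the before/after lists in the comment.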
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849209#comment-17849209 ] xuyang commented on FLINK-34380: Hi, [~rovboyko]. +1 to fix the wrong order. I'll take a look at your pr later. > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > 
| +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846485#comment-17846485 ] xuyang edited comment on FLINK-34380 at 5/15/24 4:01 AM: - Sorry for the late reply. The result of this fix still doesn't quite meet expectations. Still based on the above test, the result is as follows; however, the row kind of the first record should be `+I`, right? {code:java}
+----+---+----+----+----+
| op | a |  b | a0 | b0 |
+----+---+----+----+----+
| +U | 1 |  1 |  1 | 99 |
| -U | 1 |  1 |  1 | 99 |
| +U | 1 | 99 |  1 | 99 |
| -D | 1 | 99 |  1 | 99 |
+----+---+----+----+----+
{code} was (Author: xuyangzhong): Sorry for the late reply. This commit for the fix seems great. [~xu_shuai_] Can you take a look to verify it again? > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > 
tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846485#comment-17846485 ] xuyang commented on FLINK-34380: Sorry for this late reply. This commit for fix seems great. [~xu_shuai_] Can you take a look to verify it again? > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.20.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > 
++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35327) SQL Explain show push down condition
[ https://issues.apache.org/jira/browse/FLINK-35327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845189#comment-17845189 ] xuyang commented on FLINK-35327: Hi, I'd like to take this Jira and take a look at it. > SQL Explain show push down condition > - > > Key: FLINK-35327 > URL: https://issues.apache.org/jira/browse/FLINK-35327 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: Hongshun Wang >Priority: Minor > Fix For: 1.20.0 > > > Current, we can not determine whether filter/limit/partition condition is > pushed down to source. For example, we can only know filter condition is > pushed down if it is not included in Filter any more -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode
[ https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844083#comment-17844083 ] xuyang commented on FLINK-35193: Hi, [~lsy] I'd like to take this task. > Support drop materialized table syntax and execution in continuous refresh > mode > --- > > Key: FLINK-35193 > URL: https://issues.apache.org/jira/browse/FLINK-35193 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: dalongliu >Priority: Major > Fix For: 1.20.0 > > > In continuous refresh mode, support drop materialized table and the > background refresh job. > {code:SQL} > DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35198) Support the execution of refresh materialized table
[ https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844082#comment-17844082 ] xuyang commented on FLINK-35198: Hi, [~lsy] Can I take this jira? > Support the execution of refresh materialized table > --- > > Key: FLINK-35198 > URL: https://issues.apache.org/jira/browse/FLINK-35198 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: dalongliu >Priority: Major > Fix For: 1.20.0 > > > {code:SQL} > ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH > [PARTITION (key1=val1, key2=val2, ...)] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35271) Add doc for syntax `describe job 'xxx'`
xuyang created FLINK-35271: -- Summary: Add doc for syntax `describe job 'xxx'` Key: FLINK-35271 URL: https://issues.apache.org/jira/browse/FLINK-35271 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.20.0 Reporter: xuyang Fix For: 1.20.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35194) Support describe job syntax and execution
[ https://issues.apache.org/jira/browse/FLINK-35194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841113#comment-17841113 ] xuyang commented on FLINK-35194: Hi, can I take this jira? > Support describe job syntax and execution > - > > Key: FLINK-35194 > URL: https://issues.apache.org/jira/browse/FLINK-35194 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.20.0 >Reporter: dalongliu >Priority: Major > Fix For: 1.20.0 > > > {code:java} > { DESCRIBE | DESC } JOB 'xxx' > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35081) CompileException when watermark definition contains coalesce and to_timestamp built-in functions
[ https://issues.apache.org/jira/browse/FLINK-35081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838424#comment-17838424 ] xuyang commented on FLINK-35081: I think this bug is the same as FLINK-28693 > CompileException when watermark definition contains coalesce and to_timestamp > built-in functions > > > Key: FLINK-35081 > URL: https://issues.apache.org/jira/browse/FLINK-35081 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.17.1 >Reporter: Grzegorz Kołakowski >Priority: Major > > I have a data stream in which event-time column can have two data formats. To > be able to define watermark on the table, I used coalesce and to_timestamp > built-in functions as shown below: > {code:sql} > create table test ( > `@timestamp` VARCHAR, > __rowtime AS coalesce( > to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss'), > to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss.SSS') > ), > watermark for __rowtime as __rowtime - INTERVAL '30' SECOND, > ... > ) with ( ... 
) > {code} > The job failed with the following stacktrace: > {noformat} > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown > Source) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) > at java.base/java.lang.reflect.Method.invoke(Unknown Source) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) > at > 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at akka.actor.Actor.aroundReceive(Actor.scala:537) > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) > at akka.actor.ActorCell.invoke(ActorCell.scala:547) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) > at > java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown > Source) > at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source) > at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) > at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) > Caused
[jira] [Commented] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs
[ https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838130#comment-17838130 ] xuyang commented on FLINK-34583: Hi, [~xccui] can you provide more details about this bug? I tried to run this test in my local env with Flink 1.18-SNAPSHOT, but could not reproduce it. {code:java}
// run it in org.apache.flink.table.planner.plan.stream.sql.CalcTest
@Test
def test(): Unit = {
  util.tableEnv.executeSql(s"""
                              |create temporary table T1 (
                              |  a int,
                              |  b int,
                              |  c int)
                              |  with ( 'connector' = 'values' )
                              |""".stripMargin)
  util.verifyExecPlan(
    "with q1 as (SELECT * FROM T1 /*+ OPTIONS('changelog-mode' = 'I,D') */ WHERE a > 10)," +
      "q2 as (SELECT a, b, c FROM q1 where b > 10)," +
      "q3 as (select a,b,c from q1 where c > 20)," +
      "q4 as (select * from q2 join q3 on q2.a = q3.a) SELECT * FROM q4");
}

// result
Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, c, a0, b0, c0], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:  +- Calc(select=[a, b, c], where=[((a > 10) AND (b > 10))])
:     +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[]]], fields=[a, b, c], hints=[[[OPTIONS options:{changelog-mode=I,D}]]])(reuse_id=[1])
+- Exchange(distribution=[hash[a]])
   +- Calc(select=[a, b, c], where=[((a > 10) AND (c > 20))])
      +- Reused(reference_id=[1])
{code} > Bug for dynamic table option hints with multiple CTEs > - > > Key: FLINK-34583 > URL: https://issues.apache.org/jira/browse/FLINK-34583 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Priority: Major > > The table options hints don't work well with multiple WITH clauses referring > to the same table. Please see the following example. > > The following query with hints works well. > {code:java} > SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code} > The following query with multiple WITH clauses also works well. 
> {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...) > SELECT * FROM T3;{code} > The following query with multiple WITH clauses referring to the same original > table failed to recognize the hints. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...), > T4 AS (SELECT ... FROM T2 WHERE...), > T5 AS (SELECT ... FROM T3 JOIN T4 ON...) > SELECT * FROM T5;{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs
[ https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823973#comment-17823973 ] xuyang commented on FLINK-34583: [~lincoln.86xy] Sure. > Bug for dynamic table option hints with multiple CTEs > - > > Key: FLINK-34583 > URL: https://issues.apache.org/jira/browse/FLINK-34583 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.1 >Reporter: Xingcan Cui >Priority: Major > > The table options hints don't work well with multiple WITH clauses referring > to the same table. Please see the following example. > > The following query with hints works well. > {code:java} > SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code} > The following query with multiple WITH clauses also works well. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...) > SELECT * FROM T3;{code} > The following query with multiple WITH clauses referring to the same original > table failed to recognize the hints. > {code:java} > WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...), > T3 AS (SELECT ... FROM T2 WHERE...), > T4 AS (SELECT ... FROM T2 WHERE...), > T5 AS (SELECT ... FROM T3 JOIN T4 ON...) > SELECT * FROM T5;{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34559) TVF Window Aggregations might get stuck
[ https://issues.apache.org/jira/browse/FLINK-34559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823012#comment-17823012 ] xuyang commented on FLINK-34559: Hi, [~roman], I'm a little interested in how you would solve this problem. > limit the amount of data buffered in Global Aggregation nodes IIUC, for non-session global window aggregation, it only stores the aggregated results in the state. I am a little curious what "buffering the data" means here (maybe you mean the input network buffers at the operator level, right?). > disable two-phase aggregations Even if two-phase optimization is disabled, if the user's watermark interval is set very long, or the window is set very large, the state in the global window aggregation node will still be updated frequently. As you mentioned, I believe it is currently difficult to relate the availability of network buffers to plan optimizations such as two-phase aggregation. So, I'm looking forward to seeing your solution. :) > TVF Window Aggregations might get stuck > --- > > Key: FLINK-34559 > URL: https://issues.apache.org/jira/browse/FLINK-34559 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0, 1.18.1 >Reporter: Roman Khachatryan >Assignee: Roman Khachatryan >Priority: Major > Fix For: 1.19.0 > > > RecordsWindowBuffer flushes buffered records in the following cases: > * watermark > * checkpoint barrier > * buffer overflow > > In two-phase aggregations, this creates the following problems: > 1) Local aggregation: enters hard-backpressure because for flush, it outputs > the data downstream and doesn't check network buffer availability > This already disrupts normal checkpointing and watermarks progression > > 2) Global aggregation: > When the window is large enough and/or the watermark is lagging, lots of data > is flushed to state backend (and the state is updated) in checkpoint SYNC > phase. 
> > All this eventually causes checkpoint timeouts (10 minutes in our env). > > Example query > {code:java} > INSERT INTO `target_table` > SELECT window_start, window_end, some, attributes, SUM(view_time) AS > total_view_time, COUNT(*) AS num, LISTAGG(DISTINCT page_url) AS pages > FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR($rowtime), INTERVAL '1' > HOUR)) > GROUP BY window_start, window_end, some, attributes;{code} > In our setup, the issue can be reproduced deterministically. > > As a quick fix, we might want to: > # limit the amount of data buffered in Global Aggregation nodes > # disable two-phase aggregations, i.e. Local Aggregations (we can try to > limit buffing there two, but network buffer availability can not be easily > checked from the operator) -- This message was sent by Atlassian Jira (v8.20.10#820010)
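The three flush triggers listed in the issue (watermark, checkpoint barrier, buffer overflow) follow a common pattern that can be sketched as follows. This is a simplified stand-in, not Flink's actual RecordsWindowBuffer; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class WindowBufferSketch {
    private final int capacity;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushLog = new ArrayList<>(); // records why each flush happened

    WindowBufferSketch(int capacity) { this.capacity = capacity; }

    // Buffer a record; flush eagerly when the buffer is full (the overflow trigger).
    void addElement(String record) {
        buffer.add(record);
        if (buffer.size() >= capacity) {
            flush("overflow");
        }
    }

    // A watermark lets earlier windows fire, so buffered input must be flushed first.
    void advanceProgress(long watermark) { flush("watermark"); }

    // Flushing on a checkpoint barrier keeps the buffer out of checkpointed state,
    // but it also means a large buffer is written out in the synchronous phase,
    // which is the checkpoint-timeout risk the issue describes.
    void flushOnCheckpoint() { flush("checkpoint"); }

    private void flush(String reason) {
        if (!buffer.isEmpty()) {
            flushLog.add(reason + ":" + buffer.size());
            buffer.clear(); // a real operator would emit/aggregate these records here
        }
    }

    List<String> flushLog() { return flushLog; }

    public static void main(String[] args) {
        WindowBufferSketch b = new WindowBufferSketch(2);
        b.addElement("r1");
        b.addElement("r2");      // triggers an overflow flush of 2 records
        b.addElement("r3");
        b.advanceProgress(100L); // watermark flush of the 1 remaining record
        b.flushOnCheckpoint();   // buffer already empty: nothing to flush
        System.out.println(b.flushLog()); // [overflow:2, watermark:1]
    }
}
```

The sketch makes the issue's point visible: every trigger empties the whole buffer at once, regardless of whether the downstream side (network buffers, state backend) is ready to absorb it.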
[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
[ https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822401#comment-17822401 ] xuyang commented on FLINK-34380: Hi, [~xu_shuai_] . Can you help check it again? > Strange RowKind and records about intermediate output when using minibatch > join > --- > > Key: FLINK-34380 > URL: https://issues.apache.org/jira/browse/FLINK-34380 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > {code:java} > // Add it in CalcItCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > changelogRow("+I", java.lang.Integer.valueOf(1), "1"), > changelogRow("-U", java.lang.Integer.valueOf(1), "1"), > changelogRow("+U", java.lang.Integer.valueOf(1), "99"), > changelogRow("-D", java.lang.Integer.valueOf(1), "99") > ) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" > |CREATE TABLE t1 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" > |CREATE TABLE t2 ( > | a int, > | b string > |) WITH ( > | 'connector' = 'values', > | 'data-id' = '$dataId', > | 'bounded' = 'false' > |) >""".stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = > t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > } {code} > Output: > {code:java} > ++-+-+-+-+ > | op | a | b | a0 | b0 | > ++-+-+-+-+ > | +U | 1 | 1 | 1 | 99 | > | +U | 1 | 99 | 1 | 99 | > | -U | 
1 | 1 | 1 | 99 | > | -D | 1 | 99 | 1 | 99 | > ++-+-+-+-+{code}
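The "strange RowKind" output above comes from a mini-batch operator buffering changelog rows per key and folding them before the join sees them. The following is only an illustrative Python sketch of that folding effect, with an invented `fold_minibatch` helper; it is not Flink's operator logic.

```python
# Hypothetical sketch: a mini-batch buffer folds changelog rows per key into
# a net effect, so downstream operators observe different RowKinds than with
# per-record processing. Illustration only, not Flink's implementation.
def fold_minibatch(rows):
    """rows: list of (kind, key, value); returns net effect per key."""
    latest = {}   # key -> (kind, value) accumulated inside this batch
    order = []    # first-seen key order
    for kind, key, value in rows:
        if key not in latest:
            order.append(key)
            latest[key] = (kind, value)
        elif kind == "+U" and latest[key][0] == "-U":
            latest[key] = ("+U", value)   # fold retraction + update pair
        else:
            latest[key] = (kind, value)   # otherwise keep the newest row
    return [(latest[k][0], k, latest[k][1]) for k in order]

# The changelog from the reproduction case above:
batch = [("+I", 1, "1"), ("-U", 1, "1"), ("+U", 1, "99"), ("-D", 1, "99")]
print(fold_minibatch(batch))  # the raw four-row changelog collapses
```

Under this folding, the four input rows collapse to a single net `-D`, which is why record-at-a-time and mini-batch runs can print visibly different intermediate RowKinds.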
[jira] [Commented] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector
[ https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822337#comment-17822337 ] xuyang commented on FLINK-33989: Agree with [~libenchao] . This is a behavior by design. > Insert Statement With Filter Operation Generates Extra Tombstone using Upsert > Kafka Connector > - > > Key: FLINK-33989 > URL: https://issues.apache.org/jira/browse/FLINK-33989 > Project: Flink > Issue Type: Bug > Components: Connectors / Kafka, Table SQL / Runtime >Affects Versions: 1.17.2 >Reporter: Flaviu Cicio >Priority: Major > > Given the following Flink SQL tables: > {code:sql} > CREATE TABLE input ( > id STRING NOT NULL, > current_value STRING NOT NULL, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'input', > 'key.format' = 'raw', > 'properties.bootstrap.servers' = 'kafka:29092', > 'properties.group.id' = 'your_group_id', > 'value.format' = 'json' > ); > CREATE TABLE output ( > id STRING NOT NULL, > current_value STRING NOT NULL, > PRIMARY KEY (id) NOT ENFORCED > ) WITH ( > 'connector' = 'upsert-kafka', > 'topic' = 'output', > 'key.format' = 'raw', > 'properties.bootstrap.servers' = 'kafka:29092', > 'properties.group.id' = 'your_group_id', > 'value.format' = 'json' > ); {code} > And, the following entries are present in the input Kafka topic: > {code:json} > [ > { > "id": "1", > "current_value": "abc" > }, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > If we execute the following statement: > {code:sql} > INSERT INTO output SELECT id, current_value FROM input; {code} > The following entries are published to the output Kafka topic: > {code:json} > [ > { > "id": "1", > "current_value": "abc" > }, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > But, if we execute the following statement: > {code:sql} > INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); > {code} > The following entries are published: > {code:json} > [ > { > "id": "1", > 
"current_value": "abc" > }, > null, > { > "id": "1", > "current_value": "abcd" > } > ]{code} > We would expect the result to be the same for both insert statements. > As we can see, there is an extra tombstone generated as a result of the > second statement. > > Moreover, if we make a select on the input table: > {code:sql} > SELECT * FROM input; > {code} > We will get the following entries: > ||op||id||current_value|| > |I|1|abc| > |-U|1|abc| > |+U|1|abcd| > We expected to see only the insert and the update_after entries. > The update_before is added at DeduplicateFunctionHelper#122. > This is easily reproducible with this test that we added in the > UpsertKafkaTableITCase from flink-connector-kafka: > {code:java} > @Test > public void testAggregateFilterOmit() throws Exception { > String topic = COUNT_FILTER_TOPIC + "_" + format; > createTestTopic(topic, 1, 1); > env.setParallelism(1); > // - test --- > countFilterToUpsertKafkaOmitUpdateBefore(topic); > // - clean up --- > deleteTestTopic(topic); > } > private void countFilterToUpsertKafkaOmitUpdateBefore(String table) > throws Exception { > String bootstraps = getBootstrapServers(); > List data = > Arrays.asList( > Row.of(1, "Hi"), > Row.of(1, "Hello"), > Row.of(2, "Hello world"), > Row.of(2, "Hello world, how are you?"), > Row.of(2, "I am fine."), > Row.of(3, "Luke Skywalker"), > Row.of(3, "Comment#1"), > Row.of(3, "Comment#2"), > Row.of(4, "Comment#3"), > Row.of(4, null)); > final String createSource = > String.format( > "CREATE TABLE aggfilter_%s (" > + " `id` INT,\n" > + " `comment` STRING\n" > + ") WITH (" > + " 'connector' = 'values'," > + " 'data-id' = '%s'" > + ")", > format, TestValuesTableFactory.registerData(data)); > tEnv.executeSql(createSource); > final String createSinkTable = > String.format( > "CREATE TABLE %s (\n" > + " `id` INT,\n" > + " `comment`
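The extra tombstone described above follows from how an upsert sink encodes a retracting changelog: update_before (`-U`) and delete (`-D`) rows become key/`null` records. The sketch below is a simplified Python illustration of that encoding (the `to_upsert_records` helper is invented, not the connector's code); with the filter in place the planner keeps the `-U` row, so a tombstone lands between the two values.

```python
# Illustrative sketch of upsert encoding: +I/+U carry a value, while
# -U/-D are written as key/None "tombstone" records.
def to_upsert_records(changelog):
    out = []
    for kind, key, value in changelog:
        if kind in ("+I", "+U"):
            out.append((key, value))
        else:                      # -U / -D become tombstones
            out.append((key, None))
    return out

# Without the filter, the -U row is dropped before the sink:
without_filter = [("+I", "1", "abc"), ("+U", "1", "abcd")]
# With the filter, the -U row survives into the sink:
with_filter = [("+I", "1", "abc"), ("-U", "1", "abc"), ("+U", "1", "abcd")]

print(to_upsert_records(without_filter))  # [('1', 'abc'), ('1', 'abcd')]
print(to_upsert_records(with_filter))     # tombstone (None) in the middle
```

This matches the observed topic contents: the filtered query publishes the same two values with a `null` record between them.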
[jira] [Commented] (FLINK-34016) Janino compile failed when watermark with column by udf
[ https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822088#comment-17822088 ] xuyang commented on FLINK-34016: Thanks for your test! [~seb-pereira] (y) > Janino compile failed when watermark with column by udf > --- > > Key: FLINK-34016 > URL: https://issues.apache.org/jira/browse/FLINK-34016 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0, 1.18.0 >Reporter: ude >Priority: Major > Labels: pull-request-available > Attachments: image-2024-01-25-11-53-06-158.png, > image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, > image-2024-01-25-12-57-34-632.png > > > After submit the following flink sql by sql-client.sh will throw an exception: > {code:java} > Caused by: java.lang.RuntimeException: Could not instantiate generated class > 'WatermarkGenerator$0' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69) > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109) > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) > at > 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ... 16 more > Caused by: > org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) > ... 18 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159) > at >
[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.
[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822064#comment-17822064 ] xuyang commented on FLINK-34529: Hi, [~nilerzhou] . The plan to fix the bug LGTM. IIUC, in the `LOGICAL_REWRITE` program the calc is transposed with the rank node by `CalcRankTransposeRule`, but we don't have a rule there that lets the calc transpose the join node; the rule that transposes calc and join exists only in `PROJECT_REWRITE` and `LOGICAL`. One way to fix this bug is what you said: add the rule `ProjectWindowTransposeRule` in `LOGICAL`. Another way to fix this bug is to add a rule like `CalcJoinTransposeRule` in `LOGICAL_REWRITE`. cc [~libenchao] > Projection cannot be pushed down through rank operator. > --- > > Key: FLINK-34529 > URL: https://issues.apache.org/jira/browse/FLINK-34529 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: yisha zhou >Priority: Major > > When there is a rank/deduplicate operator, the projection based on output of > this operator cannot be pushed down to the input of it. 
> The following code can help reproducing the issue: > {code:java} > val util = streamTestUtil() > util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c) > util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f) > val sql = > """ > |SELECT a FROM ( > | SELECT a, f, > | ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num > | FROM T1, T2 > | WHERE T1.a = T2.d > |) > |WHERE rank_num = 1 > """.stripMargin > util.verifyPlan(sql){code} > The plan is expected to be: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], > leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- Calc(select=[a, c]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- Calc(select=[d, f]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > Notice that the 'select' of Join operator is [a, c, d, f]. 
However the actual > plan is: > {code:java} > Calc(select=[a]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], > select=[a, c, f]) >+- Exchange(distribution=[hash[f]]) > +- Calc(select=[a, c, f]) > +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, > e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) > :- Exchange(distribution=[hash[a]]) > : +- LegacyTableSourceScan(table=[[default_catalog, > default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) > +- Exchange(distribution=[hash[d]]) >+- LegacyTableSourceScan(table=[[default_catalog, > default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) > {code} > the 'select' of Join operator is [a, b, c, d, e, f], which means the > projection in the final Calc is not passed through the Rank. > And I think an easy way to fix this issue is to add > org.apache.calcite.rel.rules.ProjectWindowTransposeRule into > FlinkStreamRuleSets.LOGICAL_OPT_RULES. -- This message was sent by Atlassian Jira (v8.20.10#820010)
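The expected plan can be derived by hand with the usual column-pruning argument: walk the plan top-down and require, at each node, only what the parent requests plus what the node itself uses. The sketch below is purely illustrative (the `required_columns` helper is invented); Flink expresses the same idea through Calcite transpose rules.

```python
# Minimal sketch of projection push-down: the input of a node must provide
# the columns its parent requests plus the columns the node itself uses.
def required_columns(requested, node_used):
    return sorted(set(requested) | set(node_used))

# The final Calc selects only 'a'. The Rank itself needs its partition key
# 'f' and order key 'c'. The Join additionally needs its join key 'd'
# (condition a = d). So the Join only has to output [a, c, d, f],
# not all of [a, b, c, d, e, f].
rank_input = required_columns(["a"], ["f", "c"])
join_output = required_columns(rank_input, ["d"])
print(join_output)  # ['a', 'c', 'd', 'f']
```

This reproduces the `select=[a, c, d, f]` of the Join in the expected plan, which the actual plan fails to reach because the projection is not transposed through the Rank.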
[jira] [Commented] (FLINK-34473) Migrate FlinkPruneEmptyRules
[ https://issues.apache.org/jira/browse/FLINK-34473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821633#comment-17821633 ] xuyang commented on FLINK-34473: Hi, [~jackylau] . This Jira is a subtask instead of a bug, right? > Migrate FlinkPruneEmptyRules > > > Key: FLINK-34473 > URL: https://issues.apache.org/jira/browse/FLINK-34473 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0 >Reporter: Jacky Lau >Priority: Major > Fix For: 1.20.0 > >
[jira] [Commented] (FLINK-34437) Typo in SQL Client - `s/succeed/succeeded`
[ https://issues.apache.org/jira/browse/FLINK-34437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821615#comment-17821615 ] xuyang commented on FLINK-34437: Hi, [~jingge] . I see that this PR has been merged. Does this Jira need to be closed? > Typo in SQL Client - `s/succeed/succeeded` > -- > > Key: FLINK-34437 > URL: https://issues.apache.org/jira/browse/FLINK-34437 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client >Affects Versions: 1.18.1 >Reporter: Robin Moffatt >Priority: Not a Priority > Labels: pull-request-available > > > {code:java} > Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory'); > [INFO] Execute statement succeed. {code} > `{*}Execute statement {color:#FF}succeed{color}.{*}` is grammatically > incorrect, and should read `{*}Execute statement > {color:#FF}succeeded{color}.{*}` > > [https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34517) environment configs ignored when calling procedure operation
[ https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820947#comment-17820947 ] xuyang commented on FLINK-34517: Hi, [~JustinLee] . Do you mean there are some configs lost when executing procedure in SqlGateway? IIUC, the table config used when calling procedure is from `ExecutableOperationContextImpl`, which is built in `OperationExecutor` with the logic following. {code:java} private TableConfig tableConfig() { Configuration operationConfig = sessionContext.getSessionConf().clone(); operationConfig.addAll(executionConfig); TableConfig tableConfig = TableConfig.getDefault(); tableConfig.setRootConfiguration(sessionContext.getDefaultContext().getFlinkConfig()); tableConfig.addConfiguration(operationConfig); return tableConfig; } {code} > environment configs ignored when calling procedure operation > > > Key: FLINK-34517 > URL: https://issues.apache.org/jira/browse/FLINK-34517 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0 >Reporter: JustinLee >Assignee: JustinLee >Priority: Major > > when calling procedure operation in Flink SQL, the ProcedureContext only > contains the underlying application-specific config , not > environment-specific config. > to be more specific, in a Flink sql app of the same > StreamExecutionEnvironment which has a config1. when executing a sql query, > config1 works, while calling a sql procedure, config1 doesn't work, which > apparently is not an expected behavior. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
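The layering in the quoted `tableConfig()` snippet can be modeled with plain dictionaries: the cluster-level Flink configuration forms the root layer, and session plus execution options are applied on top, so later layers win on conflicting keys. This is only an illustrative sketch of that precedence; the config keys used are examples, not an assertion about which keys the gateway actually forwards.

```python
# Sketch of the config layering in the Java snippet above, using dicts;
# update order mirrors the Java call order, so later layers override earlier.
def build_table_config(root_config, session_conf, execution_config):
    operation_config = dict(session_conf)       # sessionConf.clone()
    operation_config.update(execution_config)   # addAll(executionConfig)
    table_config = {}
    table_config.update(root_config)            # setRootConfiguration(...)
    table_config.update(operation_config)       # addConfiguration(...)
    return table_config

root = {"parallelism.default": "4", "pipeline.name": "cluster-job"}
session = {"pipeline.name": "my-session-job"}
execution = {"table.exec.mini-batch.enabled": "true"}

cfg = build_table_config(root, session, execution)
print(cfg["pipeline.name"])        # session value overrides the root value
print(cfg["parallelism.default"])  # root value survives where not overridden
```

If a setting appears "lost" when calling a procedure, the question is which of these layers the `ProcedureContext` is built from, since any layer omitted from the merge silently disappears.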
[jira] [Closed] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang closed FLINK-34378. -- Resolution: Not A Problem > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can re-produce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} > Result > {code:java} > ++---+---+---+---+ > | op | a | b | a0| b0| > ++---+---+---+---+ > | +I | 3 | 3 | 3 | 3 | > | +I | 7 | 7 | 7 | 7 | > | +I | 2 | 2 | 2 | 2 | > | +I | 5 | 5 | 5 | 5 | > | +I | 1 | 1 | 1 | 1 | > | +I | 6 | 6 | 6 | 6 | > | +I | 4 | 4 | 
4 | 4 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+---+---+ > {code} > When I do not use minibatch join, the result is : > {code:java} > ++---+---+++ > | op | a | b | a0 | b0 | > ++---+---+++ > | +I | 1 | 1 | 1 | 1 | > | +I | 2 | 2 | 2 | 2 | > | +I | 3 | 3 | 3 | 3 | > | +I | 4 | 4 | 4 | 4 | > | +I | 5 | 5 | 5 | 5 | > | +I | 6 | 6 | 6 | 6 | > | +I | 7 | 7 | 7 | 7 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+++ > {code} >
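The reordering observed above is what you would expect from a keyed mini-batch buffer: rows are grouped by (hashed) join key while buffered, and the flush iterates the key groups rather than the arrival order. The sketch below illustrates the effect with an invented `flush_in_bucket_order` helper and a trivial `key % num_buckets` stand-in for the real hash; it is not Flink's operator code.

```python
# Sketch: records buffered per key bucket are emitted bucket by bucket at
# flush time, so the flushed order generally differs from the arrival order.
def flush_in_bucket_order(records, num_buckets):
    buckets = [[] for _ in range(num_buckets)]
    for key, row in records:
        buckets[key % num_buckets].append((key, row))  # stand-in for hashing
    out = []
    for bucket in buckets:       # iterate buckets, not arrival order
        out.extend(bucket)
    return out

arrival = [(k, str(k)) for k in range(1, 9)]          # keys 1..8 in order
flushed = flush_in_bucket_order(arrival, num_buckets=3)
print([k for k, _ in flushed])  # [3, 6, 1, 4, 7, 2, 5, 8]
```

Within a single key the order is preserved, which is why this reordering is "Not A Problem" for changelog correctness: only the interleaving across unrelated keys changes.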
[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820924#comment-17820924 ] xuyang commented on FLINK-34378: Thanks for your answer, [~xu_shuai_]. That sounds good to me. I'll close this jira. > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can re-produce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} > Result > {code:java} > ++---+---+---+---+ > | op | a | b | a0| b0| > ++---+---+---+---+ > | +I | 3 | 3 | 3 | 3 | > | +I | 7 | 7 | 7 | 7 | > | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | > | +I | 1 | 1 | 1 | 1 | > | +I | 6 | 6 | 6 | 6 | > | +I | 4 | 4 | 4 | 4 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+---+---+ > {code} > When I do not use minibatch join, the result is : > {code:java} > ++---+---+++ > | op | a | b | a0 | b0 | > ++---+---+++ > | +I | 1 | 1 | 1 | 1 | > | +I | 2 | 2 | 2 | 2 | > | +I | 3 | 3 | 3 | 3 | > | +I | 4 | 4 | 4 | 4 | > | +I | 5 | 5 | 5 | 5 | > | +I | 6 | 6 | 6 | 6 | > | +I | 7 | 7 | 7 | 7 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+++ > {code} >
[jira] [Resolved] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result
[ https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang resolved FLINK-33489. Resolution: Resolved > LISTAGG with generating partial-final agg will cause wrong result > - > > Key: FLINK-33489 > URL: https://issues.apache.org/jira/browse/FLINK-33489 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, > 1.16.0, 1.17.0, 1.18.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > Adding the following test cases in SplitAggregateITCase will reproduce this > bug: > > {code:java} > // code placeholder > @Test > def testListAggWithDistinctMultiArgs(): Unit = { > val t1 = tEnv.sqlQuery(s""" > |SELECT > | a, > | LISTAGG(DISTINCT c, '#') > |FROM T > |GROUP BY a > """.stripMargin) > val sink = new TestingRetractSink > t1.toRetractStream[Row].addSink(sink) > env.execute() > val expected = Map[String, List[String]]( > "1" -> List("Hello 0", "Hello 1"), > "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"), > "3" -> List("Hello 0", "Hello 1"), > "4" -> List("Hello 1", "Hello 2", "Hello 3") > ) > val actualData = sink.getRetractResults.sorted > println(actualData) > } {code} > The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello > 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter > `#` will be ignored. 
> Let's take its plan: > {code:java} > // code placeholder > LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1]) > +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, > LISTAGG_RETRACT($f3_0) AS $f1]) > +- Exchange(distribution=[hash[a]]) > +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], > select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0]) > +- Exchange(distribution=[hash[a, $f3, $f4]]) > +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), > 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4]) > +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime]) > +- DataStreamScan(table=[[default_catalog, > default_database, T]], fields=[a, b, c]) {code} > The final `GroupAggregate` missing the delimiter args, and the default > delimiter `,` will be used. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
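The bug in the plan above (the final `GroupAggregate` losing the delimiter argument and falling back to the default `,`) can be demonstrated with a toy two-phase LISTAGG. The `listagg_partial`/`listagg_final` functions below are invented for illustration; only the partial/final shape and the dropped-argument failure mode mirror the issue.

```python
# Toy two-phase LISTAGG: partial phase aggregates per bucket, final phase
# concatenates the partial results. The bug above corresponds to the final
# phase being called without the delimiter, so the default ',' is used.
def listagg_partial(values, delimiter):
    return delimiter.join(values)

def listagg_final(partials, delimiter=","):
    return delimiter.join(partials)

groups = [["Hello 0"], ["Hello 1"]]            # partial results per bucket
partials = [listagg_partial(g, "#") for g in groups]

buggy = listagg_final(partials)        # delimiter argument not forwarded
fixed = listagg_final(partials, "#")   # delimiter passed through correctly
print(buggy)   # Hello 0,Hello 1  (the '#' is ignored, as in the bug)
print(fixed)   # Hello 0#Hello 1
```

The fix described in the issue amounts to the second call shape: carrying the delimiter argument through to the final aggregation.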
[jira] [Commented] (FLINK-34348) Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-34348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815480#comment-17815480 ] xuyang commented on FLINK-34348: Hi, [~hackergin]. Thanks for your detailed testing. Overall, it seems that nothing is missing, regarding the plan test and IT test. > Release Testing: Verify FLINK-20281 Window aggregation supports changelog > stream input > -- > > Key: FLINK-34348 > URL: https://issues.apache.org/jira/browse/FLINK-34348 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: Feng Jin >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > Attachments: 截屏2024-02-07 16.21.37.png, 截屏2024-02-07 16.21.55.png, > 截屏2024-02-07 16.22.24.png, 截屏2024-02-07 16.23.12.png, 截屏2024-02-07 > 16.23.27.png, 截屏2024-02-07 16.23.38.png, 截屏2024-02-07 16.29.09.png, > 截屏2024-02-07 16.29.21.png, 截屏2024-02-07 16.29.34.png, 截屏2024-02-07 > 16.46.12.png, 截屏2024-02-07 16.46.23.png, 截屏2024-02-07 16.46.37.png, > 截屏2024-02-07 16.53.37.png, 截屏2024-02-07 16.53.47.png, 截屏2024-02-07 > 16.54.01.png, 截屏2024-02-07 16.59.22.png, 截屏2024-02-07 16.59.33.png, > 截屏2024-02-07 16.59.42.png > > > Window TVF aggregation supports changelog stream is ready for testing. User > can add a window tvf aggregation as a down stream after CDC source or some > nodes that will produce cdc records. > Someone can verify this feature with: > # Prepare a mysql table, and insert some data at first. > # Start sql-client and prepare ddl for this mysql table as a cdc source. > # You can verify the plan by `EXPLAIN PLAN_ADVICE` to check if there is a > window aggregate node and the changelog contains "UA" or "UB" or "D" in its > upstream. > # Use different kinds of window tvf to test window tvf aggregation while > updating the source data to check the data correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression
[ https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815141#comment-17815141 ] xuyang commented on FLINK-28693: This bug is caused by that the code generated by codegen references the class in the table-planner package, but the class in the table-planner package is hidden by table-planner-loader, so classloader cannot find it. I'll try to fix it. > Codegen failed if the watermark is defined on a columnByExpression > -- > > Key: FLINK-28693 > URL: https://issues.apache.org/jira/browse/FLINK-28693 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.1 >Reporter: Hongbo >Priority: Major > > The following code will throw an exception: > > {code:java} > Table program cannot be compiled. This is a bug. Please file an issue. > ... > Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column > 54: Cannot determine simple type name "org" {code} > {color:#00}Code:{color} > {code:java} > public class TestUdf extends ScalarFunction { > @DataTypeHint("TIMESTAMP(3)") > public LocalDateTime eval(String strDate) { >return LocalDateTime.now(); > } > } > public class FlinkTest { > @Test > void testUdf() throws Exception { > //var env = StreamExecutionEnvironment.createLocalEnvironment(); > // run `gradlew shadowJar` first to generate the uber jar. > // It contains the kafka connector and a dummy UDF function. 
> var env = > StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, > "build/libs/flink-test-all.jar"); > env.setParallelism(1); > var tableEnv = StreamTableEnvironment.create(env); > tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class); > var testTable = tableEnv.from(TableDescriptor.forConnector("kafka") > .schema(Schema.newBuilder() > .column("time_stamp", DataTypes.STRING()) > .columnByExpression("udf_ts", "TEST_UDF(time_stamp)") > .watermark("udf_ts", "udf_ts - INTERVAL '1' second") > .build()) > // the kafka server doesn't need to exist. It fails in the > compile stage before fetching data. > .option("properties.bootstrap.servers", "localhost:9092") > .option("topic", "test_topic") > .option("format", "json") > .option("scan.startup.mode", "latest-offset") > .build()); > testTable.printSchema(); > tableEnv.createTemporaryView("test", testTable ); > var query = tableEnv.sqlQuery("select * from test"); > var tableResult = > query.executeInsert(TableDescriptor.forConnector("print").build()); > tableResult.await(); > } > }{code} > What does the code do? > # read a stream from Kakfa > # create a derived column using an UDF expression > # define the watermark based on the derived column > The full callstack: > > {code:java} > org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. 
> at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62) > ~[flink-table-runtime-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > ~[flink-dist-1.15.1.jar:1.15.1] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
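The root cause xuyang describes (generated code referencing a table-planner class that the table-planner-loader classloader hides) can be mimicked loosely in Python: source code that is valid on its own still fails at execution time if the name it references is not visible in the executing scope. This is only an analogy; the names below are invented.

```python
# Loose analogy for the classloading failure: the generated source is fine,
# but the scope it executes in cannot see the referenced name, so
# instantiation fails (like ClassNotFoundException for a shaded class).
generated_source = "result = HiddenPlannerClass()"

class HiddenPlannerClass:   # exists "elsewhere", like a table-planner class
    pass

visible_scope = {}          # like the loader that hides planner classes
try:
    exec(generated_source, visible_scope)
    outcome = "ok"
except NameError:           # analogous to ClassNotFoundException
    outcome = "class not found"
print(outcome)              # class not found
```

This is why the workaround of swapping flink-table-planner-loader for flink-table-planner in opt/ helps: it puts the referenced classes back on a classloader the generated code can see.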
[jira] [Commented] (FLINK-34016) Janino compile failed when watermark with column by udf
[ https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815140#comment-17815140 ] xuyang commented on FLINK-34016: Hi, [~wczhu] I have found the root cause and will try to fix it. As a workaround, you can temporarily replace flink-table-planner-loader with flink-table-planner in the opt/ folder, as described in https://issues.apache.org/jira/browse/FLINK-28693. > Janino compile failed when watermark with column by udf > --- > > Key: FLINK-34016 > URL: https://issues.apache.org/jira/browse/FLINK-34016 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.15.0, 1.18.0 >Reporter: ude >Priority: Major > Attachments: image-2024-01-25-11-53-06-158.png, > image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, > image-2024-01-25-12-57-34-632.png > > > Submitting the following Flink SQL via sql-client.sh throws an exception: > {code:java} > Caused by: java.lang.RuntimeException: Could not instantiate generated class > 'WatermarkGenerator$0' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74) > at > org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69) > at > org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109) > at > org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414) > at > org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at > 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68) > ... 16 more > Caused by: > org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92) > ... 
18 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533) > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282) > at >
[jira] [Commented] (FLINK-34211) Filtering on Column names with ?s fails for JDBC lookup join.
[ https://issues.apache.org/jira/browse/FLINK-34211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815118#comment-17815118 ] xuyang commented on FLINK-34211: Hi, [~davidradl] this Jira looks like an improvement, not a bug, right? > Filtering on Column names with ?s fails for JDBC lookup join. > -- > > Key: FLINK-34211 > URL: https://issues.apache.org/jira/browse/FLINK-34211 > Project: Flink > Issue Type: Bug > Components: Connectors / JDBC, Table SQL / JDBC >Reporter: david radley >Priority: Minor > > There is a check for ? character in > [https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java#L186 > |FieldNamedPreparedStatementImpl.java] > Removing this check allows column names containing _?_ -- This message was sent by Atlassian Jira (v8.20.10#820010)
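The failure mode can be illustrated without the JDBC connector: a simplified named-parameter rewriter that, like the check in `FieldNamedPreparedStatementImpl`, rejects any pre-existing `?` in the statement. The code below is a sketch of that behavior, not the connector's actual implementation:

```java
// Simplified sketch of named-parameter substitution: ":name" placeholders are
// rewritten to JDBC "?" markers, and any literal '?' already present in the
// SQL is rejected -- which is why a quoted column name containing '?' fails.
class NamedParamSketch {
    public static String toJdbc(String sql) {
        if (sql.contains("?")) {
            throw new IllegalArgumentException(
                    "SQL statement must not contain ? character.");
        }
        return sql.replaceAll(":[A-Za-z_][A-Za-z0-9_]*", "?");
    }
}
```

Simply removing the guard (as the reporter suggests) would let column names containing `?` through, at the cost of no longer catching accidental positional markers.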
[jira] [Commented] (FLINK-34376) FLINK SQL SUM() causes a precision error
[ https://issues.apache.org/jira/browse/FLINK-34376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815091#comment-17815091 ] xuyang commented on FLINK-34376: This bug was introduced by https://issues.apache.org/jira/browse/FLINK-22586 . We can simplify the sql to reproduce this bug:
{code:java}
select cast(9.11 AS DECIMAL(38,18)) * 10

+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | 90.000 |
+----+--------+
{code}
This is designed behavior (though there appear to be some problems). For multiplication of Decimal types, the following formula is currently used:
{code:java}
// ===== Decimal Precision Deriving =====
// Adopted from "https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-
// scale-and-length-transact-sql"
//
// Operation   Result Precision                        Result Scale
// e1 + e2     max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
// e1 - e2     max(s1, s2) + max(p1-s1, p2-s2) + 1     max(s1, s2)
// e1 * e2     p1 + p2 + 1                             s1 + s2
// e1 / e2     p1 - s1 + s2 + max(6, s1 + p2 + 1)      max(6, s1 + p2 + 1)
// e1 % e2     min(p1-s1, p2-s2) + max(s1, s2)         max(s1, s2)
//
// Also, if the precision / scale are out of range, the scale may be sacrificed
// in order to prevent truncation of the integer part of the decimals.
{code}
For the Integer type, the default precision is 10 and the scale is 0, so the derived result precision and scale are (49, 18). However, since that precision exceeds the max precision 38, the scale is adjusted from 18 to 7: {code:java} integer part: 49 - 18 = 31 adjusted scale: 38 - 31 = 7{code} IMO, the original design choice to preserve the integer part makes sense. But in this case the result is wrong and we should fix it (verifying against mysql, the result is `90.000110`). 
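The derivation described above can be written out as a few lines of plain Java. This is a sketch of the formula from the comment, not Flink's planner code:

```java
// Sketch of the precision/scale derivation for DECIMAL(p1,s1) * DECIMAL(p2,s2),
// including the scale adjustment applied when the derived precision exceeds 38.
class DecimalMulSketch {
    static final int MAX_PRECISION = 38;

    /** Returns {precision, scale} for e1 * e2. */
    public static int[] multiplyType(int p1, int s1, int p2, int s2) {
        int precision = p1 + p2 + 1; // e1 * e2 -> p1 + p2 + 1
        int scale = s1 + s2;         // e1 * e2 -> s1 + s2
        if (precision > MAX_PRECISION) {
            // Sacrifice scale to keep the integer part intact.
            int integerDigits = precision - scale;              // 49 - 18 = 31
            scale = Math.max(0, MAX_PRECISION - integerDigits); // 38 - 31 = 7
            precision = MAX_PRECISION;
        }
        return new int[] {precision, scale};
    }
}
```

For `DECIMAL(38,18) * INT` this yields `DECIMAL(38,7)`, matching the adjustment in the comment; the bug is that the value produced under that adjusted type is wrong, not the type itself.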
> FLINK SQL SUM() causes a precision error > > > Key: FLINK-34376 > URL: https://issues.apache.org/jira/browse/FLINK-34376 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.14.3, 1.18.1 >Reporter: Fangliang Liu >Priority: Major > Attachments: image-2024-02-06-11-15-02-669.png, > image-2024-02-06-11-17-03-399.png > > > {code:java} > select cast(sum(CAST(9.11 AS DECIMAL(38,18)) *10 ) as STRING) > {code} > The precision is wrong in the Flink 1.14.3 and master branch > !image-2024-02-06-11-15-02-669.png! > > The accuracy is correct in the Flink 1.13.2 > !image-2024-02-06-11-17-03-399.png! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815054#comment-17815054 ] xuyang commented on FLINK-34378: [~lsy] The situation is that although I set the parallelism is "1", but the order of output in minibatch is still disrupted. Hi, [~libenchao] . Thanks for reminding, I have attached the diff about results while tuning on and off the minibatch join. > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can re-produce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} 
> Result > {code:java} > ++---+---+---+---+ > | op | a | b | a0| b0| > ++---+---+---+---+ > | +I | 3 | 3 | 3 | 3 | > | +I | 7 | 7 | 7 | 7 | > | +I | 2 | 2 | 2 | 2 | > | +I | 5 | 5 | 5 | 5 | > | +I | 1 | 1 | 1 | 1 | > | +I | 6 | 6 | 6 | 6 | > | +I | 4 | 4 | 4 | 4 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+---+---+ > {code} > When I do not use minibatch join, the result is : > {code:java} > ++---+---+++ > | op | a | b | a0 | b0 | > ++---+---+++ > | +I | 1 | 1 | 1 | 1 | > | +I | 2 | 2 | 2 | 2 | > | +I | 3 | 3 | 3 | 3 | > | +I | 4 | 4 | 4 | 4 | > | +I | 5 | 5 | 5 | 5 | > | +I | 6 | 6 | 6 | 6 | > | +I | 7 | 7 | 7 | 7 | > | +I | 8 | 8 | 8 | 8 | > ++---+---+++ > {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
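The reordering reported above is consistent with records being buffered per join key and flushed in map-iteration order rather than arrival order. A minimal sketch of that effect (illustrative only, not Flink's minibatch operator):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class MiniBatchFlushSketch {
    // Buffer records grouped by key, then flush in the buffer's iteration order.
    public static List<String> bufferAndFlush(
            Map<String, List<String>> buffer, List<String> arrivals) {
        for (String rec : arrivals) {
            buffer.computeIfAbsent(rec, k -> new ArrayList<>()).add(rec);
        }
        List<String> flushed = new ArrayList<>();
        for (List<String> group : buffer.values()) {
            flushed.addAll(group);
        }
        return flushed;
    }
}
```

With a plain `HashMap` buffer, the flush order is implementation-defined and need not match arrival order; an insertion-ordered buffer such as `LinkedHashMap` preserves it.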
[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34378: --- Description: I'm not sure if it's a bug. The following case can re-produce this situation. {code:java} // add it in CalcITCase @Test def test(): Unit = { env.setParallelism(1) val rows = Seq( row(1, "1"), row(2, "2"), row(3, "3"), row(4, "4"), row(5, "5"), row(6, "6"), row(7, "7"), row(8, "8")) val dataId = TestValuesTableFactory.registerData(rows) val ddl = s""" |CREATE TABLE t1 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl) val ddl2 = s""" |CREATE TABLE t2 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl2) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() }{code} Result {code:java} ++---+---+---+---+ | op | a | b | a0| b0| ++---+---+---+---+ | +I | 3 | 3 | 3 | 3 | | +I | 7 | 7 | 7 | 7 | | +I | 2 | 2 | 2 | 2 | | +I | 5 | 5 | 5 | 5 | | +I | 1 | 1 | 1 | 1 | | +I | 6 | 6 | 6 | 6 | | +I | 4 | 4 | 4 | 4 | | +I | 8 | 8 | 8 | 8 | ++---+---+---+---+ {code} When I do not use minibatch join, the result is : {code:java} ++---+---+++ | op | a | b | a0 | b0 | ++---+---+++ | +I | 1 | 1 | 1 | 1 | | +I | 2 | 2 | 2 | 2 | | +I | 3 | 3 | 3 | 3 | | +I | 4 | 4 | 4 | 4 | | +I | 5 | 5 | 5 | 5 | | +I | 6 | 6 | 6 | 6 | | +I | 7 | 7 | 7 | 7 | | +I | 8 | 8 | 8 | 8 | ++---+---+++ {code} was: I'm not sure if it's a bug. 
The following case can re-produce this situation. {code:java} // add it in CalcITCase @Test def test(): Unit = { env.setParallelism(1) val rows = Seq( row(1, "1"), row(2, "2"), row(3, "3"), row(4, "4"), row(5, "5"), row(6, "6"), row(7, "7"), row(8, "8")) val dataId = TestValuesTableFactory.registerData(rows) val ddl = s""" |CREATE TABLE t1 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl) val ddl2 = s""" |CREATE TABLE t2 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl2) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() }{code} Result {code:java} ++---+---+---+---+ | op | a | b | a0| b0| ++---+---+---+---+ | +I | 3 | 3 | 3 | 3 | | +I | 7 | 7 | 7 | 7 | | +I | 2 | 2 | 2 | 2 | | +I | 5 | 5 | 5 | 5 | | +I | 1 | 1 | 1 | 1 | | +I | 6 | 6 | 6 | 6 | | +I | 4 | 4 | 4 | 4 | | +I | 8 | 8 | 8 | 8 | ++---+---+---+---+ {code} > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can re-produce this situation. 
> {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >|
[jira] [Commented] (FLINK-34381) `RelDataType#getFullTypeString` should be used to print in `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString`
[ https://issues.apache.org/jira/browse/FLINK-34381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814700#comment-17814700 ] xuyang commented on FLINK-34381: I will try to fix it if this makes sense indeed. > `RelDataType#getFullTypeString` should be used to print in > `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString` > > > Key: FLINK-34381 > URL: https://issues.apache.org/jira/browse/FLINK-34381 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Planner >Affects Versions: 1.9.0, 1.19.0 >Reporter: xuyang >Priority: Major > > Currently `RelTreeWriterImpl` use `rel.getRowType.toString` to print row type. > {code:java} > if (withRowType) { > s.append(", rowType=[").append(rel.getRowType.toString).append("]") > } {code} > However, looking deeper into the code, we should use > `rel.getRowType.getFullTypeString` to print the row type. Because the > function `getFullTypeString` will print richer type information such as > `nullable`. Take `StructuredRelDataType` as an example, the diff is below: > {code:java} > // source > util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c) > // sql > SELECT a, c FROM MyTable > // rel.getRowType.toString > RecordType(BIGINT a, VARCHAR(2147483647) c) > // rel.getRowType.getFullTypeString > RecordType(BIGINT a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" c) NOT > NULL{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
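The difference is easy to mimic: a short and a full rendering of the same type, where only the latter carries charset and nullability detail. The class below is an illustrative stand-in mirroring `RelDataType#toString` vs `RelDataType#getFullTypeString`, not Calcite's real API:

```java
// Illustrative "type" whose toString() drops detail that the full
// rendering keeps (character set, nullability).
class TypeRenderingSketch {
    final String base;    // e.g. "VARCHAR(2147483647)"
    final String charset; // e.g. "UTF-16LE"
    final boolean nullable;

    TypeRenderingSketch(String base, String charset, boolean nullable) {
        this.base = base;
        this.charset = charset;
        this.nullable = nullable;
    }

    @Override
    public String toString() {
        return base; // short form: detail is lost
    }

    public String getFullTypeString() {
        return base + " CHARACTER SET \"" + charset + "\""
                + (nullable ? "" : " NOT NULL");
    }
}
```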
[jira] [Updated] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`
[ https://issues.apache.org/jira/browse/FLINK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34372: --- Attachment: image-2024-02-06-16-38-24-085.png > Complete work for syntax `DESCRIBE CATALOG catalogName` > --- > > Key: FLINK-34372 > URL: https://issues.apache.org/jira/browse/FLINK-34372 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Attachments: image-2024-02-06-16-38-24-085.png > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`
[ https://issues.apache.org/jira/browse/FLINK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34372: --- Description: The public api changes are announced in Flip-69 ([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]). The result type of this query is also defined in Flip-69([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement] ) and Flip-84([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878]). Here I just show them again. {code:java} // Catalog /** * Get a user defined catalog description. * @return a user-implement catalog detailed explanation */ default String explainCatalog() { return String.format("CatalogClass:%s", this.getClass().getCanonicalName()); } {code} !image-2024-02-06-16-38-24-085.png|width=751,height=343! > Complete work for syntax `DESCRIBE CATALOG catalogName` > --- > > Key: FLINK-34372 > URL: https://issues.apache.org/jira/browse/FLINK-34372 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Major > Attachments: image-2024-02-06-16-38-24-085.png > > > The public api changes are announced in Flip-69 > ([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]). > > The result type of this query is also defined in > Flip-69([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement] > ) and > Flip-84([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878]). > Here I just show them again. > > {code:java} > // Catalog > /** > * Get a user defined catalog description. > * @return a user-implement catalog detailed explanation > */ > default String explainCatalog() { > return String.format("CatalogClass:%s", > this.getClass().getCanonicalName()); > } {code} > !image-2024-02-06-16-38-24-085.png|width=751,height=343! 
> -- This message was sent by Atlassian Jira (v8.20.10#820010)
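The default-method pattern proposed above can be exercised without Flink on the classpath. `CatalogLike` and `MyCatalog` below are stand-ins for the `Catalog` interface and a user catalog, used only to show how the FLIP-69 `explainCatalog` default would behave when overridden:

```java
// Stand-in for the proposed Catalog#explainCatalog default method.
interface CatalogLike {
    default String explainCatalog() {
        return String.format("CatalogClass:%s", getClass().getCanonicalName());
    }
}

// A user catalog overriding the default with a human-readable description,
// which DESCRIBE CATALOG would then surface.
class MyCatalog implements CatalogLike {
    @Override
    public String explainCatalog() {
        return "MyCatalog: demo in-memory catalog, default database 'default_db'";
    }
}
```

Catalogs that do not override the method fall back to printing their fully qualified class name.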
[jira] [Commented] (FLINK-34373) Complete work for syntax `DESCRIBE DATABASE databaseName`
[ https://issues.apache.org/jira/browse/FLINK-34373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814636#comment-17814636 ] xuyang commented on FLINK-34373: I'll try to fix it. > Complete work for syntax `DESCRIBE DATABASE databaseName` > - > > Key: FLINK-34373 > URL: https://issues.apache.org/jira/browse/FLINK-34373 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34381) `RelDataType#getFullTypeString` should be used to print in `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString`
xuyang created FLINK-34381: -- Summary: `RelDataType#getFullTypeString` should be used to print in `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString` Key: FLINK-34381 URL: https://issues.apache.org/jira/browse/FLINK-34381 Project: Flink Issue Type: Technical Debt Components: Table SQL / Planner Affects Versions: 1.9.0, 1.19.0 Reporter: xuyang Currently `RelTreeWriterImpl` use `rel.getRowType.toString` to print row type. {code:java} if (withRowType) { s.append(", rowType=[").append(rel.getRowType.toString).append("]") } {code} However, looking deeper into the code, we should use `rel.getRowType.getFullTypeString` to print the row type. Because the function `getFullTypeString` will print richer type information such as `nullable`. Take `StructuredRelDataType` as an example, the diff is below: {code:java} // source util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c) // sql SELECT a, c FROM MyTable // rel.getRowType.toString RecordType(BIGINT a, VARCHAR(2147483647) c) // rel.getRowType.getFullTypeString RecordType(BIGINT a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" c) NOT NULL{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814623#comment-17814623 ] xuyang commented on FLINK-34370: [~337361...@qq.com] Thanks for your volunteering. > [Umbrella] Complete work and improve about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is a umbrella Jira for completing work for > [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) > about enhanced Flink SQL DDL. > With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems > that this flip is not finished yet. > The matrix is below: > |DDL|can be used in sql-client | > |_SHOW CATALOGS_|YES| > |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| > |_USE_ _CATALOG catalogName_|YES| > |_CREATE DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName_|YES| > |_DROP IF EXISTS DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName RESTRICT_|YES| > |_DROP DATABASE dataBaseName CASCADE_|YES| > |_ALTER DATABASE dataBaseName SET > ( name=value [, name=value]*)|YES| > |_USE dataBaseName_|YES| > |_SHOW_ _DATABASES_|YES| > |_DESCRIBE DATABASE dataBaseName_|{color:#de350b}NO{color}| > |_DESCRIBE EXTENDED DATABASE dataBaseName_|{color:#de350b}NO{color}| > |_SHOW_ _TABLES_|YES| > |_DESCRIBE tableName_|YES| > |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| > |_ALTER_ _TABLE tableName > RENAME TO newTableName|YES| > |_ALTER_ _TABLE tableName > SET ( name=value [, name=value]*)|YES| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join
xuyang created FLINK-34380: -- Summary: Strange RowKind and records about intermediate output when using minibatch join Key: FLINK-34380 URL: https://issues.apache.org/jira/browse/FLINK-34380 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.19.0 Reporter: xuyang Fix For: 1.19.0 {code:java} // Add it in CalcITCase @Test def test(): Unit = { env.setParallelism(1) val rows = Seq( changelogRow("+I", java.lang.Integer.valueOf(1), "1"), changelogRow("-U", java.lang.Integer.valueOf(1), "1"), changelogRow("+U", java.lang.Integer.valueOf(1), "99"), changelogRow("-D", java.lang.Integer.valueOf(1), "99") ) val dataId = TestValuesTableFactory.registerData(rows) val ddl = s""" |CREATE TABLE t1 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl) val ddl2 = s""" |CREATE TABLE t2 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl2) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L)) println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() } {code} Output: {code:java} +----+----+----+----+----+ | op | a | b | a0 | b0 | +----+----+----+----+----+ | +U | 1 | 1 | 1 | 99 | | +U | 1 | 99 | 1 | 99 | | -U | 1 | 1 | 1 | 99 | | -D | 1 | 99 | 1 | 99 | +----+----+----+----+----+{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
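One part of this output is easy to model: every emitted row carries `b0 = 99`, the final right-side value in the batch. That is consistent with the whole minibatch being absorbed into the other side's state before any join output is produced. The sketch below is a simplified model (it is not the actual `StreamingJoinOperator` logic, and it does not model the RowKind rewriting that the operator's folding also performs); it only reproduces the "always 99" effect:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified minibatch-join model: the batch is flushed as a whole, so each
// left row joins against right-side state that already contains the LAST
// value per key from the same batch. Illustrative only, not Flink's operator.
public class MiniBatchJoinSketch {
    record Row(String kind, int key, String value) {}

    static List<String> flush(List<Row> batch) {
        // Phase 1: absorb the entire batch into right-side state
        // (last write per key wins).
        Map<Integer, String> rightState = new HashMap<>();
        for (Row r : batch) {
            rightState.put(r.key, r.value);
        }
        // Phase 2: only now join each left row against that state.
        List<String> out = new ArrayList<>();
        for (Row left : batch) {
            out.add(left.kind + " | " + left.value + " | " + rightState.get(left.key));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> batch = List.of(
                new Row("+I", 1, "1"),
                new Row("-U", 1, "1"),
                new Row("+U", 1, "99"),
                new Row("-D", 1, "99"));
        // Every emitted row joins against "99", the last value in the batch,
        // matching the b0 column in the Jira output above.
        flush(batch).forEach(System.out::println);
    }
}
```

Under this model the intermediate per-record join results that a non-minibatch join would emit are never observable, which is why the output looks "strange" compared to record-at-a-time processing.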
[jira] [Commented] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814612#comment-17814612 ] xuyang commented on FLINK-34349: Hi, I have finished this verification. Some unexpected behaviors, such as bugs and technical questions, have been linked to this Jira. > Release Testing: Verify FLINK-34219 Introduce a new join operator to support > minibatch > -- > > Key: FLINK-34349 > URL: https://issues.apache.org/jira/browse/FLINK-34349 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Shuai Xu >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Minibatch join is ready. Users can improve performance in regular stream > join scenarios. > Anyone can verify this feature by following the > [doc]([https://github.com/apache/flink/pull/24240)], although it is still > being reviewed. > If you find bugs in this feature, please open a Jira linked to this > one to report them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34378: --- Affects Version/s: 1.19.0 > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > > I'm not sure if it's a bug. The following case can re-produce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34378: --- Fix Version/s: 1.19.0 > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > I'm not sure if it's a bug. The following case can re-produce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34378: --- Component/s: Table SQL / Runtime > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt > Components: Table SQL / Runtime >Reporter: xuyang >Priority: Major > > I'm not sure if it's a bug. The following case can re-produce this situation. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814590#comment-17814590 ] xuyang commented on FLINK-34378: cc [~xu_shuai_] > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt >Reporter: xuyang >Priority: Major > > I'm not sure if it's a bug, the following case can re-produce this bug. > {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records
[ https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34378: --- Description: I'm not sure if it's a bug. The following case can re-produce this situation. {code:java} // add it in CalcITCase @Test def test(): Unit = { env.setParallelism(1) val rows = Seq( row(1, "1"), row(2, "2"), row(3, "3"), row(4, "4"), row(5, "5"), row(6, "6"), row(7, "7"), row(8, "8")) val dataId = TestValuesTableFactory.registerData(rows) val ddl = s""" |CREATE TABLE t1 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl) val ddl2 = s""" |CREATE TABLE t2 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl2) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() }{code} was: I'm not sure if it's a bug, the following case can re-produce this bug. 
{code:java} // add it in CalcITCase @Test def test(): Unit = { env.setParallelism(1) val rows = Seq( row(1, "1"), row(2, "2"), row(3, "3"), row(4, "4"), row(5, "5"), row(6, "6"), row(7, "7"), row(8, "8")) val dataId = TestValuesTableFactory.registerData(rows) val ddl = s""" |CREATE TABLE t1 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl) val ddl2 = s""" |CREATE TABLE t2 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl2) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() }{code} > Minibatch join disrupted the original order of input records > > > Key: FLINK-34378 > URL: https://issues.apache.org/jira/browse/FLINK-34378 > Project: Flink > Issue Type: Technical Debt >Reporter: xuyang >Priority: Major > > I'm not sure if it's a bug. The following case can re-produce this situation. 
> {code:java} > // add it in CalcITCase > @Test > def test(): Unit = { > env.setParallelism(1) > val rows = Seq( > row(1, "1"), > row(2, "2"), > row(3, "3"), > row(4, "4"), > row(5, "5"), > row(6, "6"), > row(7, "7"), > row(8, "8")) > val dataId = TestValuesTableFactory.registerData(rows) > val ddl = > s""" >|CREATE TABLE t1 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl) > val ddl2 = > s""" >|CREATE TABLE t2 ( >| a int, >| b string >|) WITH ( >| 'connector' = 'values', >| 'data-id' = '$dataId', >| 'bounded' = 'false' >|) > """.stripMargin > tEnv.executeSql(ddl2) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, > Boolean.box(true)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, > Duration.ofSeconds(5)) > tEnv.getConfig.getConfiguration > .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) > println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) > tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34378) Minibatch join disrupted the original order of input records
xuyang created FLINK-34378: -- Summary: Minibatch join disrupted the original order of input records Key: FLINK-34378 URL: https://issues.apache.org/jira/browse/FLINK-34378 Project: Flink Issue Type: Technical Debt Reporter: xuyang I'm not sure if it's a bug; the following case can reproduce it. {code:java} // add it in CalcITCase @Test def test(): Unit = { env.setParallelism(1) val rows = Seq( row(1, "1"), row(2, "2"), row(3, "3"), row(4, "4"), row(5, "5"), row(6, "6"), row(7, "7"), row(8, "8")) val dataId = TestValuesTableFactory.registerData(rows) val ddl = s""" |CREATE TABLE t1 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl) val ddl2 = s""" |CREATE TABLE t2 ( | a int, | b string |) WITH ( | 'connector' = 'values', | 'data-id' = '$dataId', | 'bounded' = 'false' |) """.stripMargin tEnv.executeSql(ddl2) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(5)) tEnv.getConfig.getConfiguration .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L)) println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain()) tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print() }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
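The reordering mechanism itself is easy to see in isolation: any minibatch buffer that groups records by join key will, on flush, emit records per key rather than in global arrival order. The buffer layout below is illustrative (Flink's actual buffer structure differs; a `TreeMap` is used here only to make the demo deterministic), but the effect is the same:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of why a keyed minibatch buffer loses global arrival order:
// records are grouped per join key, and the flush iterates keys, not
// arrival positions. Illustrative only, not Flink's actual data structure.
public class KeyedBufferOrderSketch {
    static List<String> bufferAndFlush(List<Map.Entry<Integer, String>> arrivals) {
        // Group arrivals by join key; per-key order is preserved,
        // cross-key order is not.
        Map<Integer, List<String>> buffer = new TreeMap<>();
        for (Map.Entry<Integer, String> rec : arrivals) {
            buffer.computeIfAbsent(rec.getKey(), k -> new ArrayList<>()).add(rec.getValue());
        }
        // Flush in key order, which may differ from arrival order.
        List<String> out = new ArrayList<>();
        buffer.values().forEach(out::addAll);
        return out;
    }

    public static void main(String[] args) {
        // Arrival order: keys 3, 1, 2 -- the flush emits key order 1, 2, 3.
        List<Map.Entry<Integer, String>> arrivals = List.of(
                Map.entry(3, "c"), Map.entry(1, "a"), Map.entry(2, "b"));
        System.out.println(bufferAndFlush(arrivals)); // [a, b, c]
    }
}
```

Note that the output is still a permutation of the input and per-key order survives, which is why this may be acceptable behavior for a keyed join rather than a correctness bug.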
[jira] [Commented] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`
[ https://issues.apache.org/jira/browse/FLINK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814584#comment-17814584 ] xuyang commented on FLINK-34372: I'll try to do it. > Complete work for syntax `DESCRIBE CATALOG catalogName` > --- > > Key: FLINK-34372 > URL: https://issues.apache.org/jira/browse/FLINK-34372 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34375) Complete work for syntax `DESCRIBE EXTENDED tableName`
xuyang created FLINK-34375: -- Summary: Complete work for syntax `DESCRIBE EXTENDED tableName` Key: FLINK-34375 URL: https://issues.apache.org/jira/browse/FLINK-34375 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.10.0, 1.19.0 Reporter: xuyang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34374) Complete work for syntax `DESCRIBE EXTENDED DATABASE databaseName`
xuyang created FLINK-34374: -- Summary: Complete work for syntax `DESCRIBE EXTENDED DATABASE databaseName` Key: FLINK-34374 URL: https://issues.apache.org/jira/browse/FLINK-34374 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.10.0, 1.19.0 Reporter: xuyang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34370: --- Description: This is a umbrella Jira for completing work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this flip is not finished yet. The matrix is below: |DDL|can be used in sql-client | |_SHOW CATALOGS_|YES| |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| |_USE_ _CATALOG catalogName_|YES| |_CREATE DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName_|YES| |_DROP IF EXISTS DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName RESTRICT_|YES| |_DROP DATABASE dataBaseName CASCADE_|YES| |_ALTER DATABASE dataBaseName SET ( name=value [, name=value]*)|YES| |_USE dataBaseName_|YES| |_SHOW_ _DATABASES_|YES| |_DESCRIBE DATABASE dataBaseName_|{color:#de350b}NO{color}| |_DESCRIBE EXTENDED DATABASE dataBaseName_|{color:#de350b}NO{color}| |_SHOW_ _TABLES_|YES| |_DESCRIBE tableName_|YES| |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| |_ALTER_ _TABLE tableName RENAME TO newTableName|YES| |_ALTER_ _TABLE tableName SET ( name=value [, name=value]*)|YES| was: This is a umbrella Jira for completing work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this flip is not finished yet. 
The matrix is below: |DDL|can be used in sql-client | |_SHOW CATALOGS_|YES| |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| |_USE_ _CATALOG catalogName_|YES| |_CREATE DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName_|YES| |_DROP IF EXISTS DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName RESTRICT_|YES| |_DROP DATABASE dataBaseName CASCADE_|YES| |_ALTER DATABASE dataBaseName SET ( name=value [, name=value]*)|YES| |_USE dataBaseName_|YES| |_SHOW_ _DATABASES_|YES| |_DESCRIBE DATABASE dataBasesName_|{color:#de350b}NO{color}| |_DESCRIBE EXTENDED DATABASE dataBasesName_|{color:#de350b}NO{color}| |_SHOW_ _TABLES_|YES| |_DESCRIBE tableName_|YES| |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| |_ALTER_ _TABLE tableName RENAME TO newTableName|YES| |_ALTER_ _TABLE tableName SET ( name=value [, name=value]*)|YES| > [Umbrella] Complete work and improve about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is a umbrella Jira for completing work for > [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) > about enhanced Flink SQL DDL. > With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems > that this flip is not finished yet. 
> The matrix is below: > |DDL|can be used in sql-client | > |_SHOW CATALOGS_|YES| > |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| > |_USE_ _CATALOG catalogName_|YES| > |_CREATE DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName_|YES| > |_DROP IF EXISTS DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName RESTRICT_|YES| > |_DROP DATABASE dataBaseName CASCADE_|YES| > |_ALTER DATABASE dataBaseName SET > ( name=value [, name=value]*)|YES| > |_USE dataBaseName_|YES| > |_SHOW_ _DATABASES_|YES| > |_DESCRIBE DATABASE dataBaseName_|{color:#de350b}NO{color}| > |_DESCRIBE EXTENDED DATABASE dataBaseName_|{color:#de350b}NO{color}| > |_SHOW_ _TABLES_|YES| > |_DESCRIBE tableName_|YES| > |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| > |_ALTER_ _TABLE tableName > RENAME TO newTableName|YES| > |_ALTER_ _TABLE tableName > SET ( name=value [, name=value]*)|YES| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34373) Complete work for syntax `DESCRIBE DATABASE databaseName`
xuyang created FLINK-34373: -- Summary: Complete work for syntax `DESCRIBE DATABASE databaseName` Key: FLINK-34373 URL: https://issues.apache.org/jira/browse/FLINK-34373 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.10.0, 1.19.0 Reporter: xuyang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34370: --- Description: This is a umbrella Jira for completing work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this flip is not finished yet. The matrix is below: |DDL|can be used in sql-client | |_SHOW CATALOGS_|YES| |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| |_USE_ _CATALOG catalogName_|YES| |_CREATE DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName_|YES| |_DROP IF EXISTS DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName RESTRICT_|YES| |_DROP DATABASE dataBaseName CASCADE_|YES| |_ALTER DATABASE dataBaseName SET ( name=value [, name=value]*)|YES| |_USE dataBaseName_|YES| |_SHOW_ _DATABASES_|YES| |_DESCRIBE DATABASE dataBasesName_|{color:#de350b}NO{color}| |_DESCRIBE EXTENDED DATABASE dataBasesName_|{color:#de350b}NO{color}| |_SHOW_ _TABLES_|YES| |_DESCRIBE tableName_|YES| |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| |_ALTER_ _TABLE tableName RENAME TO newTableName|YES| |_ALTER_ _TABLE tableName SET ( name=value [, name=value]*)|YES| was: This is a umbrella Jira for completing work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this flip is not finished yet. 
The matrix is below: |DDL|can be used in sql-client | |_SHOW CATALOGS_|YES| |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| |_USE_ _CATALOG catalogName_ |YES| |_CREATE DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName_|YES| |_DROP IF EXISTS DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName RESTRICT_|YES| |_DROP DATABASE dataBaseName CASCADE_|YES| |_ALTER DATABASE dataBaseName SET ( name=value [, name=value]*)_|YES| |_USE dataBaseName_ |YES| |_SHOW_ _DATABASES_|YES| |_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}| |_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}| |_SHOW_ _TABLES_|YES| |_DESCRIBE tableName_|YES| |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| |_ALTER_ _TABLE tableName RENAME TO newTableName_|YES| |_ALTER_ _TABLE tableName SET ( name=value [, name=value]*)_|YES| > [Umbrella] Complete work and improve about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is a umbrella Jira for completing work for > [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) > about enhanced Flink SQL DDL. > With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems > that this flip is not finished yet. 
> The matrix is below: > |DDL|can be used in sql-client | > |_SHOW CATALOGS_|YES| > |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| > |_USE_ _CATALOG catalogName_|YES| > |_CREATE DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName_|YES| > |_DROP IF EXISTS DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName RESTRICT_|YES| > |_DROP DATABASE dataBaseName CASCADE_|YES| > |_ALTER DATABASE dataBaseName SET > ( name=value [, name=value]*)|YES| > |_USE dataBaseName_|YES| > |_SHOW_ _DATABASES_|YES| > |_DESCRIBE DATABASE dataBasesName_|{color:#de350b}NO{color}| > |_DESCRIBE EXTENDED DATABASE dataBasesName_|{color:#de350b}NO{color}| > |_SHOW_ _TABLES_|YES| > |_DESCRIBE tableName_|YES| > |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| > |_ALTER_ _TABLE tableName > RENAME TO newTableName|YES| > |_ALTER_ _TABLE tableName > SET ( name=value [, name=value]*)|YES| > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`
xuyang created FLINK-34372: -- Summary: Complete work for syntax `DESCRIBE CATALOG catalogName` Key: FLINK-34372 URL: https://issues.apache.org/jira/browse/FLINK-34372 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.10.0, 1.19.0 Reporter: xuyang -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34370: --- Summary: [Umbrella] Complete work and improve about enhanced Flink SQL DDL (was: [Umbrella] Complete work about enhanced Flink SQL DDL ) > [Umbrella] Complete work and improve about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is a umbrella Jira for completing work for > [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) > about enhanced Flink SQL DDL. > With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems > that this flip is not finished yet. > The matrix is below: > |DDL|can be used in sql-client | > |_SHOW CATALOGS_|YES| > |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| > |_USE_ _CATALOG catalogName_ |YES| > |_CREATE DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName_|YES| > |_DROP IF EXISTS DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName RESTRICT_|YES| > |_DROP DATABASE dataBaseName CASCADE_|YES| > |_ALTER DATABASE dataBaseName SET > ( name=value [, name=value]*)_|YES| > |_USE dataBaseName_ |YES| > |_SHOW_ _DATABASES_|YES| > |_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}| > |_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}| > |_SHOW_ _TABLES_|YES| > |_DESCRIBE tableName_|YES| > |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| > |_ALTER_ _TABLE tableName > RENAME TO newTableName_|YES| > |_ALTER_ _TABLE tableName > SET ( name=value [, name=value]*)_|YES| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34370: --- Description: This is a umbrella Jira for completing work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this flip is not finished yet. The matrix is below: |DDL|can be used in sql-client | |_SHOW CATALOGS_|YES| |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| |_USE_ _CATALOG catalogName_ |YES| |_CREATE DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName_|YES| |_DROP IF EXISTS DATABASE dataBaseName_|YES| |_DROP DATABASE dataBaseName RESTRICT_|YES| |_DROP DATABASE dataBaseName CASCADE_|YES| |_ALTER DATABASE dataBaseName SET ( name=value [, name=value]*)_|YES| |_USE dataBaseName_ |YES| |_SHOW_ _DATABASES_|YES| |_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}| |_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}| |_SHOW_ _TABLES_|YES| |_DESCRIBE tableName_|YES| |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| |_ALTER_ _TABLE tableName RENAME TO newTableName_|YES| |_ALTER_ _TABLE tableName SET ( name=value [, name=value]*)_|YES| was: This is a umbrella Jira for completing work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this flip is not finished yet. 
The martix is > [Umbrella] Complete work about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is a umbrella Jira for completing work for > [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) > about enhanced Flink SQL DDL. > With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems > that this flip is not finished yet. > The matrix is below: > |DDL|can be used in sql-client | > |_SHOW CATALOGS_|YES| > |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}| > |_USE_ _CATALOG catalogName_ |YES| > |_CREATE DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName_|YES| > |_DROP IF EXISTS DATABASE dataBaseName_|YES| > |_DROP DATABASE dataBaseName RESTRICT_|YES| > |_DROP DATABASE dataBaseName CASCADE_|YES| > |_ALTER DATABASE dataBaseName SET > ( name=value [, name=value]*)_|YES| > |_USE dataBaseName_ |YES| > |_SHOW_ _DATABASES_|YES| > |_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}| > |_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}| > |_SHOW_ _TABLES_|YES| > |_DESCRIBE tableName_|YES| > |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}| > |_ALTER_ _TABLE tableName > RENAME TO newTableName_|YES| > |_ALTER_ _TABLE tableName > SET ( name=value [, name=value]*)_|YES| -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34370: --- Description: This is an umbrella Jira for completing the work for [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) about enhanced Flink SQL DDL. With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems that this FLIP is not finished yet. The matrix is was: This is an umbrella Jira for completing the work for [Flip-69](https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement) about enhanced Flink SQL DDL. With [FLINK-34254](https://issues.apache.org/jira/browse/FLINK-34254), it seems that this FLIP is not finished yet. > [Umbrella] Complete work about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is an umbrella Jira for completing the work for > [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]) > about enhanced Flink SQL DDL. > With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems > that this FLIP is not finished yet. > The matrix is -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL
[ https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34370: --- Affects Version/s: 1.19.0 > [Umbrella] Complete work about enhanced Flink SQL DDL > -- > > Key: FLINK-34370 > URL: https://issues.apache.org/jira/browse/FLINK-34370 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 1.10.0, 1.19.0 >Reporter: xuyang >Priority: Major > > This is an umbrella Jira for completing the work for > [Flip-69](https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement) > about enhanced Flink SQL DDL. > With [FLINK-34254](https://issues.apache.org/jira/browse/FLINK-34254), it > seems that this FLIP is not finished yet. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions
[ https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang closed FLINK-34254. -- Resolution: Duplicate > `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions > - > > Key: FLINK-34254 > URL: https://issues.apache.org/jira/browse/FLINK-34254 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: xuyang >Priority: Major > > Add the following test to CalcITCase to reproduce this bug. > {code:java} > @Test > def test(): Unit = { > tEnv.executeSql(s""" >|create catalog `c_new` with ( >| 'type' = 'generic_in_memory', >| 'default-database' = 'my_d' >|) >|""".stripMargin) > tEnv > .executeSql(s""" > |show catalogs > |""".stripMargin) > .print > tEnv > .executeSql(s""" > | describe catalog default_catalog > |""".stripMargin) > .print > } {code} > Result: > {code:java} > +-+ > | catalog name | > +-+ > | c_new | > | default_catalog | > +-+ > 2 rows in set > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in > any table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) > at > org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) >at > org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused > by: org.apache.calcite.runtime.CalciteContextException: From line 2, column > 19 to line 2, column 33: Column 'default_catalog' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273) >at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150) > at > org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304) > at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005) > at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196) > ... 11 moreCaused by: > org.apache.calcite.sql.validate.SqlValidatorException: Column > 'default_catalog' not found in any tableat > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at >
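The stack trace above shows that the unrecognized `DESCRIBE CATALOG` statement falls through to Calcite's expression validator, which looks up `default_catalog` as a column and reports "Column 'default_catalog' not found in any table". The following plain-Java sketch (hypothetical class and method names, not Flink's actual parser code) illustrates this failure mode and the kind of explicit dispatch that would produce a clear error instead:

```java
import java.util.Locale;

/**
 * Minimal sketch of the dispatch problem behind FLINK-34254: a statement
 * the parser does not handle as a dedicated command falls through to
 * generic expression validation, yielding a misleading "column not found"
 * error. All names here are hypothetical, for illustration only.
 */
public class DescribeDispatchSketch {

    static String execute(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("DESCRIBE CATALOG ")) {
            // Recognizing the syntax allows a clear, actionable message
            // until the feature is actually implemented (see FLINK-34370).
            return "ERROR: DESCRIBE CATALOG is not supported yet";
        }
        if (s.startsWith("DESCRIBE ")) {
            return "OK: describe table " + sql.trim().substring(9);
        }
        // Fall-through path, analogous to the old behavior: the statement is
        // validated as an expression and the identifier is treated as a column.
        return "ERROR: Column not found in any table";
    }

    public static void main(String[] args) {
        System.out.println(execute("DESCRIBE CATALOG default_catalog"));
        System.out.println(execute("DESCRIBE my_table"));
    }
}
```

The point is only that special-casing the unsupported syntax turns a confusing validator error into an explicit one; the real fix tracked in FLINK-34370 is to implement the statement itself.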
[jira] [Comment Edited] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions
[ https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814564#comment-17814564 ] xuyang edited comment on FLINK-34254 at 2/6/24 1:43 AM: I have created a Jira (https://issues.apache.org/jira/browse/FLINK-34370) linked to this one to track the improvement, so I'm closing this one. was (Author: xuyangzhong): I have created a Jira linked to this one to track the improvement. > `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions > - > > Key: FLINK-34254 > URL: https://issues.apache.org/jira/browse/FLINK-34254 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: xuyang >Priority: Major > > Add the following test to CalcITCase to reproduce this bug. > {code:java} > @Test > def test(): Unit = { > tEnv.executeSql(s""" >|create catalog `c_new` with ( >| 'type' = 'generic_in_memory', >| 'default-database' = 'my_d' >|) >|""".stripMargin) > tEnv > .executeSql(s""" > |show catalogs > |""".stripMargin) > .print > tEnv > .executeSql(s""" > | describe catalog default_catalog > |""".stripMargin) > .print > } {code} > Result: > {code:java} > +-+ > | catalog name | > +-+ > | c_new | > | default_catalog | > +-+ > 2 rows in set > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in > any table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) > at > org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) >at > org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused > by: org.apache.calcite.runtime.CalciteContextException: From line 2, column > 19 to line 2, column 33: Column 'default_catalog' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273) >at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150) > at > org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304) > at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005) > at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196) > ... 11 moreCaused by: > org.apache.calcite.sql.validate.SqlValidatorException: Column >
[jira] [Commented] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions
[ https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814564#comment-17814564 ] xuyang commented on FLINK-34254: I have created a Jira linked to this one to track the improvement. > `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions > - > > Key: FLINK-34254 > URL: https://issues.apache.org/jira/browse/FLINK-34254 > Project: Flink > Issue Type: Bug > Components: Table SQL / API >Reporter: xuyang >Priority: Major > > Add the following test to CalcITCase to reproduce this bug. > {code:java} > @Test > def test(): Unit = { > tEnv.executeSql(s""" >|create catalog `c_new` with ( >| 'type' = 'generic_in_memory', >| 'default-database' = 'my_d' >|) >|""".stripMargin) > tEnv > .executeSql(s""" > |show catalogs > |""".stripMargin) > .print > tEnv > .executeSql(s""" > | describe catalog default_catalog > |""".stripMargin) > .print > } {code} > Result: > {code:java} > +-+ > | catalog name | > +-+ > | c_new | > | default_catalog | > +-+ > 2 rows in set > org.apache.flink.table.api.ValidationException: SQL validation failed. 
From > line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in > any table > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117) > at > org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728) >at > org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453) > at java.lang.reflect.Method.invoke(Method.java:498) at > java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189) at > java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused > by: org.apache.calcite.runtime.CalciteContextException: From line 2, column > 19 to line 2, column 33: Column 'default_catalog' not found in any table > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) > at > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at > org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at > 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276) > at > org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273) >at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150) > at > org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304) > at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005) > at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196) > ... 11 moreCaused by: > org.apache.calcite.sql.validate.SqlValidatorException: Column > 'default_catalog' not found in any tableat > sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at >
[jira] [Created] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL
xuyang created FLINK-34370: -- Summary: [Umbrella] Complete work about enhanced Flink SQL DDL Key: FLINK-34370 URL: https://issues.apache.org/jira/browse/FLINK-34370 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.10.0 Reporter: xuyang This is an umbrella Jira for completing the work for [Flip-69](https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement) about enhanced Flink SQL DDL. With [FLINK-34254](https://issues.apache.org/jira/browse/FLINK-34254), it seems that this FLIP is not finished yet. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34353) A strange exception will be thrown if minibatch size is not set while using mini-batch join
[ https://issues.apache.org/jira/browse/FLINK-34353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814061#comment-17814061 ] xuyang commented on FLINK-34353: I'll try to fix it. > A strange exception will be thrown if minibatch size is not set while using > mini-batch join > --- > > Key: FLINK-34353 > URL: https://issues.apache.org/jira/browse/FLINK-34353 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > {code:java} > java.lang.IllegalArgumentException: maxCount must be greater than 0 > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger.(CountCoBundleTrigger.java:34) > at > org.apache.flink.table.planner.plan.utils.MinibatchUtil.createMiniBatchCoTrigger(MinibatchUtil.java:61) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.java:231) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) >at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) >at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937)at > scala.collection.Iterator.foreach$(Iterator.scala:937) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at > scala.collection.IterableLike.foreach(IterableLike.scala:70) at > scala.collection.IterableLike.foreach$(IterableLike.scala:69)at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike.map(TraversableLike.scala:233) at > scala.collection.TraversableLike.map$(TraversableLike.scala:226) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) at > 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) > at > org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:537) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697) > at > org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482) > at org.apache.flink.table.api.Explainable.explain(Explainable.java:40) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34353) A strange exception will be thrown if minibatch size is not set while using mini-batch join
[ https://issues.apache.org/jira/browse/FLINK-34353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34353: --- Description: {code:java} java.lang.IllegalArgumentException: maxCount must be greater than 0 at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)at org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger.(CountCoBundleTrigger.java:34) at org.apache.flink.table.planner.plan.utils.MinibatchUtil.createMiniBatchCoTrigger(MinibatchUtil.java:61) at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.java:231) at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937)at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69)at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:537) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51) at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697) at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)at org.apache.flink.table.api.Explainable.explain(Explainable.java:40) {code} > A strange exception will be thrown if minibatch size is not set while using > mini-batch join > --- > > Key: FLINK-34353 > URL: https://issues.apache.org/jira/browse/FLINK-34353 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > {code:java} > java.lang.IllegalArgumentException: maxCount must be greater than 0 > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) > at > org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger.(CountCoBundleTrigger.java:34) > at > org.apache.flink.table.planner.plan.utils.MinibatchUtil.createMiniBatchCoTrigger(MinibatchUtil.java:61) > at > org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.java:231) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) >at > org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) >at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) > at scala.collection.Iterator.foreach(Iterator.scala:937)at > scala.collection.Iterator.foreach$(Iterator.scala:937) at > scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at > scala.collection.IterableLike.foreach(IterableLike.scala:70) at > scala.collection.IterableLike.foreach$(IterableLike.scala:69)at > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at > scala.collection.TraversableLike.map(TraversableLike.scala:233) at > scala.collection.TraversableLike.map$(TraversableLike.scala:226) at > scala.collection.AbstractTraversable.map(Traversable.scala:104) 
at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84) > at > org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:537) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697) > at > org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482) >
[jira] [Created] (FLINK-34353) A strange exception will be thrown if minibatch size is not set while using mini-batch join
xuyang created FLINK-34353: -- Summary: A strange exception will be thrown if minibatch size is not set while using mini-batch join Key: FLINK-34353 URL: https://issues.apache.org/jira/browse/FLINK-34353 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.19.0 Reporter: xuyang Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
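The `IllegalArgumentException: maxCount must be greater than 0` in the trace above arises because the mini-batch size stays at its unset default when only mini-batch join is enabled, and that value reaches a count-based trigger whose precondition requires a positive count. The sketch below is a simplified, hypothetical stand-in for the Flink classes named in the stack trace (it is not Flink's actual code); it reproduces the confusing error and shows the kind of earlier, clearer validation the planner could do instead:

```java
/**
 * Sketch of FLINK-34353: when 'table.exec.mini-batch.size' is not set, it
 * keeps a "not configured" default (-1 here, by assumption), and that value
 * flows into a count trigger whose precondition demands a positive count.
 * Class and method names are illustrative only.
 */
public class MiniBatchTriggerSketch {

    static final long DEFAULT_MINI_BATCH_SIZE = -1L; // assumed "not configured" marker

    static class CountTrigger {
        final long maxCount;

        CountTrigger(long maxCount) {
            if (maxCount <= 0) {
                // The confusing message the issue reports: it names an internal
                // parameter rather than the config option the user forgot.
                throw new IllegalArgumentException("maxCount must be greater than 0");
            }
            this.maxCount = maxCount;
        }
    }

    /** A clearer, earlier check the planner could perform before building the trigger. */
    static CountTrigger createTrigger(long configuredSize) {
        if (configuredSize <= 0) {
            throw new IllegalArgumentException(
                    "mini-batch join requires 'table.exec.mini-batch.size' to be set to a positive value");
        }
        return new CountTrigger(configuredSize);
    }

    public static void main(String[] args) {
        try {
            createTrigger(DEFAULT_MINI_BATCH_SIZE); // simulates the unset option
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(createTrigger(1000L).maxCount);
    }
}
```

Either way the job cannot run without the option; the improvement is only that the error points at the missing configuration instead of an internal precondition.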
[jira] [Commented] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch
[ https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814047#comment-17814047 ] xuyang commented on FLINK-34349: Hi, I'd like to take this verification. > Release Testing: Verify FLINK-34219 Introduce a new join operator to support > minibatch > -- > > Key: FLINK-34349 > URL: https://issues.apache.org/jira/browse/FLINK-34349 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Affects Versions: 1.19.0 >Reporter: Shuai Xu >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Minibatch join is ready. Users can improve performance in regular stream > join scenarios. > Someone can verify this feature by following the > [doc]([https://github.com/apache/flink/pull/24240]) although it is still > being reviewed. > If someone finds any bugs in this feature, please open a Jira linked to this > one to report them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34348) Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-34348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34348: --- Description: Window TVF aggregation support for changelog stream input is ready for testing. Users can add a window TVF aggregation downstream of a CDC source or other nodes that produce CDC records. Someone can verify this feature with: # Prepare a MySQL table and insert some data first. # Start the sql-client and prepare DDL for this MySQL table as a CDC source. # Verify the plan with `EXPLAIN PLAN_ADVICE` to check that there is a window aggregate node and that the changelog in its upstream contains "UA", "UB" or "D". # Use different kinds of window TVFs to test window TVF aggregation while updating the source data to check data correctness. > Release Testing: Verify FLINK-20281 Window aggregation supports changelog > stream input > -- > > Key: FLINK-34348 > URL: https://issues.apache.org/jira/browse/FLINK-34348 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Window TVF aggregation support for changelog stream input is ready for testing. > Users can add a window TVF aggregation downstream of a CDC source or other > nodes that produce CDC records. > Someone can verify this feature with: > # Prepare a MySQL table and insert some data first. > # Start the sql-client and prepare DDL for this MySQL table as a CDC source. > # Verify the plan with `EXPLAIN PLAN_ADVICE` to check that there is a > window aggregate node and that the changelog in its upstream contains "UA", > "UB" or "D". > # Use different kinds of window TVFs to test window TVF aggregation while > updating the source data to check data correctness. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xuyang updated FLINK-34346: --- Description: Session window TVF is ready. Users can use session window TVF aggregation instead of the legacy session group window aggregation. Someone can verify this feature by following the [doc]([https://github.com/apache/flink/pull/24250]) although it is still being reviewed. Furthermore, although session window join, session window rank and session window deduplicate are in an experimental state, if someone finds bugs in them, please also open a Jira linked to this one to report them. > Release Testing: Verify FLINK-24024 Support session Window TVF > -- > > Key: FLINK-34346 > URL: https://issues.apache.org/jira/browse/FLINK-34346 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: xuyang >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > > Session window TVF is ready. Users can use session window TVF aggregation > instead of the legacy session group window aggregation. > Someone can verify this feature by following the > [doc]([https://github.com/apache/flink/pull/24250]) although it is still > being reviewed. > Furthermore, although session window join, session window rank and session > window deduplicate are in an experimental state, if someone finds bugs in > them, please also open a Jira linked to this one to report them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34348) Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input
xuyang created FLINK-34348: -- Summary: Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input Key: FLINK-34348 URL: https://issues.apache.org/jira/browse/FLINK-34348 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.19.0 Reporter: xuyang Assignee: xuyang Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF
xuyang created FLINK-34346: -- Summary: Release Testing: Verify FLINK-24024 Support session Window TVF Key: FLINK-34346 URL: https://issues.apache.org/jira/browse/FLINK-34346 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.19.0 Reporter: xuyang Assignee: xuyang Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-33963) There is only one UDF instance after serializing the same task
[ https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814014#comment-17814014 ] xuyang edited comment on FLINK-33963 at 2/4/24 2:34 AM: I agree that we should be more rigorous in our consideration of whether a scalar function should be reused. However, in the scenario you mentioned, if the path can be extracted once and used permanently, it then implies that the path is a constant across all elements. In such a case, using two scalar functions might not be the best choice. That's why I just say this is not an uncommon scenario. was (Author: xuyangzhong): I agree that we should be more rigorous in our consideration of whether a scalar function should be reused. However, in the scenario you mentioned, if the path can be extracted once and used permanently, it then implies that the path is a constant across all elements. In such a case, using a scalar function might not be the best choice. That's why I just say this is not an uncommon scenario. > There is only one UDF instance after serializing the same task > -- > > Key: FLINK-33963 > URL: https://issues.apache.org/jira/browse/FLINK-33963 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0 > Environment: local env in idea test. > java 8 >Reporter: lifengchao >Priority: Major > > I define this UDF and expect the following SQL to return 'a', 'b', but it > returns 'a', 'a'. 
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> sql:
> {code:sql}
> select
>     udf_ser(name, 'a') name1,
>     udf_ser(name, 'b') name2
> from heros
> {code}
> Changing UDF to this will achieve the expected results.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>             .outputTypeStrategy(new TypeStrategy() {
>                 @Override
>                 public Optional<DataType> inferType(CallContext callContext) {
>                     List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                     if (argumentDataTypes.size() != 2) {
>                         throw callContext.newValidationError("arg size error");
>                     }
>                     if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                         throw callContext.newValidationError("Literal expected for second argument.");
>                     }
>                     cache = callContext.getArgumentValue(1, String.class).get();
>                     return Optional.of(DataTypes.STRING());
>                 }
>             })
>             .build();
>     }
> }
> {code}
>
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
>
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
[jira] [Commented] (FLINK-33963) There is only one UDF instance after serializing the same task
[ https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814014#comment-17814014 ] xuyang commented on FLINK-33963: I agree that we should be more rigorous in considering whether a scalar function should be reused. However, in the scenario you mentioned, if the path can be extracted once and used permanently, it implies that the path is a constant across all elements. In such a case, using a scalar function might not be the best choice. That's why I said this is not a common scenario. > There is only one UDF instance after serializing the same task > -- > > Key: FLINK-33963 > URL: https://issues.apache.org/jira/browse/FLINK-33963 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0 > Environment: local env in idea test. > java 8 >Reporter: lifengchao >Priority: Major > >
> I define this UDF and expect the following SQL to return 'a', 'b', but it returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
> {code}
> sql:
> {code:sql}
> select
>     udf_ser(name, 'a') name1,
>     udf_ser(name, 'b') name2
> from heros
> {code}
> Changing UDF to this will achieve the expected results.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>             .outputTypeStrategy(new TypeStrategy() {
>                 @Override
>                 public Optional<DataType> inferType(CallContext callContext) {
>                     List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                     if (argumentDataTypes.size() != 2) {
>                         throw callContext.newValidationError("arg size error");
>                     }
>                     if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
>                         throw callContext.newValidationError("Literal expected for second argument.");
>                     }
>                     cache = callContext.getArgumentValue(1, String.class).get();
>                     return Optional.of(DataTypes.STRING());
>                 }
>             })
>             .build();
>     }
> }
> {code}
>
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
> }
>
> public class UdfSerializeFunc2 extends ScalarFunction {
>     static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc2.class);
>     String cache;
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         LOG.warn("open:{}.", this.hashCode());
>     }
>
>     public String eval(String a, String b) {
>         if (cache == null) {
>             LOG.warn("cache_null.cache:{}", b);
>             cache = b;
>         }
>         return cache;
>     }
>
>     @Override
>     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
>         return TypeInference.newBuilder()
>             .outputTypeStrategy(new TypeStrategy() {
>                 @Override
>                 public Optional<DataType> inferType(CallContext callContext) {
>                     List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
>                     if (argumentDataTypes.size() != 2) {
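The sharing pitfall in this thread can be shown without Flink at all. The sketch below is illustrative only (class and method names are hypothetical, not Flink API): when one function object backs both call sites in a task, the instance field `cache` is shared, so the second call observes the first call's constant.

```java
// Minimal plain-Java sketch of the reported behaviour (no Flink runtime).
class CachingFunc {
    // Mirrors eval() of UdfSerializeFunc from the report.
    String cache;

    String eval(String a, String b) {
        if (cache == null) {
            cache = b;
        }
        return cache;
    }
}

public class SharedInstanceDemo {
    public static void main(String[] args) {
        // One shared instance, as observed after (de)serialization in the same task.
        CachingFunc shared = new CachingFunc();
        // prints "a,a" -- the second call returns the first call's cached constant
        System.out.println(shared.eval("name", "a") + "," + shared.eval("name", "b"));

        // One instance per call site, which is what the reporter expected.
        CachingFunc f1 = new CachingFunc();
        CachingFunc f2 = new CachingFunc();
        // prints "a,b"
        System.out.println(f1.eval("name", "a") + "," + f2.eval("name", "b"));
    }
}
```

This is also why the `UdfSerializeFunc2` workaround helps: it moves the constant capture into type inference at planning time instead of relying on per-instance runtime state.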
[jira] [Commented] (FLINK-34301) Release Testing Instructions: Verify FLINK-20281 Window aggregation supports changelog stream input
[ https://issues.apache.org/jira/browse/FLINK-34301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813595#comment-17813595 ] xuyang commented on FLINK-34301: Window TVF aggregation support for changelog stream input is ready for testing. Users can add a window TVF aggregation downstream of a CDC source or of other nodes that produce CDC records. Someone can verify this feature with:
# Prepare a MySQL table and insert some data first.
# Start sql-client and prepare the DDL to expose this MySQL table as a CDC source.
# Verify the plan via `EXPLAIN PLAN_ADVICE` to check that there is a window aggregate node and that the changelog upstream of it contains "UA", "UB" or "D" messages.
# Use different kinds of window TVFs to test window TVF aggregation while updating the source data, and check the data correctness.
> Release Testing Instructions: Verify FLINK-20281 Window aggregation supports > changelog stream input > --- > > Key: FLINK-34301 > URL: https://issues.apache.org/jira/browse/FLINK-34301 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
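The verification steps above can be sketched in SQL. The table name, columns, and connector choice here are hypothetical, and the mysql-cdc connection options are omitted:

```sql
-- Hypothetical CDC source for step 2.
CREATE TABLE orders_cdc (
  order_id BIGINT,
  amount   DOUBLE,
  ts       TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc'
  -- hostname/port/username/database-name/table-name options omitted
);

-- Step 3: the advised plan should show a window aggregate node whose
-- upstream changelog contains UA/UB/D messages.
EXPLAIN PLAN_ADVICE
SELECT window_start, window_end, SUM(amount) AS total
FROM TABLE(
  TUMBLE(TABLE orders_cdc, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```

For step 4, swap `TUMBLE` for `HOP` or `CUMULATE` and re-run while issuing UPDATE/DELETE statements against the MySQL table.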
[jira] [Comment Edited] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813590#comment-17813590 ] xuyang edited comment on FLINK-34300 at 2/2/24 9:29 AM: Session window TVF is ready. Users can use session window TVF aggregation instead of the legacy session group window aggregation. Someone can verify this feature by following the [doc]([https://github.com/apache/flink/pull/24250]), although it is still being reviewed. Furthermore, although session window join, session window rank and session window deduplicate are in an experimental state, if someone finds bugs in them, you can open a Jira linked to this one to report them. was (Author: xuyangzhong): Session window TVF is ready. Users can use Session window TVF aggregation instead of using legacy session group window aggregation. Someone can verify this feature by the [doc](https://github.com/apache/flink/pull/24250) although it is preparing. Further more, although session window join, session window rank and session window deduplicate are in experimental state, If someone finds some bugs about them, you could also open a Jira linked this one to report it. > Release Testing Instructions: Verify FLINK-24024 Support session Window TVF > --- > > Key: FLINK-34300 > URL: https://issues.apache.org/jira/browse/FLINK-34300 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
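For reference, the 1.19 session window TVF shape being tested looks roughly like the following; the table and column names are hypothetical:

```sql
-- Session windows with a 5-minute inactivity gap, keyed per item.
SELECT window_start, window_end, item, COUNT(*) AS cnt
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY window_start, window_end, item;
```

The experimental operators mentioned above (session window join, rank, deduplicate) build on the same TVF but are exercised through joins/`ROW_NUMBER()` over the windowed table rather than a plain aggregate.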
[jira] [Commented] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
[ https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813590#comment-17813590 ] xuyang commented on FLINK-34300: Session window TVF is ready. Users can use session window TVF aggregation instead of the legacy session group window aggregation. Someone can verify this feature via the [doc](https://github.com/apache/flink/pull/24250), although it is still in preparation. Furthermore, although session window join, session window rank and session window deduplicate are in an experimental state, if someone finds bugs in them, you can open a Jira linked to this one to report them. > Release Testing Instructions: Verify FLINK-24024 Support session Window TVF > --- > > Key: FLINK-34300 > URL: https://issues.apache.org/jira/browse/FLINK-34300 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.19.0 >Reporter: lincoln lee >Assignee: xuyang >Priority: Blocker > Labels: release-testing > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34338) An exception is thrown when some named params change order when using window tvf
xuyang created FLINK-34338: -- Summary: An exception is thrown when some named params change order when using window tvf Key: FLINK-34338 URL: https://issues.apache.org/jira/browse/FLINK-34338 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.15.0 Reporter: xuyang Fix For: 1.19.0 This bug can be reproduced by the following sql in `WindowTableFunctionTest`:
{code:java}
@Test
def test(): Unit = {
  val sql =
    """
      |SELECT *
      |FROM TABLE(TUMBLE(
      |   DATA => TABLE MyTable,
      |   SIZE => INTERVAL '15' MINUTE,
      |   TIMECOL => DESCRIPTOR(rowtime)
      |   ))
      |""".stripMargin
  util.verifyRelPlan(sql)
}{code}
In FLIP-145 and the user doc, we can find that `the DATA param must be the first`, but it seems that we also can't change the order of the other params. -- This message was sent by Atlassian Jira (v8.20.10#820010)
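For contrast, a sketch of the same query with the parameters in their declared order (DATA, then TIMECOL, then SIZE, matching the positional form); per the report, only the reordered variant triggers the exception:

```sql
SELECT *
FROM TABLE(TUMBLE(
  DATA    => TABLE MyTable,
  TIMECOL => DESCRIPTOR(rowtime),
  SIZE    => INTERVAL '15' MINUTE));
```

Named parameters are normally order-independent in SQL, which is why accepting only the positional order is surprising here.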
[jira] [Created] (FLINK-34335) Query hints in RexSubQuery could not be printed
xuyang created FLINK-34335: -- Summary: Query hints in RexSubQuery could not be printed Key: FLINK-34335 URL: https://issues.apache.org/jira/browse/FLINK-34335 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: xuyang Fix For: 1.19.0 That is because `RelTreeWriterImpl` does not handle `RexSubQuery`, and `RexSubQuery` uses `RelOptUtil.toString(rel)` to print itself instead of adding extra information such as query hints. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34335) Query hints in RexSubQuery could not be printed
[ https://issues.apache.org/jira/browse/FLINK-34335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813470#comment-17813470 ] xuyang commented on FLINK-34335: I'll try to fix it. > Query hints in RexSubQuery could not be printed > --- > > Key: FLINK-34335 > URL: https://issues.apache.org/jira/browse/FLINK-34335 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > > That is because in `RelTreeWriterImpl`, we don't care about the > `RexSubQuery`. And `RexSubQuery` use `RelOptUtil.toString(rel)` to print > itself instead of adding extra information such as query hints. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34272) AdaptiveSchedulerClusterITCase failure due to MiniCluster not running
[ https://issues.apache.org/jira/browse/FLINK-34272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813001#comment-17813001 ] xuyang commented on FLINK-34272: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57161=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8 > AdaptiveSchedulerClusterITCase failure due to MiniCluster not running > - > > Key: FLINK-34272 > URL: https://issues.apache.org/jira/browse/FLINK-34272 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.0 >Reporter: Matthias Pohl >Assignee: David Morávek >Priority: Blocker > Labels: pull-request-available, test-stability > > [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57073=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9543]
> {code:java}
> Jan 29 17:21:29 17:21:29.465 [ERROR] Tests run: 3, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 12.48 s <<< FAILURE! -- in org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase
> Jan 29 17:21:29 17:21:29.465 [ERROR] org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testAutomaticScaleUp -- Time elapsed: 8.599 s <<< ERROR!
> Jan 29 17:21:29 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
> Jan 29 17:21:29     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 29 17:21:29     at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1118)
> Jan 29 17:21:29     at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:991)
> Jan 29 17:21:29     at org.apache.flink.runtime.minicluster.MiniCluster.getArchivedExecutionGraph(MiniCluster.java:840)
> Jan 29 17:21:29     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.lambda$waitUntilParallelismForVertexReached$3(AdaptiveSchedulerClusterITCase.java:270)
> Jan 29 17:21:29     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> Jan 29 17:21:29     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> Jan 29 17:21:29     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.waitUntilParallelismForVertexReached(AdaptiveSchedulerClusterITCase.java:265)
> Jan 29 17:21:29     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testAutomaticScaleUp(AdaptiveSchedulerClusterITCase.java:146)
> Jan 29 17:21:29     at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 29 17:21:29     at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 29 17:21:29
> Jan 29 17:21:29 17:21:29.466 [ERROR] org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale -- Time elapsed: 2.036 s <<< ERROR!
> Jan 29 17:21:29 java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
> Jan 29 17:21:29     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 29 17:21:29     at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1118)
> Jan 29 17:21:29     at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:991)
> Jan 29 17:21:29     at org.apache.flink.runtime.minicluster.MiniCluster.getExecutionGraph(MiniCluster.java:969)
> Jan 29 17:21:29     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.lambda$testCheckpointStatsPersistedAcrossRescale$1(AdaptiveSchedulerClusterITCase.java:183)
> Jan 29 17:21:29     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> Jan 29 17:21:29     at org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> Jan 29 17:21:29     at org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale(AdaptiveSchedulerClusterITCase.java:180)
> Jan 29 17:21:29     at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 29 17:21:29     at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45){code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34323) Session window tvf failed when using named params
[ https://issues.apache.org/jira/browse/FLINK-34323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812698#comment-17812698 ] xuyang commented on FLINK-34323: I'll try to fix it. > Session window tvf failed when using named params > - > > Key: FLINK-34323 > URL: https://issues.apache.org/jira/browse/FLINK-34323 > Project: Flink > Issue Type: Bug > Components: Table SQL / API, Table SQL / Planner >Affects Versions: 1.19.0 >Reporter: xuyang >Priority: Major > Fix For: 1.19.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-34323) Session window tvf failed when using named params
xuyang created FLINK-34323: -- Summary: Session window tvf failed when using named params Key: FLINK-34323 URL: https://issues.apache.org/jira/browse/FLINK-34323 Project: Flink Issue Type: Bug Components: Table SQL / API, Table SQL / Planner Affects Versions: 1.19.0 Reporter: xuyang Fix For: 1.19.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34271) Fix the potential failure test about GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT
[ https://issues.apache.org/jira/browse/FLINK-34271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812664#comment-17812664 ] xuyang commented on FLINK-34271: [~dwysakowicz] I understand and agree with you. That's why I just worked around it in my test. > Fix the potential failure test about > GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT > -- > > Key: FLINK-34271 > URL: https://issues.apache.org/jira/browse/FLINK-34271 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: xuyang >Assignee: xuyang >Priority: Major > Labels: pull-request-available > > The underlying reason is that a previous PR introduced a test with state TTL as follows in the SQL:
> {code:java}
> .runSql(
>     "INSERT INTO sink_t SELECT /*+ STATE_TTL('source_t' = '4d') */"
>         + "b, "
>         + "COUNT(*) AS cnt, "
>         + "AVG(a) FILTER (WHERE a > 1) AS avg_a, "
>         + "MIN(c) AS min_c "
>         + "FROM source_t GROUP BY b"){code}
> When the savepoint metadata was generated for the first time, the metadata recorded the time when a certain key was accessed. If the test is rerun after the TTL has expired, the state of this key in the metadata will be cleared, resulting in an incorrect test outcome.
> To rectify this issue, I think the current tests in RestoreTestBase could be modified to regenerate new savepoint metadata as needed every time. However, this seems to deviate from the original design purpose of RestoreTestBase.
> For my test, I will work around this by removing the data "consumedBeforeRestore", as I am only interested in testing the generation of an expected JSON plan. -- This message was sent by Atlassian Jira (v8.20.10#820010)