[jira] [Commented] (FLINK-35905) Flink physical operator replacement support

2024-07-31 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869845#comment-17869845
 ] 

xuyang commented on FLINK-35905:


Yes, [~au_miner], you're right: currently Flink does not support a plugin 
system for custom operators and rules. It is only a future plan... For more 
details, you can refer to the comments in 
[{{org.apache.flink.table.module.Module}}|https://github.com/apache/flink/blob/82b628d4730eef32b2f7a022e3b73cb18f950e6e/flink-table/flink-table-common/src/main/java/org/apache/flink/table/module/Module.java#L31] 
and 
[FLIP-68|https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Pluggable+Modules].
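
For context, a minimal sketch of what the existing {{Module}} plugin point looks like today: it can contribute function definitions, but not physical operators or planner rules (the class and function name below are illustrative):

{code:java}
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;

// A custom module today can only extend the set of built-in functions;
// there is no hook for custom exec nodes or planner rules.
public class MyModule implements Module {
    @Override
    public Set<String> listFunctions() {
        return Collections.singleton("MY_FUNC");
    }

    @Override
    public Optional<FunctionDefinition> getFunctionDefinition(String name) {
        // look up and return the definition of "MY_FUNC" here
        return Optional.empty();
    }
}
// loaded via: tableEnv.loadModule("my-module", new MyModule());
{code}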
 

> Flink physical operator replacement support
> ---
>
> Key: FLINK-35905
> URL: https://issues.apache.org/jira/browse/FLINK-35905
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, Table SQL / API
>Affects Versions: 1.15.0
> Environment: Flink1.15 and so on
>Reporter: Wang Qilong
>Priority: Major
>
> Does FlinkSQL provide some SPI implementations that support custom physical 
> operators, such as customizing a new StreamExecFileSourceScan and supporting 
> rule injection for converting logical operators to physical operators?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35905) Flink physical operator replacement support

2024-07-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869486#comment-17869486
 ] 

xuyang commented on FLINK-35905:


Furthermore, I think this is not a bug; at most it's an improvement, right?

> Flink physical operator replacement support
> ---
>
> Key: FLINK-35905
> URL: https://issues.apache.org/jira/browse/FLINK-35905
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, Table SQL / API
>Affects Versions: 1.15.0
> Environment: Flink1.15 and so on
>Reporter: Wang Qilong
>Priority: Major
>
> I have been studying the FlinkSQL source code recently and have learned about 
> the execution process of FlinkSQL, which has led to a question:
> Does FlinkSQL provide some SPI implementations that support custom physical 
> operators, such as customizing a FileSourceScanExec in the execPhysicalPlan 
> layer?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35905) Flink physical operator replacement support

2024-07-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869485#comment-17869485
 ] 

xuyang commented on FLINK-35905:


A short answer: No.

What is the reason and background for wanting to customize the exec node? Taking 
FileSourceScanExec as an example, it is just an ExecTableSourceScan whose 
connector parameter is 'file'.

> Flink physical operator replacement support
> ---
>
> Key: FLINK-35905
> URL: https://issues.apache.org/jira/browse/FLINK-35905
> Project: Flink
>  Issue Type: Bug
>  Components: API / Scala, Table SQL / API
>Affects Versions: 1.15.0
> Environment: Flink1.15 and so on
>Reporter: Wang Qilong
>Priority: Major
>
> I have been studying the FlinkSQL source code recently and have learned about 
> the execution process of FlinkSQL, which has led to a question:
> Does FlinkSQL provide some SPI implementations that support custom physical 
> operators, such as customizing a FileSourceScanExec in the execPhysicalPlan 
> layer?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35885) proctime aggregate window triggered by watermark

2024-07-25 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868659#comment-17868659
 ] 

xuyang edited comment on FLINK-35885 at 7/25/24 12:24 PM:
--

I think this bug needs to be fixed although it only affects the case where the 
watermark is larger than the proctime. 

I will propose a PR to fix it in the next few days.


was (Author: xuyangzhong):
I think this bug needs to be fixed although it only affects the case where the 
watermark is larger than the proctime. Not only is window agg affected; window 
operators such as window rank also have this problem.

I will propose a PR to fix it in the next few days.

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Priority: Major
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  supports advancing progress by watermark. When the watermark suddenly exceeds 
> the next window end timestamp, a result with count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35885) proctime aggregate window triggered by watermark

2024-07-25 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868659#comment-17868659
 ] 

xuyang commented on FLINK-35885:


I think this bug needs to be fixed although it only affects the case where the 
watermark is larger than the proctime. Not only is window agg affected; window 
operators such as window rank also have this problem.

I will propose a PR to fix it in the next few days.

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Priority: Major
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  supports advancing progress by watermark. When the watermark suddenly exceeds 
> the next window end timestamp, a result with count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35885) proctime aggregate window triggered by watermark

2024-07-25 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868658#comment-17868658
 ] 

xuyang commented on FLINK-35885:


The bug is caused by the advancement of the watermark and the proctime timer, 
both of which trigger the advance of the window buffer. When the window buffer 
advances, it records the time of the last advancement, so that when a new 
advancement point arrives that is smaller than this time, it does not need to 
flush the data again but can instead retrieve it directly from the state.

The bug occurs when a watermark (wt) that is larger than the proctime (pt) 
arrives first. This causes the window buffer to advance and flush the data in 
the buffer to the state. When the proctime is triggered by the timer, the 
window buffer assumes that the data for this time point (pt) has already been 
flushed to the state because pt < wt. However, in reality, the data in the 
state is incomplete (or possibly even empty).
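
A minimal sketch of the direction a fix could take (illustrative only, not the actual patch; the {{isEventTimeWindow}} flag is hypothetical and would be derived from the window definition):

{code:java}
// Only event-time windows should have their buffer flushed and their
// progress advanced by watermarks; proctime windows should stay driven
// purely by proctime timers.
public void processWatermark(Watermark mark) throws Exception {
    if (isEventTimeWindow && mark.getTimestamp() > currentWatermark) {
        windowProcessor.advanceProgress(mark.getTimestamp());
        super.processWatermark(mark);
    } else {
        // Forward the watermark for downstream operators without touching
        // the window buffer of a proctime window.
        super.processWatermark(mark);
    }
}
{code}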

 

 

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Priority: Major
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  supports advance progress by watermark. When the watermark suddenly exceeds 
> the next window end timestamp, a result of count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35861) The dependency of kafka sql connector seems wrong

2024-07-17 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866923#comment-17866923
 ] 

xuyang commented on FLINK-35861:


I'm not sure whether other connectors have this bug in their documentation. cc 
[~loserwang1024].

> The dependency of kafka sql connector seems wrong
> -
>
> Key: FLINK-35861
> URL: https://issues.apache.org/jira/browse/FLINK-35861
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Documentation
>Affects Versions: 1.18.1, 1.19.1
>Reporter: xuyang
>Priority: Minor
>
> The documented dependency should be `flink-sql-connector-kafka` instead of 
> `flink-connector-kafka`, right?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35861) The dependency of kafka sql connector seems wrong

2024-07-17 Thread xuyang (Jira)
xuyang created FLINK-35861:
--

 Summary: The dependency of kafka sql connector seems wrong
 Key: FLINK-35861
 URL: https://issues.apache.org/jira/browse/FLINK-35861
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka, Documentation
Affects Versions: 1.19.1, 1.18.1
Reporter: xuyang


The documented dependency should be `flink-sql-connector-kafka` instead of 
`flink-connector-kafka`, right?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.

2024-07-15 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866220#comment-17866220
 ] 

xuyang commented on FLINK-35826:


I believe this is a more general problem, namely "how does an operator 
determine whether a record should be considered late data".

Currently, we determine it at the operator level by comparing against the minimum 
of the multiple input watermarks, but in a distributed system, the watermarks from 
multiple upstream operators or from multiple parallel subtasks of one upstream 
operator might be delayed.

One speculative possibility: could the data discarding be unified in 
WatermarkAssigner?
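
To make the non-determinism concrete, here is a small sketch of how an operator-level watermark is derived (simplified; the real logic lives in StatusWatermarkValve):

{code:java}
// The operator's view of time is the minimum watermark across all input
// channels, so whether a record counts as "late" depends on which
// channel's watermark happens to arrive first.
public class CombinedWatermark {
    private final long[] channelWatermarks;

    public CombinedWatermark(int numChannels) {
        channelWatermarks = new long[numChannels];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    public void onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
    }

    public long current() {
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public boolean isLate(long recordTimestamp) {
        return recordTimestamp < current();
    }
}
{code}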

> [SQL] Sliding window may produce unstable calculations when processing 
> changelog data.
> --
>
> Key: FLINK-35826
> URL: https://issues.apache.org/jira/browse/FLINK-35826
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
> Environment: flink with release-1.20
>Reporter: Yuan Kui
>Assignee: xuyang
>Priority: Major
> Attachments: image-2024-07-12-14-27-58-061.png
>
>
> Calculation results may be unstable when using a sliding window to process 
> changelog data. Repeating the execution 10 times, some test runs succeed and 
> some fail:
> !image-2024-07-12-14-27-58-061.png!
> See the documentation and code for more details.
> [https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing]
> code:
> [[BUG] Reproduce the issue of unstable sliding window calculation results · 
> yuchengxin/flink@c003e45 
> (github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.

2024-07-15 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866217#comment-17866217
 ] 

xuyang commented on FLINK-35826:


cc [~catyee] [~yunta] 

> [SQL] Sliding window may produce unstable calculations when processing 
> changelog data.
> --
>
> Key: FLINK-35826
> URL: https://issues.apache.org/jira/browse/FLINK-35826
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
> Environment: flink with release-1.20
>Reporter: Yuan Kui
>Assignee: xuyang
>Priority: Major
> Attachments: image-2024-07-12-14-27-58-061.png
>
>
> Calculation results may be unstable when using a sliding window to process 
> changelog data. Repeating the execution 10 times, some test runs succeed and 
> some fail:
> !image-2024-07-12-14-27-58-061.png!
> See the documentation and code for more details.
> [https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing]
> code:
> [[BUG] Reproduce the issue of unstable sliding window calculation results · 
> yuchengxin/flink@c003e45 
> (github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.

2024-07-15 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17866216#comment-17866216
 ] 

xuyang commented on FLINK-35826:


Hi, let me try to explain the cause of this problem.

I have reproduced it with the following source data and query:

https://github.com/xuyangzhong/flink/tree/window_bug

 
{code:java}
// source data
// 
("+I", 1, 1, "2024-03-13T10:12", "1"),
("+I", 2, 3, "2024-03-13T10:16", "1"),
("+I", 3, 2, "2024-03-13T10:13", "1")

// schema
CREATE TABLE MyTable3 (
  id int,
  amount int,
  create_time timestamp(3),
  pt string,
  proc_time AS PROCTIME(),
  WATERMARK FOR `create_time` AS `create_time` - INTERVAL '0' MINUTES,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (...)

// query
select pt, 
  HOP_START(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS 
w_start,
  HOP_END(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS w_end,
  sum(amount) as count_age
  from MyTable3
group by HOP(create_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES), pt;

// expected result:
"1,2024-03-13T10:05,2024-03-13T10:15,1",
"1,2024-03-13T10:10,2024-03-13T10:20,6",
"1,2024-03-13T10:15,2024-03-13T10:25,3"

// but sometimes wrong:
"1,2024-03-13T10:05,2024-03-13T10:15,3",
"1,2024-03-13T10:10,2024-03-13T10:20,6",
"1,2024-03-13T10:15,2024-03-13T10:25,3" {code}
 

*Conclusion:*

The bug is caused by the fact that the record `("+I", 3, 2, "2024-03-13T10:13", "1")` 
is sometimes not treated as a late record because of the watermark definition 
`create_time - INTERVAL '0' MINUTES`.

 

*Detailed cause:*

The plan: 
{code:java}
// parallelism is 4 (t1 means subtask-1)
source assigner(t1) -> changelogNormalize(t1) -> group window agg(t1)
                    -> changelogNormalize(t2) -> group window agg(t2)
                    -> changelogNormalize(t3) -> group window agg(t3)
                    -> changelogNormalize(t4) -> group window agg(t4) {code}
Data across changelogNormalize:

Data is simplified to <id, amount>, and watermarks are simplified to minutes.
| |t1|t2|t3|t4|
|data|<1,1>|-|-|-|
|watermark|12|12|12|12|
|data|-|<2, 3>|-|-|
|watermark|16|16 [1]|16|16|
|data|-|-|<3, 2>|-|
|watermark|Long.MAX|Long.MAX|Long.MAX|Long.MAX|

[1] If we block the processing of the watermark in ChangelogNormalize for a 
little while, <3, 2> will not be treated as a late record, because the 
timestamp of <3, 2> is 13, which is not less than the watermark 12 in GroupWindowAgg.

Wrong data processing in GroupWindowAgg:
|data or watermark|window's acc|output|
|<1, 1> // t1|[10, 15): 1|-|
|Watermark 12 // t1, t2, t3, t4|[10, 15): 1|-|
|<2, 3> // t2|[10, 15): 1
[15, 20): 3|-|
|Watermark 16 // t1, t3, t4|[10, 15): 1
[15, 20): 3|-|
|<3, 2> // t3|[10, 15): 3
[15, 20): 3|-|
|Watermark 16 // t2 (blocked)|[10, 15): 3
[15, 20): 3|[5, 15): 3|
|Watermark Long.MAX // t1, t2, t3, t4| |[10, 20): 6
[15, 25): 3|

 

> [SQL] Sliding window may produce unstable calculations when processing 
> changelog data.
> --
>
> Key: FLINK-35826
> URL: https://issues.apache.org/jira/browse/FLINK-35826
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
> Environment: flink with release-1.20
>Reporter: Yuan Kui
>Assignee: xuyang
>Priority: Major
> Attachments: image-2024-07-12-14-27-58-061.png
>
>
> Calculation results may be unstable when using a sliding window to process 
> changelog data. Repeating the execution 10 times, some test runs succeed and 
> some fail:
> !image-2024-07-12-14-27-58-061.png!
> See the documentation and code for more details.
> [https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing]
> code:
> [[BUG] Reproduce the issue of unstable sliding window calculation results · 
> yuchengxin/flink@c003e45 
> (github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35826) [SQL] Sliding window may produce unstable calculations when processing changelog data.

2024-07-12 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865412#comment-17865412
 ] 

xuyang commented on FLINK-35826:


[~martijnvisser] Thanks for the ping. I'll mark it and try to resolve it in the 
coming days.

> [SQL] Sliding window may produce unstable calculations when processing 
> changelog data.
> --
>
> Key: FLINK-35826
> URL: https://issues.apache.org/jira/browse/FLINK-35826
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
> Environment: flink with release-1.20
>Reporter: Yuan Kui
>Priority: Major
> Attachments: image-2024-07-12-14-27-58-061.png
>
>
> Calculation results may be unstable when using a sliding window to process 
> changelog data. Some test runs succeed and some fail:
> !image-2024-07-12-14-27-58-061.png!
> See the documentation and code for more details.
> [https://docs.google.com/document/d/1JmwSLs4SJvZKe7kqALqVBZ-1F1OyPmiWw8J6Ug6vqW0/edit?usp=sharing]
> code:
> [[BUG] Reproduce the issue of unstable sliding window calculation results · 
> yuchengxin/flink@c003e45 
> (github.com)|https://github.com/yuchengxin/flink/commit/c003e45082e0d1464111c286ac9c7abb79527492]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35792) Sorting by proctime does not work in rank

2024-07-12 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865375#comment-17865375
 ] 

xuyang commented on FLINK-35792:


I'll try to fix it.

> Sorting by proctime does not work in rank
> -
>
> Key: FLINK-35792
> URL: https://issues.apache.org/jira/browse/FLINK-35792
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: xuyang
>Priority: Major
>
> Take the following sql as an example:
> {code:java}
> @Test
> def test(): Unit = {
>   val sql =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT a, b, c,
>   |  ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as 
> rank_num
>   |  FROM MyTable)
>   |WHERE rank_num = 1
> """.stripMargin
>   // This rank can't be converted into Deduplicated because it also uses `b`
>   // as order key.
>   util.verifyExecPlan(sql)
> } {code}
> The rank node will not materialize the `proctime` in 
> `RelTimeIndicatorConverter`, thus the order key `proctime` is always null.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35829) StreamPhysicalWindowTableFunction doesn't always require watermark

2024-07-12 Thread xuyang (Jira)
xuyang created FLINK-35829:
--

 Summary: StreamPhysicalWindowTableFunction doesn't always require 
watermark
 Key: FLINK-35829
 URL: https://issues.apache.org/jira/browse/FLINK-35829
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.19.1, 1.20.0
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35782) Flink connector jdbc works wrong when using sql gateway

2024-07-10 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864865#comment-17864865
 ] 

xuyang commented on FLINK-35782:


Hi, [~caiyi]. SqlGateway does not provide a JDBC driver by default. Can you confirm 
whether the corresponding drivers are included in the packages of these two 
connectors? If not, you can add the driver to SqlGateway via `add jar ...` and 
try again.

> Flink connector jdbc works wrong when using sql gateway
> ---
>
> Key: FLINK-35782
> URL: https://issues.apache.org/jira/browse/FLINK-35782
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / Gateway
>Affects Versions: 1.18.1, 1.19.1
>Reporter: Yi Cai
>Priority: Major
>
> When using sql clent to submit jobs to sql gateway will cause no suitable 
> driver found for x
>  
> script:
> add jar 's3://flink/lib/flink-connector-jdbc-3.1.2-1.19.jar';
> add jar 's3://flink/lib/mysql-connector-j-8.0.33.jar';
> CREATE CATALOG xxx WITH(
> ...
> );
> select ...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35792) Sorting by proctime does not work in rank

2024-07-08 Thread xuyang (Jira)
xuyang created FLINK-35792:
--

 Summary: Sorting by proctime does not work in rank
 Key: FLINK-35792
 URL: https://issues.apache.org/jira/browse/FLINK-35792
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0, 1.20.0
Reporter: xuyang


Take the following sql as an example:
{code:java}
@Test
def test(): Unit = {
  val sql =
"""
  |SELECT *
  |FROM (
  |  SELECT a, b, c,
  |  ROW_NUMBER() OVER (PARTITION BY a ORDER BY b, proctime DESC) as 
rank_num
  |  FROM MyTable)
  |WHERE rank_num = 1
""".stripMargin

  // This rank can't be converted into Deduplicated because it also uses `b`   
  // as order key.    
  util.verifyExecPlan(sql)
} {code}
The rank node will not materialize the `proctime` in 
`RelTimeIndicatorConverter`, thus the order key `proctime` is always null.
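
For illustration, a sketch of what materializing the time indicator would look like (not the actual RelTimeIndicatorConverter code; the helper below only shows the intended wrapping):

{code:java}
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;

class ProctimeMaterializationSketch {
    // Wrap a proctime field reference so that it evaluates to the current
    // processing time instead of NULL when used as a plain order key.
    static RexNode materialize(RexBuilder rexBuilder, RelNode rankInput, int proctimeIdx) {
        RexNode proctimeRef = rexBuilder.makeInputRef(rankInput, proctimeIdx);
        return rexBuilder.makeCall(FlinkSqlOperatorTable.PROCTIME_MATERIALIZE, proctimeRef);
    }
}
{code}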



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-27 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860506#comment-17860506
 ] 

xuyang edited comment on FLINK-20539 at 6/27/24 2:47 PM:
-

1. It looks like the original fix didn't fix it cleanly on the Table API. Let 
me briefly try to explain why the latest query failed:
The type of the *cast* in the first *sqlQuery* is the wrong 
*FULLY_QUALIFIED*; when the Calcite tree is stored in Flink as a temporary 
table through *createTemporaryView*, it is converted to Flink's own type 
*RowType*; when *sqlQuery* is executed again, the Flink *RowType* is converted 
to *PEEK_FIELDS_NO_EXPAND* when it is converted back to Calcite's *Row*, which 
is no longer consistent with the type of the original Calcite tree.

(Row with *FULLY_QUALIFIED* in Calcite -> RowType in Flink -> Row with 
*PEEK_FIELDS_NO_EXPAND* in Calcite)

 

_Detailed explanation (feel free to skip):_

After executing *sqlQuery*, the *Row* type of the *CAST* in the query statement 
has become the wrong *FULLY_QUALIFIED*. When executing 
*createTemporaryView*, we put the Calcite tree into a 
*PlannerQueryOperation*, convert the *FULLY_QUALIFIED* *ROW* into Flink's own 
*LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) as the 
*ResolvedSchema*, and store it in the catalog manager as a temporary 
table (i.e. *t1*).

When executing *sqlQuery* again, we need to convert the ResolvedSchema of the 
*t1* table into a type that Calcite can recognize 
({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this point, the cast 
type becomes *PEEK_FIELDS_NO_EXPAND*. The difference between the type in 
the Calcite tree (*FULLY_QUALIFIED*) and the type of the *t1* table after 
the Flink conversion (*PEEK_FIELDS_NO_EXPAND*) causes this bug.
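
The round trip can be sketched like this (illustrative, using the two methods named above; the Calcite StructKind is not preserved):

{code:java}
import org.apache.calcite.rel.type.RelDataType;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.logical.LogicalType;

class StructKindRoundTripSketch {
    static RelDataType roundTrip(FlinkTypeFactory typeFactory, RelDataType calciteRow) {
        // calciteRow.getStructKind() is FULLY_QUALIFIED (already the wrong kind)
        LogicalType flinkType = FlinkTypeFactory.toLogicalType(calciteRow);
        // the StructKind is dropped here; converting back always yields
        // PEEK_FIELDS_NO_EXPAND, which no longer matches the original tree
        return typeFactory.createFieldTypeFromLogicalType(flinkType);
    }
}
{code}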

 

2. By the way, I tried the following query and found that there was no error, 
but there was a slight problem with the plan (although the corresponding ITCase 
did not report an error).

 
{code:java}
@Test
def test(): Unit = {
  util.addTable(s"""
                   |create table t1(
                   |  a int,
                   |  b varchar
                   |) with (
                   |  'connector' = 'datagen'
                   |)
     """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual wrong plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, 
VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, 
b])

// expected correct plan
Calc(select=[a, b, CAST(ROW(a, b) AS 
RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) 
b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, 
b]){code}
 

 

Now I have determined the cause of the problem and how to fix it; I am 
adding some cases and will create a PR later. Due to the inconsistency of row 
types between Calcite and Flink, I cannot enumerate all possible error cases in 
advance. If a query hits the same error in the future, anyone can link 
to this Jira and I will address it then.


was (Author: xuyangzhong):
1. It looks like the original fix didn't fix it cleanly on the Table API. Let 
me briefly try to explain why the latest query failed:
The type of the *cast* in the first *sqlQuery* is the wrong 
*FULLY_QUALIFIED*; when the Calcite tree is stored in Flink as a temporary 
table through *createTemporaryView*, it is converted to Flink's own type 
*RowType*; when *sqlQuery* is executed again, the Flink *RowType* is converted 
to *PEEK_FIELDS_NO_EXPAND* when it is converted back to Calcite's *Row*, which 
is no longer consistent with the type of the original Calcite tree.

(Row with *FULLY_QUALIFIED* in Calcite -> RowType in Flink -> Row with 
*PEEK_FIELDS_NO_EXPAND* in Calcite)

 

_Detailed explanation (feel free to skip):_

After executing *sqlQuery*, the *Row* type of the *CAST* in the query statement 
has become the wrong *FULLY_QUALIFIED*. When executing 
*createTemporaryView*, we put the Calcite tree into a 
*PlannerQueryOperation*, convert the *FULLY_QUALIFIED* *ROW* into Flink's own 
*LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) as the 
*ResolvedSchema*, and store it in the catalog manager as a temporary 
table (i.e. *t1*).

When executing *sqlQuery* again, we need to convert the ResolvedSchema of the 
*t1* table into a type that Calcite can recognize 
({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this point, the cast 
type becomes *PEEK_FIELDS_NO_EXPAND*. The difference between the type in 
the Calcite tree (*FULLY_QUALIFIED*) and the type of the *t1* table after 
the Flink conversion (*PEEK_FIELDS_NO_EXPAND*) causes this bug.

[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-27 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17860506#comment-17860506
 ] 

xuyang commented on FLINK-20539:


1. It looks like the original fix didn't fix it cleanly on the Table API. Let 
me briefly try to explain why the latest query failed:
The type of the *cast* in the first *sqlQuery* is the wrong 
*FULLY_QUALIFIED*; when the Calcite tree is stored in Flink as a temporary 
table through *createTemporaryView*, it is converted to Flink's own type 
*RowType*; when *sqlQuery* is executed again, the Flink *RowType* is converted 
to *PEEK_FIELDS_NO_EXPAND* when it is converted back to Calcite's *Row*, which 
is no longer consistent with the type of the original Calcite tree.

(Row with *FULLY_QUALIFIED* in Calcite -> RowType in Flink -> Row with 
*PEEK_FIELDS_NO_EXPAND* in Calcite)

 

_Detailed explanation (feel free to skip):_

After executing *sqlQuery*, the *Row* type of the *CAST* in the query statement 
has become the wrong *FULLY_QUALIFIED*. When executing 
*createTemporaryView*, we put the Calcite tree into a 
*PlannerQueryOperation*, convert the *FULLY_QUALIFIED* *ROW* into Flink's own 
*LogicalType* ({_}FlinkTypeFactory#toLogicalType{_}) as the 
*ResolvedSchema*, and store it in the catalog manager as a temporary 
table (i.e. *t1*).

When executing *sqlQuery* again, we need to convert the ResolvedSchema of the 
*t1* table into a type that Calcite can recognize 
({_}FlinkTypeFactory#createFieldTypeFromLogicalType{_}). At this point, the cast 
type becomes *PEEK_FIELDS_NO_EXPAND*. The difference between the type in 
the Calcite tree (*FULLY_QUALIFIED*) and the type of the *t1* table after 
the Flink conversion (*PEEK_FIELDS_NO_EXPAND*) causes this bug.

 

2. By the way, I tried the following query and found that there was no error, 
but there was a slight problem with the plan (although the corresponding ITCase 
did not report an error).

 
{code:java}
@Test
def test(): Unit = {
  util.addTable(s"""
                   |create table t1(
                   |  a int,
                   |  b varchar,
                   |  c as row(a, b)
                   |) with (
                   |  'connector' = 'datagen'
                   |)
     """.stripMargin)
  util.verifyExecPlan(
    "SELECT a, b, cast(row(a, b) as row(a_val string, b_val string)) as col FROM t1")
}

// actual wrong plan
Calc(select=[a, b, CAST(ROW(a, b) AS RecordType(VARCHAR(2147483647) a_val, 
VARCHAR(2147483647) b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, 
b])

// expected correct plan
Calc(select=[a, b, CAST(ROW(a, b) AS 
RecordType:peek_no_expand(VARCHAR(2147483647) a_val, VARCHAR(2147483647) 
b_val)) AS col])
+- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, 
b]){code}
 

 

Now I have determined the cause of the problem and how to fix it; I am 
adding some cases and will create a PR later. Due to the inconsistency of row 
types between Calcite and Flink, I cannot enumerate all possible error cases in 
advance. If a query hits the same error in the future, anyone can link 
to this Jira and I will address it then.

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, 

[jira] [Commented] (FLINK-20539) Type mismatch when using ROW in computed column

2024-06-24 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17859708#comment-17859708
 ] 

xuyang commented on FLINK-20539:


I'll take a look soon.

> Type mismatch when using ROW in computed column
> ---
>
> Key: FLINK-20539
> URL: https://issues.apache.org/jira/browse/FLINK-20539
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Timo Walther
>Assignee: xuyang
>Priority: Major
>  Labels: auto-unassigned, pull-request-available
> Fix For: 1.19.0, 1.18.2
>
>
> The following SQL:
> {code}
> env.executeSql(
>   "CREATE TABLE Orders (\n"
>   + "order_number BIGINT,\n"
>   + "priceINT,\n"
>   + "first_name   STRING,\n"
>   + "last_nameSTRING,\n"
>   + "buyer_name AS ROW(first_name, last_name)\n"
>   + ") WITH (\n"
>   + "  'connector' = 'datagen'\n"
>   + ")");
> env.executeSql("SELECT * FROM Orders").print();
> {code}
> Fails with:
> {code}
> Exception in thread "main" java.lang.AssertionError: Conversion to relational 
> algebra failed to preserve datatypes:
> validated type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE" EXPR$0, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT 
> NULL buyer_name) NOT NULL
> converted type:
> RecordType(BIGINT order_number, INTEGER price, VARCHAR(2147483647) CHARACTER 
> SET "UTF-16LE" first_name, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
> last_name, RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$0, 
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" EXPR$1) NOT NULL buyer_name) NOT 
> NULL
> rel:
> LogicalProject(order_number=[$0], price=[$1], first_name=[$2], 
> last_name=[$3], buyer_name=[ROW($2, $3)])
>   LogicalTableScan(table=[[default_catalog, default_database, Orders]])
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>   at 
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35498) Unexpected argument name conflict error when do extract method params from udf

2024-06-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17852333#comment-17852333
 ] 

xuyang commented on FLINK-35498:


Copied from the available PR:

Currently, we depend on ASM to extract method parameter names. We use a custom 
MethodVisitor to visit the local variable table in the {{.class}} file, and then 
take the first {{N}} local variable names as the method parameter names.

However, if there are multiple blocks for one local variable, the first 
{{N}} local variable names can contain duplicates, so the wrong names are 
extracted and validation crashes on the name conflict (new logic added in 1.19).

The PR uses the slot index in the {{.class}} file to extract the method 
parameter names instead, because method parameter names are always at the head 
of the slot-index list ([Chapter 3.6. Receiving 
Arguments|https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-3.html]).

Take the test function 
{{ExtractionUtilsTest#MultiLocalVariableBlocksWithoutInitializationClass}}
 as an example:
 * Before the fix:

 # extract parameter names via ASM:
{{[localVariable, localVariable, localVariable, this, generic, genericFuture, 
listOfGenericFuture, array, localVariable]}}
 # take the first 4 after offset 1 (expecting {{this}} at index 0):
{{[localVariable, localVariable, this, generic]}}

This is because the local variable table contains multiple {{localVariable}} 
entries with different lifecycles in different blocks.
 * After the fix:

 # extract parameter names via ASM:
{{[this, generic, genericFuture, listOfGenericFuture, array, localVariable]}}
 # take the first 4 after offset 1 (expecting {{this}} at index 0):
{{[generic, genericFuture, listOfGenericFuture, array]}}

That's what we expected.

Furthermore, if the local variable is initialized before the {{if}} statement, 
its lifecycle spans all the {{if}} blocks, and there is no need to create a new 
{{localVariable}} entry for it. Otherwise, the {{localVariable}} in the separate 
blocks is not visible across them even though it is declared first, and the JVM 
creates multiple {{localVariable}} entries for the different blocks and adds them 
to the local variable table (you can see in the local variable table that although 
these {{localVariable}} entries share the same slot, their start and end labels 
differ).
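
A minimal sketch of the slot-index-based extraction (illustrative, not the actual Flink code; it also glosses over long/double parameters occupying two slots):

{code:java}
import java.util.SortedMap;
import java.util.TreeMap;
import org.objectweb.asm.Label;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;

// Collect local variable names keyed by slot index. Since parameters always
// occupy the lowest slots (after `this` at slot 0 for instance methods), the
// first entries in slot order are the parameter names, no matter how many
// block-local entries the LocalVariableTable contains.
class ParamNameCollector extends MethodVisitor {
    final SortedMap<Integer, String> namesBySlot = new TreeMap<>();

    ParamNameCollector() {
        super(Opcodes.ASM9);
    }

    @Override
    public void visitLocalVariable(
            String name, String descriptor, String signature,
            Label start, Label end, int index) {
        // Several blocks may declare a variable in the same slot with
        // different start/end labels; keeping the first name per slot is
        // enough for parameter extraction.
        namesBySlot.putIfAbsent(index, name);
    }
}
{code}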

> Unexpected argument name conflict error when do extract method params from udf
> --
>
> Key: FLINK-35498
> URL: https://issues.apache.org/jira/browse/FLINK-35498
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0, 1.20.0
>Reporter: lincoln lee
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-06-02-23-09-17-768.png
>
>
> Follow the steps to reproduce the error:
> test case:
> {code:java}
> util.addTemporarySystemFunction("myudf", new TestXyz)
> util.tableEnv.explainSql("select myudf(f1, f2) from t")
> {code}
>  
> udf: TestXyz 
> {code:java}
> public class TestXyz extends ScalarFunction {
> public String eval(String s1, String s2) {
> // will not fail if add initialization
> String localV1;
> if (s1 == null) {
> if (s2 != null) {
> localV1 = s2;
> } else {
> localV1 = s2 + s1;
> }
> } else {
> if ("xx".equals(s2)) {
> localV1 = s1.length() >= s2.length() ? s1 : s2;
> } else {
> localV1 = s1;
> }
> }
> if (s1 == null) {
> return s2 + localV1;
> }
> if (s2 == null) {
> return s1;
> }
> return s1.length() >= s2.length() ? s1 + localV1 : s2;
> }
> }
> {code}
>  
> error stack:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Unable to extract 
> a type inference from method:
> public java.lang.String 
> org.apache.flink.table.planner.runtime.utils.TestXyz.eval(java.lang.String,java.lang.String)
>     at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>     at 
> org.apache.flink.table.types.extraction.BaseMappingExtractor.extractResultMappings(BaseMappingExtractor.java:154)
>     at 
> org.apache.flink.table.types.extraction.BaseMappingExtractor.extractOutputMapping(BaseMappingExtractor.java:100)
>     ... 53 more
> Caused by: org.apache.flink.table.api.ValidationException: Argument name 
> conflict, there are at least two argument names that are the same.
>     at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:362)
>     at 
> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:357)
>     at 
> org.apache.flink.table.types.extraction.FunctionSignatureTemplate.of(FunctionSignatureTemplate.java:73)
>     at 
> 

[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-24 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849209#comment-17849209
 ] 

xuyang commented on FLINK-34380:


Hi, [~rovboyko]. +1 to fix the wrong order. I'll take a look at your PR later.

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-14 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846485#comment-17846485
 ] 

xuyang edited comment on FLINK-34380 at 5/15/24 4:01 AM:
-

Sorry for the late reply. The results of this fix still don't quite meet 
expectations.

Still based on the above test, the result is as follows. However, the row kind 
of the first record should be `+I`, right?

 
{code:java}
++---++---++
| op | a | b  | a0| b0 |
++---++---++
| +U | 1 |  1 | 1 | 99 |
| -U | 1 |  1 | 1 | 99 |
| +U | 1 | 99 | 1 | 99 |
| -D | 1 | 99 | 1 | 99 |
++---++---++ {code}
 

 


was (Author: xuyangzhong):
Sorry for the late reply. The fix commit looks great. [~xu_shuai_] Can you 
take a look and verify it again?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-05-14 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846485#comment-17846485
 ] 

xuyang commented on FLINK-34380:


Sorry for the late reply. The fix commit looks great. [~xu_shuai_] Can you 
take a look and verify it again?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35327) SQL Explain show push down condition

2024-05-09 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17845189#comment-17845189
 ] 

xuyang commented on FLINK-35327:


Hi, I'd like to take this Jira and take a look at it.

> SQL Explain show push down condition 
> -
>
> Key: FLINK-35327
> URL: https://issues.apache.org/jira/browse/FLINK-35327
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: Hongshun Wang
>Priority: Minor
> Fix For: 1.20.0
>
>
> Currently, we cannot determine whether a filter/limit/partition condition is 
> pushed down to the source. For example, we can only know a filter condition is 
> pushed down if it is no longer included in the Filter node.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35193) Support drop materialized table syntax and execution in continuous refresh mode

2024-05-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844083#comment-17844083
 ] 

xuyang commented on FLINK-35193:


Hi, [~lsy] I'd like to take this task.

> Support drop materialized table syntax and execution in continuous refresh 
> mode
> ---
>
> Key: FLINK-35193
> URL: https://issues.apache.org/jira/browse/FLINK-35193
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> In continuous refresh mode, support dropping a materialized table and its 
> background refresh job.
> {code:SQL}
> DROP MATERIALIZED TABLE [ IF EXISTS ] [catalog_name.][db_name.]table_name
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35198) Support the execution of refresh materialized table

2024-05-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17844082#comment-17844082
 ] 

xuyang commented on FLINK-35198:


Hi, [~lsy] Can I take this jira?

> Support the execution of refresh materialized table
> ---
>
> Key: FLINK-35198
> URL: https://issues.apache.org/jira/browse/FLINK-35198
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:SQL}
> ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name REFRESH
>  [PARTITION (key1=val1, key2=val2, ...)]
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35271) Add doc for syntax `describe job 'xxx'`

2024-04-29 Thread xuyang (Jira)
xuyang created FLINK-35271:
--

 Summary: Add doc for syntax `describe job 'xxx'`
 Key: FLINK-35271
 URL: https://issues.apache.org/jira/browse/FLINK-35271
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.20.0
Reporter: xuyang
 Fix For: 1.20.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35194) Support describe job syntax and execution

2024-04-26 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17841113#comment-17841113
 ] 

xuyang commented on FLINK-35194:


Hi, can I take this jira?

> Support describe job syntax and execution
> -
>
> Key: FLINK-35194
> URL: https://issues.apache.org/jira/browse/FLINK-35194
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.20.0
>Reporter: dalongliu
>Priority: Major
> Fix For: 1.20.0
>
>
> {code:java}
> { DESCRIBE | DESC } JOB 'xxx'
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35081) CompileException when watermark definition contains coalesce and to_timestamp built-in functions

2024-04-17 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838424#comment-17838424
 ] 

xuyang commented on FLINK-35081:


I think this bug is the same as FLINK-28693.

> CompileException when watermark definition contains coalesce and to_timestamp 
> built-in functions
> 
>
> Key: FLINK-35081
> URL: https://issues.apache.org/jira/browse/FLINK-35081
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.17.1
>Reporter: Grzegorz Kołakowski
>Priority: Major
>
> I have a data stream in which event-time column can have two data formats. To 
> be able to define watermark on the table, I used coalesce and to_timestamp 
> built-in functions as shown below:
> {code:sql}
> create table test (
>     `@timestamp` VARCHAR,
>     __rowtime AS coalesce(
> to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss'),
> to_timestamp(`@timestamp`, '-MM-dd''T''HH:mm:ss.SSS')
> ),
>     watermark for __rowtime as __rowtime - INTERVAL '30' SECOND,
>     ...
> ) with ( ... )
> {code}
> The job failed with the following stacktrace:
> {noformat}
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
>     at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown 
> Source)
>     at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>     at 
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:547)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
>     at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
> Source)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> Caused 

[jira] [Commented] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs

2024-04-17 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838130#comment-17838130
 ] 

xuyang commented on FLINK-34583:


Hi, [~xccui], can you provide more details about this bug? I tried to run this 
test in my local env with Flink 1.18-SNAPSHOT, but could not reproduce it.
{code:java}
// run it in org.apache.flink.table.planner.plan.stream.sql.CalcTest
@Test
def test(): Unit = {
  util.tableEnv.executeSql(s"""
  |create temporary table T1 (
  |  a int,
  |  b int,
  |  c int)
  |  with ( 'connector' = 'values' )
  |""".stripMargin)
  util.verifyExecPlan(
"with q1 as (SELECT * FROM T1 /*+ OPTIONS('changelog-mode' = 'I,D') */ 
WHERE a > 10)," +
  "q2 as (SELECT a, b, c FROM q1 where b > 10)," +
  "q3 as (select a,b,c from q1 where c > 20)," +
  "q4 as (select * from q2 join q3 on q2.a = q3.a) SELECT * FROM q4");
}

// result
Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, c, a0, b0, c0], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
:- Exchange(distribution=[hash[a]])
:  +- Calc(select=[a, b, c], where=[((a > 10) AND (b > 10))])
: +- TableSourceScan(table=[[default_catalog, default_database, T1, 
filter=[]]], fields=[a, b, c], hints=[[[OPTIONS 
options:{changelog-mode=I,D}]]])(reuse_id=[1])
+- Exchange(distribution=[hash[a]])
   +- Calc(select=[a, b, c], where=[((a > 10) AND (c > 20))])
  +- Reused(reference_id=[1]){code}

> Bug for dynamic table option hints with multiple CTEs
> -
>
> Key: FLINK-34583
> URL: https://issues.apache.org/jira/browse/FLINK-34583
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Priority: Major
>
> The table options hints don't work well with multiple WITH clauses referring 
> to the same table. Please see the following example.
>  
> The following query with hints works well.
> {code:java}
> SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code}
> The following query with multiple WITH clauses also works well.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...)
> SELECT * FROM T3;{code}
> The following query with multiple WITH clauses referring to the same original 
> table fails to recognize the hints.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...),
> T4 AS (SELECT ... FROM T2 WHERE...),
> T5 AS (SELECT ... FROM T3 JOIN T4 ON...)
> SELECT * FROM T5;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34583) Bug for dynamic table option hints with multiple CTEs

2024-03-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823973#comment-17823973
 ] 

xuyang commented on FLINK-34583:


[~lincoln.86xy]  Sure.

> Bug for dynamic table option hints with multiple CTEs
> -
>
> Key: FLINK-34583
> URL: https://issues.apache.org/jira/browse/FLINK-34583
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Priority: Major
>
> The table options hints don't work well with multiple WITH clauses referring 
> to the same table. Please see the following example.
>  
> The following query with hints works well.
> {code:java}
> SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...;{code}
> The following query with multiple WITH clauses also works well.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...)
> SELECT * FROM T3;{code}
> The following query with multiple WITH clauses referring to the same original 
> table fails to recognize the hints.
> {code:java}
> WITH T2 AS (SELECT * FROM T1 /*+ OPTIONS('foo' = 'bar') */ WHERE...),
> T3 AS (SELECT ... FROM T2 WHERE...),
> T4 AS (SELECT ... FROM T2 WHERE...),
> T5 AS (SELECT ... FROM T3 JOIN T4 ON...)
> SELECT * FROM T5;{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34559) TVF Window Aggregations might get stuck

2024-03-03 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823012#comment-17823012
 ] 

xuyang commented on FLINK-34559:


Hi, [~roman], I'm a little interested in how you would solve this problem. 

> limit the amount of data buffered in Global Aggregation nodes

IIUC, for non-session global window aggregation, only the aggregated results are 
stored in the state. I am a little curious what "buffering the data" means here 
(maybe you mean the input network buffer at the operator level, right?).

> disable two-phase aggregations

Even if two-phase optimization is disabled (see the sketch below), when the 
user's watermark interval is very long or the window is very large, the state in 
the global window aggregation node will still be updated frequently.

As you mentioned, I believe it is currently difficult to relate the availability 
of network buffers to plan optimizations such as two-phase aggregation. So I'm 
somewhat looking forward to seeing your solution. :)
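
For reference, two-phase aggregation can already be disabled today via the 
`table.optimizer.agg-phase-strategy` option. A minimal sketch, in the same style 
as the test snippets in this thread (assuming a `tEnv` as in those snippets):
{code:java}
// Force one-phase (global-only) aggregation, i.e. disable the
// local/global two-phase split that introduces the Local Aggregation node.
tEnv.getConfig.getConfiguration
  .setString("table.optimizer.agg-phase-strategy", "ONE_PHASE")
{code}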

 

> TVF Window Aggregations might get stuck
> ---
>
> Key: FLINK-34559
> URL: https://issues.apache.org/jira/browse/FLINK-34559
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0, 1.18.1
>Reporter: Roman Khachatryan
>Assignee: Roman Khachatryan
>Priority: Major
> Fix For: 1.19.0
>
>
> RecordsWindowBuffer flushes buffered records in the following cases:
>  * watermark
>  * checkpoint barrier
>  * buffer overflow
>  
> In two-phase aggregations, this creates the following problems:
> 1) Local aggregation: enters hard-backpressure because for flush, it outputs 
> the data downstream and doesn't check network buffer availability
> This already disrupts normal checkpointing and watermarks progression
>  
> 2) Global aggregation: 
> When the window is large enough and/or the watermark is lagging, lots of data 
> is flushed to state backend (and the state is updated) in checkpoint SYNC 
> phase.
>  
> All this eventually causes checkpoint timeouts (10 minutes in our env).
>  
> Example query
> {code:java}
> INSERT INTO `target_table` 
> SELECT window_start, window_end, some, attributes, SUM(view_time) AS 
> total_view_time, COUNT(*) AS num, LISTAGG(DISTINCT page_url) AS pages 
> FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR($rowtime), INTERVAL '1' 
> HOUR)) 
> GROUP BY window_start, window_end, some, attributes;{code}
> In our setup, the issue can be reproduced deterministically.
>  
> As a quick fix, we might want to:
>  # limit the amount of data buffered in Global Aggregation nodes
>  # disable two-phase aggregations, i.e. Local Aggregations (we can try to 
> limit buffering there too, but network buffer availability cannot be easily 
> checked from the operator)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-02-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822401#comment-17822401
 ] 

xuyang commented on FLINK-34380:


Hi, [~xu_shuai_]. Can you help check it again?

> Strange RowKind and records about intermediate output when using minibatch 
> join
> ---
>
> Key: FLINK-34380
> URL: https://issues.apache.org/jira/browse/FLINK-34380
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> // Add it in CalcItCase
> @Test
>   def test(): Unit = {
> env.setParallelism(1)
> val rows = Seq(
>   changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
>   changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
>   changelogRow("-D", java.lang.Integer.valueOf(1), "99")
> )
> val dataId = TestValuesTableFactory.registerData(rows)
> val ddl =
>   s"""
>  |CREATE TABLE t1 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl)
> val ddl2 =
>   s"""
>  |CREATE TABLE t2 (
>  |  a int,
>  |  b string
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$dataId',
>  |  'bounded' = 'false'
>  |)
>""".stripMargin
> tEnv.executeSql(ddl2)
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
> tEnv.getConfig.getConfiguration
>   .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))
> println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = 
> t2.a").explain())
> tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
>   } {code}
> Output:
> {code:java}
> ++-+-+-+-+
> | op |           a |               b |          a0 |      b0 |
> ++-+-+-+-+
> | +U |           1 |               1 |           1 |      99 |
> | +U |           1 |              99 |           1 |      99 |
> | -U |           1 |               1 |           1 |      99 |
> | -D |           1 |              99 |           1 |      99 |
> ++-+-+-+-+{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33989) Insert Statement With Filter Operation Generates Extra Tombstone using Upsert Kafka Connector

2024-02-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822337#comment-17822337
 ] 

xuyang commented on FLINK-33989:


Agree with [~libenchao]. This behavior is by design.

> Insert Statement With Filter Operation Generates Extra Tombstone using Upsert 
> Kafka Connector
> -
>
> Key: FLINK-33989
> URL: https://issues.apache.org/jira/browse/FLINK-33989
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka, Table SQL / Runtime
>Affects Versions: 1.17.2
>Reporter: Flaviu Cicio
>Priority: Major
>
> Given the following Flink SQL tables:
> {code:sql}
> CREATE TABLE input (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'input', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> );
> CREATE TABLE output (
>   id STRING NOT NULL, 
>   current_value STRING NOT NULL, 
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka', 
>   'topic' = 'output', 
>   'key.format' = 'raw', 
>   'properties.bootstrap.servers' = 'kafka:29092', 
>   'properties.group.id' = 'your_group_id', 
>   'value.format' = 'json'
> ); {code}
> And, the following entries are present in the input Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> If we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input; {code}
> The following entries are published to the output Kafka topic:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> But, if we execute the following statement:
> {code:sql}
> INSERT INTO output SELECT id, current_value FROM input WHERE id IN ('1'); 
> {code}
> The following entries are published:
> {code:json}
> [
>   {
>     "id": "1",
>     "current_value": "abc"
>   },
>   null,
>   {
>     "id": "1",
>     "current_value": "abcd"
>   }
> ]{code}
> We would expect the result to be the same for both insert statements.
> As we can see, there is an extra tombstone generated as a result of the 
> second statement.
>  
> Moreover, if we make a select on the input table:
> {code:sql}
> SELECT * FROM input;
> {code}
> We will get the following entries:
> ||op||id||current_value||
> |I|1|abc|
> |-U|1|abc|
> |+U|1|abcd|
> We expected to see only the insert and the update_after entries.
> The update_before is added at DeduplicateFunctionHelper#122.
> This is easily reproducible with this test that we added in the 
> UpsertKafkaTableITCase from flink-connector-kafka:
> {code:java}
> @Test
> public void testAggregateFilterOmit() throws Exception {
> String topic = COUNT_FILTER_TOPIC + "_" + format;
> createTestTopic(topic, 1, 1);
> env.setParallelism(1);
> // -   test   ---
> countFilterToUpsertKafkaOmitUpdateBefore(topic);
> // - clean up ---
> deleteTestTopic(topic);
> }
> private void countFilterToUpsertKafkaOmitUpdateBefore(String table) 
> throws Exception {
> String bootstraps = getBootstrapServers();
> List data =
> Arrays.asList(
> Row.of(1, "Hi"),
> Row.of(1, "Hello"),
> Row.of(2, "Hello world"),
> Row.of(2, "Hello world, how are you?"),
> Row.of(2, "I am fine."),
> Row.of(3, "Luke Skywalker"),
> Row.of(3, "Comment#1"),
> Row.of(3, "Comment#2"),
> Row.of(4, "Comment#3"),
> Row.of(4, null));
> final String createSource =
> String.format(
> "CREATE TABLE aggfilter_%s ("
> + "  `id` INT,\n"
> + "  `comment` STRING\n"
> + ") WITH ("
> + "  'connector' = 'values',"
> + "  'data-id' = '%s'"
> + ")",
> format, TestValuesTableFactory.registerData(data));
> tEnv.executeSql(createSource);
> final String createSinkTable =
> String.format(
> "CREATE TABLE %s (\n"
> + "  `id` INT,\n"
> + "  `comment` 

[jira] [Commented] (FLINK-34016) Janino compile failed when watermark with column by udf

2024-02-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822088#comment-17822088
 ] 

xuyang commented on FLINK-34016:


Thanks for your test! [~seb-pereira] (y)

> Janino compile failed when watermark with column by udf
> ---
>
> Key: FLINK-34016
> URL: https://issues.apache.org/jira/browse/FLINK-34016
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0, 1.18.0
>Reporter: ude
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-01-25-11-53-06-158.png, 
> image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, 
> image-2024-01-25-12-57-34-632.png
>
>
> After submit the following flink sql by sql-client.sh will throw an exception:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 
> 'WatermarkGenerator$0'
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69)
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: 
> org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
>     at 
> 

[jira] [Commented] (FLINK-34529) Projection cannot be pushed down through rank operator.

2024-02-29 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822064#comment-17822064
 ] 

xuyang commented on FLINK-34529:


Hi, [~nilerzhou]. The plan to fix the bug LGTM.

IIUC, the situation is that in the `LOGICAL_REWRITE` program, the calc is 
transposed past the rank node by `CalcRankTransposeRule`. However, we don't have 
a rule that lets the calc transpose the join node; the rules that let a calc 
transpose a join exist only in `PROJECT_REWRITE` and `LOGICAL`. One way to fix 
this bug is what you said: add `ProjectWindowTransposeRule` to `LOGICAL` (see 
the sketch below). Another way is to add a rule like `CalcJoinTransposeRule` to 
`LOGICAL_REWRITE`. cc [~libenchao] 
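
A rough sketch of the first option, assuming it is wired into 
`FlinkStreamRuleSets.LOGICAL_OPT_RULES` (this is not a tested patch, and the 
exact rule reference depends on the Calcite version, e.g. 
`CoreRules.PROJECT_WINDOW_TRANSPOSE` in recent ones):
{code:java}
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

// In FlinkStreamRuleSets (sketch): add the transpose rule so a projection
// on top of a window/rank node can be pushed further down towards the scan.
val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
  // ... the existing logical optimization rules, plus:
  CoreRules.PROJECT_WINDOW_TRANSPOSE
)
{code}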

> Projection cannot be pushed down through rank operator.
> ---
>
> Key: FLINK-34529
> URL: https://issues.apache.org/jira/browse/FLINK-34529
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: yisha zhou
>Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
> |SELECT a FROM (
> |  SELECT a, f,
> |  ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
> |  FROM  T1, T2
> |  WHERE T1.a = T2.d
> |)
> |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- Calc(select=[a, c])
> : +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- Calc(select=[d, f])
>   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>+- Exchange(distribution=[hash[f]])
>   +- Calc(select=[a, c, f])
>  +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
> :- Exchange(distribution=[hash[a]])
> :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[d]])
>+- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34473) Migrate FlinkPruneEmptyRules

2024-02-28 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821633#comment-17821633
 ] 

xuyang commented on FLINK-34473:


Hi, [~jackylau]. This Jira is a sub-task rather than a bug, right?

> Migrate FlinkPruneEmptyRules
> 
>
> Key: FLINK-34473
> URL: https://issues.apache.org/jira/browse/FLINK-34473
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
> Fix For: 1.20.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34437) Typo in SQL Client - `s/succeed/succeeded`

2024-02-28 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821615#comment-17821615
 ] 

xuyang commented on FLINK-34437:


Hi, [~jingge]. I see that this PR has been merged. Does this Jira need to be 
closed?

> Typo in SQL Client - `s/succeed/succeeded`
> --
>
> Key: FLINK-34437
> URL: https://issues.apache.org/jira/browse/FLINK-34437
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.18.1
>Reporter: Robin Moffatt
>Priority: Not a Priority
>  Labels: pull-request-available
>
>  
> {code:java}
> Flink SQL> CREATE CATALOG c_new WITH ('type'='generic_in_memory');
> [INFO] Execute statement succeed. {code}
> `{*}Execute statement {color:#FF}succeed{color}.{*}` is grammatically 
> incorrect, and should read `{*}Execute statement 
> {color:#FF}succeeded{color}.{*}`
>  
> [https://github.com/apache/flink/blob/5844092408d21023a738077d0922cc75f1e634d7/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java#L214]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34517) environment configs ignored when calling procedure operation

2024-02-26 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820947#comment-17820947
 ] 

xuyang commented on FLINK-34517:


Hi, [~JustinLee]. Do you mean that some configs are lost when executing a 
procedure in SqlGateway? IIUC, the table config used when calling a procedure 
comes from `ExecutableOperationContextImpl`, which is built in 
`OperationExecutor` with the following logic.
{code:java}
private TableConfig tableConfig() {
    // Start from the session configuration and overlay the per-operation
    // execution configuration on top of it.
    Configuration operationConfig = sessionContext.getSessionConf().clone();
    operationConfig.addAll(executionConfig);

    TableConfig tableConfig = TableConfig.getDefault();

    // The cluster-level Flink configuration is the root (fallback) layer,
    // and the session/operation configuration overrides it.
    tableConfig.setRootConfiguration(sessionContext.getDefaultContext().getFlinkConfig());
    tableConfig.addConfiguration(operationConfig);

    return tableConfig;
} {code}

> environment configs ignored when calling procedure operation
> 
>
> Key: FLINK-34517
> URL: https://issues.apache.org/jira/browse/FLINK-34517
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
>Reporter: JustinLee
>Assignee: JustinLee
>Priority: Major
>
> When calling a procedure operation in Flink SQL, the ProcedureContext only 
> contains the underlying application-specific config, not the 
> environment-specific config.
> To be more specific: in a Flink SQL app whose StreamExecutionEnvironment has a 
> config1, the config1 takes effect when executing a SQL query, but not when 
> calling a SQL procedure, which apparently is not the expected behavior.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-26 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang closed FLINK-34378.
--
Resolution: Not A Problem

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> ++---+---+---+---+ 
> | op | a | b | a0| b0| 
> ++---+---+---+---+ 
> | +I | 3 | 3 | 3 | 3 | 
> | +I | 7 | 7 | 7 | 7 | 
> | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | 
> | +I | 1 | 1 | 1 | 1 | 
> | +I | 6 | 6 | 6 | 6 | 
> | +I | 4 | 4 | 4 | 4 | 
> | +I | 8 | 8 | 8 | 8 | 
> ++---+---+---+---+
> {code}
> When I do not use minibatch join, the result is :
> {code:java}
> ++---+---+++
> | op | a | b | a0 | b0 |
> ++---+---+++
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> ++---+---+++
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-26 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17820924#comment-17820924
 ] 

xuyang commented on FLINK-34378:


Thanks for your answer, [~xu_shuai_]. That sounds good to me. I'll close this 
jira.

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> ++---+---+---+---+ 
> | op | a | b | a0| b0| 
> ++---+---+---+---+ 
> | +I | 3 | 3 | 3 | 3 | 
> | +I | 7 | 7 | 7 | 7 | 
> | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | 
> | +I | 1 | 1 | 1 | 1 | 
> | +I | 6 | 6 | 6 | 6 | 
> | +I | 4 | 4 | 4 | 4 | 
> | +I | 8 | 8 | 8 | 8 | 
> ++---+---+---+---+
> {code}
> When I do not use minibatch join, the result is :
> {code:java}
> ++---+---+++
> | op | a | b | a0 | b0 |
> ++---+---+++
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> ++---+---+++
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-33489) LISTAGG with generating partial-final agg will cause wrong result

2024-02-07 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang resolved FLINK-33489.

Resolution: Resolved

> LISTAGG with generating partial-final agg will cause wrong result
> -
>
> Key: FLINK-33489
> URL: https://issues.apache.org/jira/browse/FLINK-33489
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.10.0, 1.11.0, 1.12.0, 1.13.0, 1.14.0, 1.15.0, 
> 1.16.0, 1.17.0, 1.18.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> Adding the following test cases in SplitAggregateITCase will reproduce this 
> bug:
>  
> {code:java}
> // code placeholder
> @Test
> def testListAggWithDistinctMultiArgs(): Unit = {
>   val t1 = tEnv.sqlQuery(s"""
> |SELECT
> |  a,
> |  LISTAGG(DISTINCT c, '#')
> |FROM T
> |GROUP BY a
>  """.stripMargin)
>   val sink = new TestingRetractSink
>   t1.toRetractStream[Row].addSink(sink)
>   env.execute()
>   val expected = Map[String, List[String]](
> "1" -> List("Hello 0", "Hello 1"),
> "2" -> List("Hello 0", "Hello 1", "Hello 2", "Hello 3", "Hello 4"),
> "3" -> List("Hello 0", "Hello 1"),
> "4" -> List("Hello 1", "Hello 2", "Hello 3")
>   )
>   val actualData = sink.getRetractResults.sorted
>   println(actualData)
> } {code}
> The `actualData` is `List(1,Hello 0,Hello 1, 2,Hello 2,Hello 4,Hello 3,Hello 
> 1,Hello 0, 3,Hello 1,Hello 0, 4,Hello 2,Hello 3,Hello 1)`, and the delimiter 
> `#` is ignored.
> Let's look at its plan:
> {code:java}
> // code placeholder
> LegacySink(name=[DataStreamTableSink], fields=[a, EXPR$1])
> +- GroupAggregate(groupBy=[a], partialFinalType=[FINAL], select=[a, 
> LISTAGG_RETRACT($f3_0) AS $f1])
>    +- Exchange(distribution=[hash[a]])
>       +- GroupAggregate(groupBy=[a, $f3, $f4], partialFinalType=[PARTIAL], 
> select=[a, $f3, $f4, LISTAGG(DISTINCT c, $f2) AS $f3_0])
>          +- Exchange(distribution=[hash[a, $f3, $f4]])
>             +- Calc(select=[a, c, _UTF-16LE'#' AS $f2, MOD(HASH_CODE(c), 
> 1024) AS $f3, MOD(HASH_CODE(_UTF-16LE'#'), 1024) AS $f4])
>                +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
>                   +- DataStreamScan(table=[[default_catalog, 
> default_database, T]], fields=[a, b, c]) {code}
> The final `GroupAggregate` is missing the delimiter arg, so the default 
> delimiter `,` is used.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34348) Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input

2024-02-07 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815480#comment-17815480
 ] 

xuyang commented on FLINK-34348:


Hi, [~hackergin]. Thanks for your detailed testing. Overall, it seems that 
nothing is missing regarding the plan tests and IT tests. 

> Release Testing: Verify FLINK-20281 Window aggregation supports changelog 
> stream input
> --
>
> Key: FLINK-34348
> URL: https://issues.apache.org/jira/browse/FLINK-34348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: Feng Jin
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
> Attachments: 截屏2024-02-07 16.21.37.png, 截屏2024-02-07 16.21.55.png, 
> 截屏2024-02-07 16.22.24.png, 截屏2024-02-07 16.23.12.png, 截屏2024-02-07 
> 16.23.27.png, 截屏2024-02-07 16.23.38.png, 截屏2024-02-07 16.29.09.png, 
> 截屏2024-02-07 16.29.21.png, 截屏2024-02-07 16.29.34.png, 截屏2024-02-07 
> 16.46.12.png, 截屏2024-02-07 16.46.23.png, 截屏2024-02-07 16.46.37.png, 
> 截屏2024-02-07 16.53.37.png, 截屏2024-02-07 16.53.47.png, 截屏2024-02-07 
> 16.54.01.png, 截屏2024-02-07 16.59.22.png, 截屏2024-02-07 16.59.33.png, 
> 截屏2024-02-07 16.59.42.png
>
>
> Window TVF aggregation support for changelog stream input is ready for 
> testing. Users can add a window TVF aggregation downstream of a CDC source or 
> of other nodes that produce CDC records.
> Someone can verify this feature as follows:
>  # Prepare a MySQL table and insert some data first.
>  # Start sql-client and prepare the DDL for this MySQL table as a CDC source.
>  # Verify the plan with `EXPLAIN PLAN_ADVICE` to check that there is a window 
> aggregate node and that the changelog contains "UA", "UB" or "D" in its 
> upstream. 
>  # Use different kinds of window TVFs to test window TVF aggregation while 
> updating the source data, to check data correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28693) Codegen failed if the watermark is defined on a columnByExpression

2024-02-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815141#comment-17815141
 ] 

xuyang commented on FLINK-28693:


This bug is caused by the generated code referencing a class in the 
table-planner package; that class is hidden by table-planner-loader, so the 
classloader cannot find it. 

I'll try to fix it.

> Codegen failed if the watermark is defined on a columnByExpression
> --
>
> Key: FLINK-28693
> URL: https://issues.apache.org/jira/browse/FLINK-28693
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.1
>Reporter: Hongbo
>Priority: Major
>
> The following code will throw an exception:
>  
> {code:java}
> Table program cannot be compiled. This is a bug. Please file an issue.
>  ...
>  Caused by: org.codehaus.commons.compiler.CompileException: Line 29, Column 
> 54: Cannot determine simple type name "org" {code}
> {color:#00}Code:{color}
> {code:java}
> public class TestUdf extends  ScalarFunction {
> @DataTypeHint("TIMESTAMP(3)")
> public LocalDateTime eval(String strDate) {
>return LocalDateTime.now();
> }
> }
> public class FlinkTest {
> @Test
> void testUdf() throws Exception {
> //var env = StreamExecutionEnvironment.createLocalEnvironment();
> // run `gradlew shadowJar` first to generate the uber jar.
> // It contains the kafka connector and a dummy UDF function.
> var env = 
> StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081,
> "build/libs/flink-test-all.jar");
> env.setParallelism(1);
> var tableEnv = StreamTableEnvironment.create(env);
> tableEnv.createTemporarySystemFunction("TEST_UDF", TestUdf.class);
> var testTable = tableEnv.from(TableDescriptor.forConnector("kafka")
> .schema(Schema.newBuilder()
> .column("time_stamp", DataTypes.STRING())
> .columnByExpression("udf_ts", "TEST_UDF(time_stamp)")
> .watermark("udf_ts", "udf_ts - INTERVAL '1' second")
> .build())
> // the kafka server doesn't need to exist. It fails in the 
> compile stage before fetching data.
> .option("properties.bootstrap.servers", "localhost:9092")
> .option("topic", "test_topic")
> .option("format", "json")
> .option("scan.startup.mode", "latest-offset")
> .build());
> testTable.printSchema();
> tableEnv.createTemporaryView("test", testTable );
> var query = tableEnv.sqlQuery("select * from test");
> var tableResult = 
> query.executeInsert(TableDescriptor.forConnector("print").build());
> tableResult.await();
> }
> }{code}
> What does the code do?
>  # read a stream from Kakfa
>  # create a derived column using an UDF expression
>  # define the watermark based on the derived column
> The full callstack:
>  
> {code:java}
> org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
>  ~[flink-table-runtime-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>  ~[flink-dist-1.15.1.jar:1.15.1]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)

[jira] [Commented] (FLINK-34016) Janino compile failed when watermark with column by udf

2024-02-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815140#comment-17815140
 ] 

xuyang commented on FLINK-34016:


Hi, [~wczhu], I have found the root cause and will try to fix it. As a 
workaround, you can temporarily replace flink-table-planner-loader with the 
flink-table-planner jar from the opt/ folder, just like 
https://issues.apache.org/jira/browse/FLINK-28693 suggests.

> Janino compile failed when watermark with column by udf
> ---
>
> Key: FLINK-34016
> URL: https://issues.apache.org/jira/browse/FLINK-34016
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.15.0, 1.18.0
>Reporter: ude
>Priority: Major
> Attachments: image-2024-01-25-11-53-06-158.png, 
> image-2024-01-25-11-54-54-381.png, image-2024-01-25-12-57-21-318.png, 
> image-2024-01-25-12-57-34-632.png
>
>
> After submit the following flink sql by sql-client.sh will throw an exception:
> {code:java}
> Caused by: java.lang.RuntimeException: Could not instantiate generated class 
> 'WatermarkGenerator$0'
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:69)
>     at 
> org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:109)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:462)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:438)
>     at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:414)
>     at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at 
> org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: 
> org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
>     at 
> 

[jira] [Commented] (FLINK-34211) Filtering on Column names with ?s fails for JDBC lookup join.

2024-02-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815118#comment-17815118
 ] 

xuyang commented on FLINK-34211:


Hi, [~davidradl], this Jira looks like an improvement, not a bug, right?

> Filtering on Column names with ?s fails for JDBC lookup join. 
> --
>
> Key: FLINK-34211
> URL: https://issues.apache.org/jira/browse/FLINK-34211
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / JDBC, Table SQL / JDBC
>Reporter: david radley
>Priority: Minor
>
> There is a check for the ? character in 
> [FieldNamedPreparedStatementImpl.java|https://github.com/apache/flink-connector-jdbc/blob/e3dd84160cd665ae17672da8b6e742e61a72a32d/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java#L186].
> Removing this check allows column names containing _?_ 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34376) FLINK SQL SUM() causes a precision error

2024-02-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815091#comment-17815091
 ] 

xuyang commented on FLINK-34376:


This bug was introduced by https://issues.apache.org/jira/browse/FLINK-22586.

We can simplify the SQL to reproduce this bug: 
{code:java}
select cast(9.11 AS DECIMAL(38,18)) * 10

++--+
| op |                                   EXPR$0 |
++--+
| +I |                               90.000 |
++--+ {code}
This is designed behavior (but there seem to be some problems). For the 
multiplication of decimal types, the following formula is currently used.

 
{code:java}
// ================== Decimal Precision Deriving ==================
// Adopted from "https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql"
//
// Operation   Result Precision                     Result Scale
// e1 + e2     max(s1, s2) + max(p1-s1, p2-s2) + 1  max(s1, s2)
// e1 - e2     max(s1, s2) + max(p1-s1, p2-s2) + 1  max(s1, s2)
// e1 * e2     p1 + p2 + 1                          s1 + s2
// e1 / e2     p1 - s1 + s2 + max(6, s1 + p2 + 1)   max(6, s1 + p2 + 1)
// e1 % e2     min(p1-s1, p2-s2) + max(s1, s2)      max(s1, s2)
//
// Also, if the precision / scale are out of the range, the scale may be
// sacrificed in order to prevent the truncation of the integer part of
// the decimals. 
{code}
 

For the INTEGER type, the default precision is 10 and the scale is 0, so the 
derived precision and scale are (49, 18). However, the precision exceeds the max 
precision 38, so the scale is adjusted from 18 to 7:

 
{code:java}
integer part: 49 - 18 = 31
adjusted scale: 38 - 31 = 7{code}
IMO, the original design choice to preserve the integer part makes sense. But in 
this case the result is wrong and we should fix it (verifying against MySQL, the 
result is `90.000110`).
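
To make the deriving concrete, here is a small self-contained sketch of the 
multiplication rule above (plain Scala, not the actual Flink code):
{code:java}
// DECIMAL(38, 18) * INT: precision = p1 + p2 + 1, scale = s1 + s2,
// then clamp to the max precision 38 by sacrificing scale.
val (p1, s1) = (38, 18) // DECIMAL(38, 18)
val (p2, s2) = (10, 0)  // INT is treated as DECIMAL(10, 0)

val rawPrecision = p1 + p2 + 1              // 49
val rawScale = s1 + s2                      // 18
val integerDigits = rawPrecision - rawScale // 31, preserved in full
val precision = math.min(rawPrecision, 38)  // 38
val scale = precision - integerDigits       // 7, the adjusted scale

println(s"DECIMAL($precision, $scale)")     // prints DECIMAL(38,7)
{code}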

 

 

> FLINK SQL SUM() causes a precision error
> 
>
> Key: FLINK-34376
> URL: https://issues.apache.org/jira/browse/FLINK-34376
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3, 1.18.1
>Reporter: Fangliang Liu
>Priority: Major
> Attachments: image-2024-02-06-11-15-02-669.png, 
> image-2024-02-06-11-17-03-399.png
>
>
> {code:java}
> select cast(sum(CAST(9.11 AS DECIMAL(38,18)) *10 ) as STRING) 
> {code}
> The precision is wrong in the Flink 1.14.3 and master branch
> !image-2024-02-06-11-15-02-669.png!
>  
> The accuracy is correct in the Flink 1.13.2 
> !image-2024-02-06-11-17-03-399.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17815054#comment-17815054
 ] 

xuyang commented on FLINK-34378:


[~lsy] The situation is that although I set the parallelism to 1, the order of 
the output with minibatch is still disrupted.

Hi, [~libenchao]. Thanks for the reminder; I have attached the diff between the 
results with the minibatch join turned on and off.

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}
> Result
> {code:java}
> ++---+---+---+---+ 
> | op | a | b | a0| b0| 
> ++---+---+---+---+ 
> | +I | 3 | 3 | 3 | 3 | 
> | +I | 7 | 7 | 7 | 7 | 
> | +I | 2 | 2 | 2 | 2 | 
> | +I | 5 | 5 | 5 | 5 | 
> | +I | 1 | 1 | 1 | 1 | 
> | +I | 6 | 6 | 6 | 6 | 
> | +I | 4 | 4 | 4 | 4 | 
> | +I | 8 | 8 | 8 | 8 | 
> ++---+---+---+---+
> {code}
> When I do not use minibatch join, the result is :
> {code:java}
> ++---+---+++
> | op | a | b | a0 | b0 |
> ++---+---+++
> | +I | 1 | 1 |  1 |  1 |
> | +I | 2 | 2 |  2 |  2 |
> | +I | 3 | 3 |  3 |  3 |
> | +I | 4 | 4 |  4 |  4 |
> | +I | 5 | 5 |  5 |  5 |
> | +I | 6 | 6 |  6 |  6 |
> | +I | 7 | 7 |  7 |  7 |
> | +I | 8 | 8 |  8 |  8 |
> ++---+---+++
>  {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-06 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Description: 
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}
++---+---+---+---+ 
| op | a | b | a0| b0| 
++---+---+---+---+ 
| +I | 3 | 3 | 3 | 3 | 
| +I | 7 | 7 | 7 | 7 | 
| +I | 2 | 2 | 2 | 2 | 
| +I | 5 | 5 | 5 | 5 | 
| +I | 1 | 1 | 1 | 1 | 
| +I | 6 | 6 | 6 | 6 | 
| +I | 4 | 4 | 4 | 4 | 
| +I | 8 | 8 | 8 | 8 | 
++---+---+---+---+

{code}
When I do not use minibatch join, the result is :
{code:java}
++---+---+++
| op | a | b | a0 | b0 |
++---+---+++
| +I | 1 | 1 |  1 |  1 |
| +I | 2 | 2 |  2 |  2 |
| +I | 3 | 3 |  3 |  3 |
| +I | 4 | 4 |  4 |  4 |
| +I | 5 | 5 |  5 |  5 |
| +I | 6 | 6 |  6 |  6 |
| +I | 7 | 7 |  7 |  7 |
| +I | 8 | 8 |  8 |  8 |
++---+---+++
 {code}
 

  was:
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}

++---+---+---+---+ 
| op | a | b | a0| b0| 
++---+---+---+---+ 
| +I | 3 | 3 | 3 | 3 | 
| +I | 7 | 7 | 7 | 7 | 
| +I | 2 | 2 | 2 | 2 | 
| +I | 5 | 5 | 5 | 5 | 
| +I | 1 | 1 | 1 | 1 | 
| +I | 6 | 6 | 6 | 6 | 
| +I | 4 | 4 | 4 | 4 | 
| +I | 8 | 8 | 8 | 8 | 
++---+---+---+---+

{code}
 

 


> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  

[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-06 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Description: 
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}

++---+---+---+---+ 
| op | a | b | a0| b0| 
++---+---+---+---+ 
| +I | 3 | 3 | 3 | 3 | 
| +I | 7 | 7 | 7 | 7 | 
| +I | 2 | 2 | 2 | 2 | 
| +I | 5 | 5 | 5 | 5 | 
| +I | 1 | 1 | 1 | 1 | 
| +I | 6 | 6 | 6 | 6 | 
| +I | 4 | 4 | 4 | 4 | 
| +I | 8 | 8 | 8 | 8 | 
++---+---+---+---+

{code}
 

 

  was:
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}
// code placeholder
{code}
 

 


> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> 

[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-06 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Description: 
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}
Result
{code:java}
// code placeholder
{code}
 

 

  was:
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}


> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}

[jira] [Commented] (FLINK-34381) `RelDataType#getFullTypeString` should be used to print in `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString`

2024-02-06 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814700#comment-17814700
 ] 

xuyang commented on FLINK-34381:


I will try to fix it if this indeed makes sense.

> `RelDataType#getFullTypeString` should be used to print in 
> `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString`
> 
>
> Key: FLINK-34381
> URL: https://issues.apache.org/jira/browse/FLINK-34381
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Planner
>Affects Versions: 1.9.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> Currently `RelTreeWriterImpl` uses `rel.getRowType.toString` to print the row type.
> {code:java}
> if (withRowType) {
>   s.append(", rowType=[").append(rel.getRowType.toString).append("]")
> } {code}
> However, looking deeper into the code, we should use 
> `rel.getRowType.getFullTypeString` to print the row type, because 
> `getFullTypeString` prints richer type information such as nullability. Take 
> `StructuredRelDataType` as an example; the diff is below:
> {code:java}
> // source
> util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)
> // sql
> SELECT a, c FROM MyTable
> // rel.getRowType.toString
> RecordType(BIGINT a, VARCHAR(2147483647) c)
> // rel.getRowType.getFullTypeString
> RecordType(BIGINT a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" c) NOT 
> NULL{code}
>    



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`

2024-02-06 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34372:
---
Attachment: image-2024-02-06-16-38-24-085.png

> Complete work for syntax `DESCRIBE CATALOG catalogName`
> ---
>
> Key: FLINK-34372
> URL: https://issues.apache.org/jira/browse/FLINK-34372
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
> Attachments: image-2024-02-06-16-38-24-085.png
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`

2024-02-06 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34372:
---
Description: 
The public API changes are announced in Flip-69 
([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]).
 

The result type of this query is also defined in 
Flip-69([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]
 ) and 
Flip-84([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878]).
 Here I just show them again.

 
{code:java}
// Catalog

/**
* Get a user-defined catalog description.
* @return a detailed explanation of the user-implemented catalog
*/
default String explainCatalog() {
    return String.format("CatalogClass:%s", this.getClass().getCanonicalName());
} {code}
!image-2024-02-06-16-38-24-085.png|width=751,height=343!
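
For illustration, a hedged sketch of a user catalog overriding this proposed 
default method (the subclass and description string below are hypothetical):
{code:java}
import org.apache.flink.table.catalog.GenericInMemoryCatalog;

// Hypothetical catalog overriding the default method proposed in Flip-69;
// `DESCRIBE CATALOG` would then surface this string instead of the class name.
public class MyExplainableCatalog extends GenericInMemoryCatalog {

    public MyExplainableCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    @Override
    public String explainCatalog() {
        return String.format(
            "MyExplainableCatalog{defaultDatabase=%s}", getDefaultDatabase());
    }
}
{code}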

 

> Complete work for syntax `DESCRIBE CATALOG catalogName`
> ---
>
> Key: FLINK-34372
> URL: https://issues.apache.org/jira/browse/FLINK-34372
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
> Attachments: image-2024-02-06-16-38-24-085.png
>
>
> The public API changes are announced in Flip-69 
> ([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]).
>  
> The result type of this query is also defined in 
> Flip-69([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement]
>  ) and 
> Flip-84([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878]).
>  Here I just show them again.
>  
> {code:java}
> // Catalog
> /**
> * Get a user-defined catalog description.
> * @return a detailed explanation of the user-implemented catalog
> */
> default String explainCatalog() {
>     return String.format("CatalogClass:%s", 
> this.getClass().getCanonicalName());
> } {code}
> !image-2024-02-06-16-38-24-085.png|width=751,height=343!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34373) Complete work for syntax `DESCRIBE DATABASE databaseName`

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814636#comment-17814636
 ] 

xuyang commented on FLINK-34373:


I'll try to fix it.

> Complete work for syntax `DESCRIBE DATABASE databaseName`
> -
>
> Key: FLINK-34373
> URL: https://issues.apache.org/jira/browse/FLINK-34373
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34381) `RelDataType#getFullTypeString` should be used to print in `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString`

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34381:
--

 Summary: `RelDataType#getFullTypeString` should be used to print 
in `RelTreeWriterImpl` if `withRowType` is true instead of `Object#toString`
 Key: FLINK-34381
 URL: https://issues.apache.org/jira/browse/FLINK-34381
 Project: Flink
  Issue Type: Technical Debt
  Components: Table SQL / Planner
Affects Versions: 1.9.0, 1.19.0
Reporter: xuyang


Currently `RelTreeWriterImpl` uses `rel.getRowType.toString` to print the row type.
{code:java}
if (withRowType) {
  s.append(", rowType=[").append(rel.getRowType.toString).append("]")
} {code}
However, looking deeper into the code, we should use 
`rel.getRowType.getFullTypeString` to print the row type, because 
`getFullTypeString` prints richer type information such as nullability. Take 
`StructuredRelDataType` as an example; the diff is below:
{code:java}
// source
util.addTableSource[(Long, Int, String)]("MyTable", 'a, 'b, 'c)

// sql
SELECT a, c FROM MyTable

// rel.getRowType.toString
RecordType(BIGINT a, VARCHAR(2147483647) c)

// rel.getRowType.getFullTypeString
RecordType(BIGINT a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" c) NOT 
NULL{code}
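
A minimal sketch of the proposed one-line change in `RelTreeWriterImpl` 
(Scala, assuming the surrounding code shown above stays as-is):
{code:java}
// sketch of the proposed fix: swap toString for getFullTypeString so that
// nullability and character-set details are printed as well
if (withRowType) {
  s.append(", rowType=[").append(rel.getRowType.getFullTypeString).append("]")
}
{code}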
   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814623#comment-17814623
 ] 

xuyang commented on FLINK-34370:


[~337361...@qq.com] Thanks for volunteering.

> [Umbrella] Complete work and improve about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
>  about enhanced Flink SQL DDL.
> With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
> that this FLIP is not finished yet.
> The matrix is below:
> |DDL|can be used in sql-client |
> |_SHOW CATALOGS_|YES|
> |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
> |_USE_ _CATALOG catalogName_|YES|
> |_CREATE DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName_|YES|
> |_DROP IF EXISTS DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName RESTRICT_|YES|
> |_DROP DATABASE dataBaseName CASCADE_|YES|
> |_ALTER DATABASE dataBaseName SET
> ( name=value [, name=value]*)|YES|
> |_USE dataBaseName_|YES|
> |_SHOW_ _DATABASES_|YES|
> |_DESCRIBE DATABASE dataBaseName_|{color:#de350b}NO{color}|
> |_DESCRIBE EXTENDED DATABASE dataBaseName_|{color:#de350b}NO{color}|
> |_SHOW_ _TABLES_|YES|
> |_DESCRIBE tableName_|YES|
> |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
> |_ALTER_ _TABLE tableName
> RENAME TO newTableName|YES|
> |_ALTER_ _TABLE tableName
> SET ( name=value [, name=value]*)|YES|
>  
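
For reference, the statements marked NO in the matrix above currently fail; a 
hedged sketch of exercising them through a TableEnvironment (table and 
database names are hypothetical, and these calls are expected to succeed only 
once the umbrella work is done):
{code:java}
// Hypothetical probe of the unsupported statements from the matrix above;
// until the umbrella work is finished these calls throw validation errors.
String[] pendingStatements = {
    "DESCRIBE CATALOG default_catalog",
    "DESCRIBE DATABASE default_database",
    "DESCRIBE EXTENDED DATABASE default_database",
    "DESCRIBE EXTENDED my_table"
};
for (String stmt : pendingStatements) {
    try {
        tEnv.executeSql(stmt).print();
    } catch (Exception e) {
        System.out.println(stmt + " -> not supported yet: " + e.getMessage());
    }
}
{code}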



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34380) Strange RowKind and records about intermediate output when using minibatch join

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34380:
--

 Summary: Strange RowKind and records about intermediate output 
when using minibatch join
 Key: FLINK-34380
 URL: https://issues.apache.org/jira/browse/FLINK-34380
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.19.0
Reporter: xuyang
 Fix For: 1.19.0


{code:java}
// add it in CalcITCase

@Test
  def test(): Unit = {
env.setParallelism(1)
val rows = Seq(
  changelogRow("+I", java.lang.Integer.valueOf(1), "1"),
  changelogRow("-U", java.lang.Integer.valueOf(1), "1"),
  changelogRow("+U", java.lang.Integer.valueOf(1), "99"),
  changelogRow("-D", java.lang.Integer.valueOf(1), "99")
)
val dataId = TestValuesTableFactory.registerData(rows)

val ddl =
  s"""
 |CREATE TABLE t1 (
 |  a int,
 |  b string
 |) WITH (
 |  'connector' = 'values',
 |  'data-id' = '$dataId',
 |  'bounded' = 'false'
 |)
   """.stripMargin
tEnv.executeSql(ddl)

val ddl2 =
  s"""
 |CREATE TABLE t2 (
 |  a int,
 |  b string
 |) WITH (
 |  'connector' = 'values',
 |  'data-id' = '$dataId',
 |  'bounded' = 'false'
 |)
   """.stripMargin
tEnv.executeSql(ddl2)

tEnv.getConfig.getConfiguration
  .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
Boolean.box(true))
tEnv.getConfig.getConfiguration
  .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
tEnv.getConfig.getConfiguration
  .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(3L))

println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
  } {code}
Output:
{code:java}
++-+-+-+-+
| op |           a |               b |          a0 |      b0 |
++-+-+-+-+
| +U |           1 |               1 |           1 |      99 |
| +U |           1 |              99 |           1 |      99 |
| -U |           1 |               1 |           1 |      99 |
| -D |           1 |              99 |           1 |      99 |
++-+-+-+-+{code}
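
For readers decoding the op column, a short sketch of the RowKind short 
strings it maps to (mirroring org.apache.flink.types.RowKind):
{code:java}
import org.apache.flink.types.RowKind;

// The op column maps to RowKind: +I = INSERT, -U = UPDATE_BEFORE,
// +U = UPDATE_AFTER, -D = DELETE. A well-formed changelog emits each -U
// immediately before its matching +U, which the output above violates.
public class RowKindSketch {

    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind.shortString() + " = " + kind.name());
        }
    }
}
{code}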



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814612#comment-17814612
 ] 

xuyang commented on FLINK-34349:


Hi, I have finished this verification. The unexpected behaviors I found have 
been reported as bug and technical-debt issues linked to this Jira.

> Release Testing: Verify FLINK-34219 Introduce a new join operator to support 
> minibatch
> --
>
> Key: FLINK-34349
> URL: https://issues.apache.org/jira/browse/FLINK-34349
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Shuai Xu
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Minibatch join is ready. Users can use it to improve performance in regular 
> stream join scenarios. 
> This feature can be verified by following the 
> [doc]([https://github.com/apache/flink/pull/24240)], although the doc is 
> still being reviewed.
> If you find bugs in this feature, please open a Jira linked to this one to 
> report them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Affects Version/s: 1.19.0

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Fix Version/s: 1.19.0

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Component/s: Table SQL / Runtime

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / Runtime
>Reporter: xuyang
>Priority: Major
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814590#comment-17814590
 ] 

xuyang commented on FLINK-34378:


cc [~xu_shuai_] 

> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: xuyang
>Priority: Major
>
> I'm not sure if it's a bug. The following case can reproduce it.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34378:
---
Description: 
I'm not sure if it's a bug. The following case can reproduce this situation.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}

  was:
I'm not sure if it's a bug. The following case can reproduce it.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}


> Minibatch join disrupted the original order of input records
> 
>
> Key: FLINK-34378
> URL: https://issues.apache.org/jira/browse/FLINK-34378
> Project: Flink
>  Issue Type: Technical Debt
>Reporter: xuyang
>Priority: Major
>
> I'm not sure if it's a bug. The following case can reproduce this situation.
> {code:java}
> // add it in CalcITCase
> @Test
> def test(): Unit = {
>   env.setParallelism(1)
>   val rows = Seq(
> row(1, "1"),
> row(2, "2"),
> row(3, "3"),
> row(4, "4"),
> row(5, "5"),
> row(6, "6"),
> row(7, "7"),
> row(8, "8"))
>   val dataId = TestValuesTableFactory.registerData(rows)
>   val ddl =
> s"""
>|CREATE TABLE t1 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl)
>   val ddl2 =
> s"""
>|CREATE TABLE t2 (
>|  a int,
>|  b string
>|) WITH (
>|  'connector' = 'values',
>|  'data-id' = '$dataId',
>|  'bounded' = 'false'
>|)
>  """.stripMargin
>   tEnv.executeSql(ddl2)
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, 
> Boolean.box(true))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
> Duration.ofSeconds(5))
>   tEnv.getConfig.getConfiguration
> .set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))
>   println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())
>   tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34378) Minibatch join disrupted the original order of input records

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34378:
--

 Summary: Minibatch join disrupted the original order of input 
records
 Key: FLINK-34378
 URL: https://issues.apache.org/jira/browse/FLINK-34378
 Project: Flink
  Issue Type: Technical Debt
Reporter: xuyang


I'm not sure if it's a bug. The following case can reproduce it.
{code:java}
// add it in CalcITCase
@Test
def test(): Unit = {
  env.setParallelism(1)
  val rows = Seq(
row(1, "1"),
row(2, "2"),
row(3, "3"),
row(4, "4"),
row(5, "5"),
row(6, "6"),
row(7, "7"),
row(8, "8"))
  val dataId = TestValuesTableFactory.registerData(rows)

  val ddl =
s"""
   |CREATE TABLE t1 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl)

  val ddl2 =
s"""
   |CREATE TABLE t2 (
   |  a int,
   |  b string
   |) WITH (
   |  'connector' = 'values',
   |  'data-id' = '$dataId',
   |  'bounded' = 'false'
   |)
 """.stripMargin
  tEnv.executeSql(ddl2)

  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED, Boolean.box(true))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(5))
  tEnv.getConfig.getConfiguration
.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, Long.box(20L))

  println(tEnv.sqlQuery("SELECT * from t1 join t2 on t1.a = t2.a").explain())

  tEnv.executeSql("SELECT * from t1 join t2 on t1.a = t2.a").print()
}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814584#comment-17814584
 ] 

xuyang commented on FLINK-34372:


I'll try to do it.

> Complete work for syntax `DESCRIBE CATALOG catalogName`
> ---
>
> Key: FLINK-34372
> URL: https://issues.apache.org/jira/browse/FLINK-34372
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34375) Complete work for syntax `DESCRIBE EXTENDED tableName`

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34375:
--

 Summary: Complete work for syntax `DESCRIBE EXTENDED tableName`
 Key: FLINK-34375
 URL: https://issues.apache.org/jira/browse/FLINK-34375
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.10.0, 1.19.0
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34374) Complete work for syntax `DESCRIBE EXTENDED DATABASE databaseName`

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34374:
--

 Summary: Complete work for syntax `DESCRIBE EXTENDED DATABASE 
databaseName`
 Key: FLINK-34374
 URL: https://issues.apache.org/jira/browse/FLINK-34374
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.10.0, 1.19.0
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34370:
---
Description: 
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is below:
|DDL|can be used in sql-client |
|_SHOW CATALOGS_|YES|
|_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
|_USE_ _CATALOG catalogName_|YES|
|_CREATE DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName_|YES|
|_DROP IF EXISTS DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName RESTRICT_|YES|
|_DROP DATABASE dataBaseName CASCADE_|YES|
|_ALTER DATABASE dataBaseName SET
( name=value [, name=value]*)|YES|
|_USE dataBaseName_|YES|
|_SHOW_ _DATABASES_|YES|
|_DESCRIBE DATABASE dataBaseName_|{color:#de350b}NO{color}|
|_DESCRIBE EXTENDED DATABASE dataBaseName_|{color:#de350b}NO{color}|
|_SHOW_ _TABLES_|YES|
|_DESCRIBE tableName_|YES|
|_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
|_ALTER_ _TABLE tableName
RENAME TO newTableName|YES|
|_ALTER_ _TABLE tableName
SET ( name=value [, name=value]*)|YES|

 

  was:
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is below:
|DDL|can be used in sql-client |
|_SHOW CATALOGS_|YES|
|_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
|_USE_ _CATALOG catalogName_|YES|
|_CREATE DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName_|YES|
|_DROP IF EXISTS DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName RESTRICT_|YES|
|_DROP DATABASE dataBaseName CASCADE_|YES|
|_ALTER DATABASE dataBaseName SET
( name=value [, name=value]*)|YES|
|_USE dataBaseName_|YES|
|_SHOW_ _DATABASES_|YES|
|_DESCRIBE DATABASE dataBasesName_|{color:#de350b}NO{color}|
|_DESCRIBE EXTENDED DATABASE dataBasesName_|{color:#de350b}NO{color}|
|_SHOW_ _TABLES_|YES|
|_DESCRIBE tableName_|YES|
|_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
|_ALTER_ _TABLE tableName
RENAME TO newTableName|YES|
|_ALTER_ _TABLE tableName
SET ( name=value [, name=value]*)|YES|

 


> [Umbrella] Complete work and improve about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
>  about enhanced Flink SQL DDL.
> With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
> that this FLIP is not finished yet.
> The matrix is below:
> |DDL|can be used in sql-client |
> |_SHOW CATALOGS_|YES|
> |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
> |_USE_ _CATALOG catalogName_|YES|
> |_CREATE DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName_|YES|
> |_DROP IF EXISTS DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName RESTRICT_|YES|
> |_DROP DATABASE dataBaseName CASCADE_|YES|
> |_ALTER DATABASE dataBaseName SET
> ( name=value [, name=value]*)|YES|
> |_USE dataBaseName_|YES|
> |_SHOW_ _DATABASES_|YES|
> |_DESCRIBE DATABASE dataBaseName_|{color:#de350b}NO{color}|
> |_DESCRIBE EXTENDED DATABASE dataBaseName_|{color:#de350b}NO{color}|
> |_SHOW_ _TABLES_|YES|
> |_DESCRIBE tableName_|YES|
> |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
> |_ALTER_ _TABLE tableName
> RENAME TO newTableName|YES|
> |_ALTER_ _TABLE tableName
> SET ( name=value [, name=value]*)|YES|
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34373) Complete work for syntax `DESCRIBE DATABASE databaseName`

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34373:
--

 Summary: Complete work for syntax `DESCRIBE DATABASE databaseName`
 Key: FLINK-34373
 URL: https://issues.apache.org/jira/browse/FLINK-34373
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.10.0, 1.19.0
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34370:
---
Description: 
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is below:
|DDL|can be used in sql-client |
|_SHOW CATALOGS_|YES|
|_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
|_USE_ _CATALOG catalogName_|YES|
|_CREATE DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName_|YES|
|_DROP IF EXISTS DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName RESTRICT_|YES|
|_DROP DATABASE dataBaseName CASCADE_|YES|
|_ALTER DATABASE dataBaseName SET
( name=value [, name=value]*)|YES|
|_USE dataBaseName_|YES|
|_SHOW_ _DATABASES_|YES|
|_DESCRIBE DATABASE dataBasesName_|{color:#de350b}NO{color}|
|_DESCRIBE EXTENDED DATABASE dataBasesName_|{color:#de350b}NO{color}|
|_SHOW_ _TABLES_|YES|
|_DESCRIBE tableName_|YES|
|_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
|_ALTER_ _TABLE tableName
RENAME TO newTableName|YES|
|_ALTER_ _TABLE tableName
SET ( name=value [, name=value]*)|YES|

 

  was:
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is below:
|DDL|can be used in sql-client |
|_SHOW CATALOGS_|YES|
|_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
|_USE_ _CATALOG catalogName_ |YES|
|_CREATE DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName_|YES|
|_DROP IF EXISTS DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName RESTRICT_|YES|
|_DROP DATABASE dataBaseName CASCADE_|YES|
|_ALTER DATABASE dataBaseName SET
( name=value [, name=value]*)_|YES|
|_USE dataBaseName_ |YES|
|_SHOW_ _DATABASES_|YES|
|_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}|
|_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}|
|_SHOW_ _TABLES_|YES|
|_DESCRIBE tableName_|YES|
|_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
|_ALTER_ _TABLE tableName
RENAME TO newTableName_|YES|
|_ALTER_ _TABLE tableName
SET ( name=value [, name=value]*)_|YES|


> [Umbrella] Complete work and improve about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
>  about enhanced Flink SQL DDL.
> With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
> that this FLIP is not finished yet.
> The matrix is below:
> |DDL|can be used in sql-client |
> |_SHOW CATALOGS_|YES|
> |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
> |_USE_ _CATALOG catalogName_|YES|
> |_CREATE DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName_|YES|
> |_DROP IF EXISTS DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName RESTRICT_|YES|
> |_DROP DATABASE dataBaseName CASCADE_|YES|
> |_ALTER DATABASE dataBaseName SET
> ( name=value [, name=value]*)|YES|
> |_USE dataBaseName_|YES|
> |_SHOW_ _DATABASES_|YES|
> |_DESCRIBE DATABASE dataBasesName_|{color:#de350b}NO{color}|
> |_DESCRIBE EXTENDED DATABASE dataBasesName_|{color:#de350b}NO{color}|
> |_SHOW_ _TABLES_|YES|
> |_DESCRIBE tableName_|YES|
> |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
> |_ALTER_ _TABLE tableName
> RENAME TO newTableName|YES|
> |_ALTER_ _TABLE tableName
> SET ( name=value [, name=value]*)|YES|
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34372) Complete work for syntax `DESCRIBE CATALOG catalogName`

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34372:
--

 Summary: Complete work for syntax `DESCRIBE CATALOG catalogName`
 Key: FLINK-34372
 URL: https://issues.apache.org/jira/browse/FLINK-34372
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.10.0, 1.19.0
Reporter: xuyang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34370) [Umbrella] Complete work and improve about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34370:
---
Summary: [Umbrella] Complete work and improve about enhanced Flink SQL DDL  
 (was: [Umbrella] Complete work about enhanced Flink SQL DDL )

> [Umbrella] Complete work and improve about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
>  about enhanced Flink SQL DDL.
> With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
> that this FLIP is not finished yet.
> The matrix is below:
> |DDL|can be used in sql-client |
> |_SHOW CATALOGS_|YES|
> |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
> |_USE_ _CATALOG catalogName_ |YES|
> |_CREATE DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName_|YES|
> |_DROP IF EXISTS DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName RESTRICT_|YES|
> |_DROP DATABASE dataBaseName CASCADE_|YES|
> |_ALTER DATABASE dataBaseName SET
> ( name=value [, name=value]*)_|YES|
> |_USE dataBaseName_ |YES|
> |_SHOW_ _DATABASES_|YES|
> |_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}|
> |_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}|
> |_SHOW_ _TABLES_|YES|
> |_DESCRIBE tableName_|YES|
> |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
> |_ALTER_ _TABLE tableName
> RENAME TO newTableName_|YES|
> |_ALTER_ _TABLE tableName
> SET ( name=value [, name=value]*)_|YES|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34370:
---
Description: 
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is below:
|DDL|can be used in sql-client |
|_SHOW CATALOGS_|YES|
|_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
|_USE_ _CATALOG catalogName_ |YES|
|_CREATE DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName_|YES|
|_DROP IF EXISTS DATABASE dataBaseName_|YES|
|_DROP DATABASE dataBaseName RESTRICT_|YES|
|_DROP DATABASE dataBaseName CASCADE_|YES|
|_ALTER DATABASE dataBaseName SET
( name=value [, name=value]*)_|YES|
|_USE dataBaseName_ |YES|
|_SHOW_ _DATABASES_|YES|
|_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}|
|_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}|
|_SHOW_ _TABLES_|YES|
|_DESCRIBE tableName_|YES|
|_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
|_ALTER_ _TABLE tableName
RENAME TO newTableName_|YES|
|_ALTER_ _TABLE tableName
SET ( name=value [, name=value]*)_|YES|

  was:
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is 


> [Umbrella] Complete work about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
>  about enhanced Flink SQL DDL.
> With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
> that this FLIP is not finished yet.
> The matrix is below:
> |DDL|can be used in sql-client |
> |_SHOW CATALOGS_|YES|
> |_DESCRIBE_ _CATALOG catalogName_|{color:#de350b}NO{color}|
> |_USE_ _CATALOG catalogName_ |YES|
> |_CREATE DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName_|YES|
> |_DROP IF EXISTS DATABASE dataBaseName_|YES|
> |_DROP DATABASE dataBaseName RESTRICT_|YES|
> |_DROP DATABASE dataBaseName CASCADE_|YES|
> |_ALTER DATABASE dataBaseName SET
> ( name=value [, name=value]*)_|YES|
> |_USE dataBaseName_ |YES|
> |_SHOW_ _DATABASES_|YES|
> |_DESCRIBE DATABASE dataBasesName_ |{color:#de350b}NO{color}|
> |_DESCRIBE EXTENDED DATABASE dataBasesName_ |{color:#de350b}NO{color}|
> |_SHOW_ _TABLES_|YES|
> |_DESCRIBE tableName_|YES|
> |_DESCRIBE EXTENDED tableName_|{color:#de350b}NO{color}|
> |_ALTER_ _TABLE tableName
> RENAME TO newTableName_|YES|
> |_ALTER_ _TABLE tableName
> SET ( name=value [, name=value]*)_|YES|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34370:
---
Description: 
This is an umbrella Jira for completing the work for 
[Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
 about enhanced Flink SQL DDL.

With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.

The matrix is 

  was:
This is an umbrella Jira for completing the work for 
[Flip-69](https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement)
 about enhanced Flink SQL DDL.

With [FLINK-34254](https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.


> [Umbrella] Complete work about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [Flip-69]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement])
>  about enhanced Flink SQL DDL.
> With FLINK-34254(https://issues.apache.org/jira/browse/FLINK-34254), it seems 
> that this FLIP is not finished yet.
> The matrix is 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34370:
---
Affects Version/s: 1.19.0

> [Umbrella] Complete work about enhanced Flink SQL DDL 
> --
>
> Key: FLINK-34370
> URL: https://issues.apache.org/jira/browse/FLINK-34370
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 1.10.0, 1.19.0
>Reporter: xuyang
>Priority: Major
>
> This is an umbrella Jira for completing the work for 
> [FLIP-69](https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement)
>  about enhanced Flink SQL DDL.
> With [FLINK-34254](https://issues.apache.org/jira/browse/FLINK-34254), it 
> seems that this FLIP is not finished yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions

2024-02-05 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang closed FLINK-34254.
--
Resolution: Duplicate

> `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions
> -
>
> Key: FLINK-34254
> URL: https://issues.apache.org/jira/browse/FLINK-34254
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
>
> Add the following test to CalcITCase to reproduce this bug.
> {code:java}
> @Test
>   def test(): Unit = {
> tEnv.executeSql(s"""
>|create catalog `c_new` with (
>|  'type' = 'generic_in_memory',
>|  'default-database' = 'my_d'
>|)
>|""".stripMargin)
> tEnv
>   .executeSql(s"""
>  |show catalogs
>  |""".stripMargin)
>   .print
> tEnv
>   .executeSql(s"""
>  | describe catalog default_catalog
>  |""".stripMargin)
>   .print
>   } {code}
> Result:
> {code:java}
> +-+
> |    catalog name |
> +-+
> |           c_new |
> | default_catalog |
> +-+
> 2 rows in set
>  org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in 
> any table
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>  at 
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
>at 
> org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453)
>   at java.lang.reflect.Method.invoke(Method.java:498) at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)  at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)  at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)  
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)  at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused
>  by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 
> 19 to line 2, column 33: Column 'default_catalog' not found in any table  
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)  
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150)
>   at 
> org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304)
> at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474)  
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005)
> at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
> ... 11 moreCaused by: 
> org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 'default_catalog' not found in any tableat 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> 

[jira] [Comment Edited] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814564#comment-17814564
 ] 

xuyang edited comment on FLINK-34254 at 2/6/24 1:43 AM:


I have created a Jira (https://issues.apache.org/jira/browse/FLINK-34370) 
linked to this one to do the improvement, so I'm closing this one.


was (Author: xuyangzhong):
I have created a Jira linked to this one to do the improvement.

> `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions
> -
>
> Key: FLINK-34254
> URL: https://issues.apache.org/jira/browse/FLINK-34254
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
>
> Add the following test to CalcITCase to reproduce this bug.
> {code:java}
> @Test
>   def test(): Unit = {
> tEnv.executeSql(s"""
>|create catalog `c_new` with (
>|  'type' = 'generic_in_memory',
>|  'default-database' = 'my_d'
>|)
>|""".stripMargin)
> tEnv
>   .executeSql(s"""
>  |show catalogs
>  |""".stripMargin)
>   .print
> tEnv
>   .executeSql(s"""
>  | describe catalog default_catalog
>  |""".stripMargin)
>   .print
>   } {code}
> Result:
> {code:java}
> +-+
> |    catalog name |
> +-+
> |           c_new |
> | default_catalog |
> +-+
> 2 rows in set
>  org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in 
> any table
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>  at 
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
>at 
> org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453)
>   at java.lang.reflect.Method.invoke(Method.java:498) at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)  at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)  at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)  
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)  at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused
>  by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 
> 19 to line 2, column 33: Column 'default_catalog' not found in any table  
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)  
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150)
>   at 
> org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304)
> at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474)  
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005)
> at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
> ... 11 moreCaused by: 
> org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 

[jira] [Commented] (FLINK-34254) `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions

2024-02-05 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814564#comment-17814564
 ] 

xuyang commented on FLINK-34254:


I have created a Jira linked to this one to do the improvement.

> `DESCRIBE` syntaxes like `DESCRIBE CATALOG xxx` throws strange exceptions
> -
>
> Key: FLINK-34254
> URL: https://issues.apache.org/jira/browse/FLINK-34254
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
>
> Add the following test to CalcITCase to reproduce this bug.
> {code:java}
> @Test
>   def test(): Unit = {
> tEnv.executeSql(s"""
>|create catalog `c_new` with (
>|  'type' = 'generic_in_memory',
>|  'default-database' = 'my_d'
>|)
>|""".stripMargin)
> tEnv
>   .executeSql(s"""
>  |show catalogs
>  |""".stripMargin)
>   .print
> tEnv
>   .executeSql(s"""
>  | describe catalog default_catalog
>  |""".stripMargin)
>   .print
>   } {code}
> Result:
> {code:java}
> +-+
> |    catalog name |
> +-+
> |           c_new |
> | default_catalog |
> +-+
> 2 rows in set
>  org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 2, column 19 to line 2, column 33: Column 'default_catalog' not found in 
> any table
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
> at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>  at 
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:259)
> at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
>at 
> org.apache.flink.table.planner.runtime.stream.sql.CalcITCase.test(CalcITCase.scala:453)
>   at java.lang.reflect.Method.invoke(Method.java:498) at 
> java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)  at 
> java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)  at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)  
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)  at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)Caused
>  by: org.apache.calcite.runtime.CalciteContextException: From line 2, column 
> 19 to line 2, column 33: Column 'default_catalog' not found in any table  
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)  
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)  
> at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932) at 
> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917) at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5276)
>   at 
> org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:273)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateIdentifier(SqlValidatorImpl.java:3150)
>   at 
> org.apache.calcite.sql.SqlIdentifier.validateExpr(SqlIdentifier.java:304)
> at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:474)  
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:6005)
> at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:138)at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1009)
> at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:758)
>  at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
> ... 11 moreCaused by: 
> org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 'default_catalog' not found in any tableat 
> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at 
> 

[jira] [Created] (FLINK-34370) [Umbrella] Complete work about enhanced Flink SQL DDL

2024-02-05 Thread xuyang (Jira)
xuyang created FLINK-34370:
--

 Summary: [Umbrella] Complete work about enhanced Flink SQL DDL 
 Key: FLINK-34370
 URL: https://issues.apache.org/jira/browse/FLINK-34370
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: xuyang


This is an umbrella Jira for completing the work for 
[FLIP-69](https://cwiki.apache.org/confluence/display/FLINK/FLIP-69%3A+Flink+SQL+DDL+Enhancement)
 about enhanced Flink SQL DDL.

With [FLINK-34254](https://issues.apache.org/jira/browse/FLINK-34254), it seems 
that this FLIP is not finished yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34353) A strange exception will be thrown if minibatch size is not set while using mini-batch join

2024-02-04 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814061#comment-17814061
 ] 

xuyang commented on FLINK-34353:


I'll try to fix it.

> A strange exception will be thrown if minibatch size is not set while using 
> mini-batch join
> ---
>
> Key: FLINK-34353
> URL: https://issues.apache.org/jira/browse/FLINK-34353
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> java.lang.IllegalArgumentException: maxCount must be greater than 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger.<init>(CountCoBundleTrigger.java:34)
> at 
> org.apache.flink.table.planner.plan.utils.MinibatchUtil.createMiniBatchCoTrigger(MinibatchUtil.java:61)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.java:231)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
>at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
>at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)   
> at scala.collection.Iterator.foreach(Iterator.scala:937)at 
> scala.collection.Iterator.foreach$(Iterator.scala:937)   at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425)   at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69)at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233)  at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)  at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:537)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697)
>   at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
> at org.apache.flink.table.api.Explainable.explain(Explainable.java:40) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34353) A strange exception will be thrown if minibatch size is not set while using mini-batch join

2024-02-04 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34353:
---
Description: 
{code:java}
java.lang.IllegalArgumentException: maxCount must be greater than 0
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)at 
org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger.<init>(CountCoBundleTrigger.java:34)
at 
org.apache.flink.table.planner.plan.utils.MinibatchUtil.createMiniBatchCoTrigger(MinibatchUtil.java:61)
  at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.java:231)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
   at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
   at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)   at 
scala.collection.Iterator.foreach(Iterator.scala:937)at 
scala.collection.Iterator.foreach$(Iterator.scala:937)   at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1425)   at 
scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:69)at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike.map(TraversableLike.scala:233)  at 
scala.collection.TraversableLike.map$(TraversableLike.scala:226) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)  at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
  at 
org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:537)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
 at 
org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697)
  at 
org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)at 
org.apache.flink.table.api.Explainable.explain(Explainable.java:40) {code}
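
In other words, the exception surfaces when mini-batch is enabled but 
`table.exec.mini-batch.size` is left at its default. A minimal sketch in 
sql-client (the tables `a` and `b` are placeholders):
{code:sql}
-- mini-batch enabled, but table.exec.mini-batch.size left unset
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';

-- translating any regular stream join now fails with
-- "maxCount must be greater than 0"
EXPLAIN SELECT * FROM a JOIN b ON a.id = b.id;

-- workaround until the fix: set the size explicitly
SET 'table.exec.mini-batch.size' = '5000';
{code}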

> A strange exception will be thrown if minibatch size is not set while using 
> mini-batch join
> ---
>
> Key: FLINK-34353
> URL: https://issues.apache.org/jira/browse/FLINK-34353
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> {code:java}
> java.lang.IllegalArgumentException: maxCount must be greater than 0
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> at 
> org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger.<init>(CountCoBundleTrigger.java:34)
> at 
> org.apache.flink.table.planner.plan.utils.MinibatchUtil.createMiniBatchCoTrigger(MinibatchUtil.java:61)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin.translateToPlanInternal(StreamExecJoin.java:231)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
>at 
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
>at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)   
> at scala.collection.Iterator.foreach(Iterator.scala:937)at 
> scala.collection.Iterator.foreach$(Iterator.scala:937)   at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1425)   at 
> scala.collection.IterableLike.foreach(IterableLike.scala:70) at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:69)at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
> scala.collection.TraversableLike.map(TraversableLike.scala:233)  at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:226) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)  at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:537)
> at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>  at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:51)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:697)
>   at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:482)
> 

[jira] [Created] (FLINK-34353) A strange exception will be thrown if minibatch size is not set while using mini-batch join

2024-02-04 Thread xuyang (Jira)
xuyang created FLINK-34353:
--

 Summary: A strange exception will be thrown if minibatch size is 
not set while using mini-batch join
 Key: FLINK-34353
 URL: https://issues.apache.org/jira/browse/FLINK-34353
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
Reporter: xuyang
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34349) Release Testing: Verify FLINK-34219 Introduce a new join operator to support minibatch

2024-02-03 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814047#comment-17814047
 ] 

xuyang commented on FLINK-34349:


Hi, I'd like to take this verification.
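
A rough sketch of the setup I plan to use, with the standard mini-batch 
options (`orders` and `customers` are placeholder tables):
{code:sql}
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';
SET 'table.exec.mini-batch.size' = '5000';

-- a regular stream join; on 1.19 the new mini-batch join operator
-- should kick in once the options above are set
SELECT o.id, o.amount, c.name
FROM orders o JOIN customers c ON o.cid = c.id;
{code}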

> Release Testing: Verify FLINK-34219 Introduce a new join operator to support 
> minibatch
> --
>
> Key: FLINK-34349
> URL: https://issues.apache.org/jira/browse/FLINK-34349
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Affects Versions: 1.19.0
>Reporter: Shuai Xu
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Minibatch join is ready. Users could improve performance in regular stream 
> join scenarios. 
> Someone can verify this feature by following the 
> [doc](https://github.com/apache/flink/pull/24240) although it is still 
> being reviewed.
> If someone finds some bugs in this feature, you could open a Jira linked to 
> this one to report them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34348) Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input

2024-02-03 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34348:
---
Description: 
Window TVF aggregation with changelog stream input is ready for testing. Users 
can add a window TVF aggregation downstream of a CDC source or other nodes 
that produce CDC records.

Someone can verify this feature with:
 # Prepare a MySQL table and insert some data first.
 # Start sql-client and prepare the DDL for this MySQL table as a CDC source.
 # Verify the plan with `EXPLAIN PLAN_ADVICE` to check that there is a 
window aggregate node and that the changelog in its upstream contains "UA", 
"UB" or "D". 
 # Use different kinds of window TVF to test window TVF aggregation while 
updating the source data to check the data correctness.
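
A minimal sketch of steps 2-4, assuming the `mysql-cdc` connector is on the 
classpath (all connection options below are placeholders):
{code:sql}
CREATE TABLE orders_cdc (
  id INT,
  amount DOUBLE,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'user',
  'password' = 'pw',
  'database-name' = 'db',
  'table-name' = 'orders'
);

-- step 3: the plan should show a window aggregate whose input
-- changelog contains UA/UB/D
EXPLAIN PLAN_ADVICE
SELECT window_start, window_end, SUM(amount)
FROM TABLE(TUMBLE(TABLE orders_cdc, DESCRIPTOR(ts), INTERVAL '10' MINUTE))
GROUP BY window_start, window_end;
{code}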

> Release Testing: Verify FLINK-20281 Window aggregation supports changelog 
> stream input
> --
>
> Key: FLINK-34348
> URL: https://issues.apache.org/jira/browse/FLINK-34348
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Window TVF aggregation with changelog stream input is ready for testing. Users 
> can add a window TVF aggregation downstream of a CDC source or other nodes 
> that produce CDC records.
> Someone can verify this feature with:
>  # Prepare a MySQL table and insert some data first.
>  # Start sql-client and prepare the DDL for this MySQL table as a CDC source.
>  # Verify the plan with `EXPLAIN PLAN_ADVICE` to check that there is a 
> window aggregate node and that the changelog in its upstream contains "UA", 
> "UB" or "D". 
>  # Use different kinds of window TVF to test window TVF aggregation while 
> updating the source data to check the data correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF

2024-02-03 Thread xuyang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuyang updated FLINK-34346:
---
Description: 
Session window TVF is ready. Users can use session window TVF aggregation 
instead of the legacy session group window aggregation.

Someone can verify this feature by following the 
[doc](https://github.com/apache/flink/pull/24250) although it is still being 
reviewed. 

Furthermore, although session window join, session window rank and session 
window deduplicate are in an experimental state, if someone finds bugs in 
them, you could also open a Jira linked to this one to report them.
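
For reference, a minimal sketch of the new syntax, assuming a table `Bid` 
with an event-time attribute `bidtime` (see the doc above for the exact form):
{code:sql}
SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
    SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end, item;
{code}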

> Release Testing: Verify FLINK-24024 Support session Window TVF
> --
>
> Key: FLINK-34346
> URL: https://issues.apache.org/jira/browse/FLINK-34346
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>
> Session window TVF is ready. Users can use session window TVF aggregation 
> instead of the legacy session group window aggregation.
> Someone can verify this feature by following the 
> [doc](https://github.com/apache/flink/pull/24250) although it is still 
> being reviewed. 
> Furthermore, although session window join, session window rank and session 
> window deduplicate are in an experimental state, if someone finds bugs 
> about them, you could also open a Jira linked to this one to report them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34348) Release Testing: Verify FLINK-20281 Window aggregation supports changelog stream input

2024-02-03 Thread xuyang (Jira)
xuyang created FLINK-34348:
--

 Summary: Release Testing: Verify FLINK-20281 Window aggregation 
supports changelog stream input
 Key: FLINK-34348
 URL: https://issues.apache.org/jira/browse/FLINK-34348
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.19.0
Reporter: xuyang
Assignee: xuyang
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34346) Release Testing: Verify FLINK-24024 Support session Window TVF

2024-02-03 Thread xuyang (Jira)
xuyang created FLINK-34346:
--

 Summary: Release Testing: Verify FLINK-24024 Support session 
Window TVF
 Key: FLINK-34346
 URL: https://issues.apache.org/jira/browse/FLINK-34346
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.19.0
Reporter: xuyang
Assignee: xuyang
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-33963) There is only one UDF instance after serializing the same task

2024-02-03 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814014#comment-17814014
 ] 

xuyang edited comment on FLINK-33963 at 2/4/24 2:34 AM:


I agree that we should be more rigorous in our consideration of whether a 
scalar function should be reused.

However, in the scenario you mentioned, if the path can be extracted once and 
used permanently, it then implies that the path is a constant across all 
elements. In such a case, using two scalar functions might not be the best 
choice. That's why I just said this is not an uncommon scenario.


was (Author: xuyangzhong):
I agree that we should be more rigorous in our consideration of whether a 
scalar function should be reused.

However, in the scenario you mentioned, if the path can be extracted once and 
used permanently, it then implies that the path is a constant across all 
elements. In such a case, using a scalar function might not be the best choice. 
That's why I just said this is not an uncommon scenario.

> There is only one UDF instance after serializing the same task
> --
>
> Key: FLINK-33963
> URL: https://issues.apache.org/jira/browse/FLINK-33963
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
> Environment: local env in idea test.
> java 8
>Reporter: lifengchao
>Priority: Major
>
> I define this UDF and expect the following SQL to return 'a', 'b', but it 
> returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
> static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> }
> {code}
> sql:
> {code:sql}
> select
> udf_ser(name, 'a') name1,
> udf_ser(name, 'b') name2
> from heros
> {code}
> Changing the UDF to this will achieve the expected results.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
> static final Logger LOG = 
> LoggerFactory.getLogger(UdfSerializeFunc2.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> @Override
> public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> return TypeInference.newBuilder()
> .outputTypeStrategy(new TypeStrategy() {
> @Override
> public Optional<DataType> inferType(CallContext callContext) {
> List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
> if (argumentDataTypes.size() != 2) {
> throw callContext.newValidationError("arg size 
> error");
> }
> if (!callContext.isArgumentLiteral(1) || 
> callContext.isArgumentNull(1)) {
> throw callContext.newValidationError("Literal 
> expected for second argument.");
> }
> cache = callContext.getArgumentValue(1, 
> String.class).get();
> return Optional.of(DataTypes.STRING());
> }
> })
> .build();
> }
> }
> {code}
>  
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
> static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> }
> public class UdfSerializeFunc2 extends ScalarFunction {
> static final Logger LOG = 
> LoggerFactory.getLogger(UdfSerializeFunc2.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
>  

[jira] [Commented] (FLINK-33963) There is only one UDF instance after serializing the same task

2024-02-03 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17814014#comment-17814014
 ] 

xuyang commented on FLINK-33963:


I agree that we should be more rigorous in our consideration of whether a 
scalar function should be reused.

However, in the scenario you mentioned, if the path can be extracted once and 
used permanently, it then implies that the path is a constant across all 
elements. In such a case, using a scalar function might not be the best choice. 
That's why I just said this is not an uncommon scenario.

> There is only one UDF instance after serializing the same task
> --
>
> Key: FLINK-33963
> URL: https://issues.apache.org/jira/browse/FLINK-33963
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0
> Environment: local env in idea test.
> java 8
>Reporter: lifengchao
>Priority: Major
>
> I define this UDF and expect the following SQL to return 'a', 'b', but it 
> returns 'a', 'a'.
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
> static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> }
> {code}
> sql:
> {code:sql}
> select
> udf_ser(name, 'a') name1,
> udf_ser(name, 'b') name2
> from heros
> {code}
> Changing the UDF to this will achieve the expected results.
> {code:java}
> public class UdfSerializeFunc2 extends ScalarFunction {
> static final Logger LOG = 
> LoggerFactory.getLogger(UdfSerializeFunc2.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> @Override
> public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> return TypeInference.newBuilder()
> .outputTypeStrategy(new TypeStrategy() {
> @Override
> public Optional<DataType> inferType(CallContext callContext) {
> List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
> if (argumentDataTypes.size() != 2) {
> throw callContext.newValidationError("arg size 
> error");
> }
> if (!callContext.isArgumentLiteral(1) || 
> callContext.isArgumentNull(1)) {
> throw callContext.newValidationError("Literal 
> expected for second argument.");
> }
> cache = callContext.getArgumentValue(1, 
> String.class).get();
> return Optional.of(DataTypes.STRING());
> }
> })
> .build();
> }
> }
> {code}
>  
> My complete test code:
> {code:java}
> public class UdfSerializeFunc extends ScalarFunction {
> static final Logger LOG = LoggerFactory.getLogger(UdfSerializeFunc.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> }
> public class UdfSerializeFunc2 extends ScalarFunction {
> static final Logger LOG = 
> LoggerFactory.getLogger(UdfSerializeFunc2.class);
> String cache;
> @Override
> public void open(FunctionContext context) throws Exception {
> LOG.warn("open:{}.", this.hashCode());
> }
> public String eval(String a, String b){
> if(cache == null){
> LOG.warn("cache_null.cache:{}", b);
> cache = b;
> }
> return cache;
> }
> @Override
> public TypeInference getTypeInference(DataTypeFactory typeFactory) {
> return TypeInference.newBuilder()
> .outputTypeStrategy(new TypeStrategy() {
> @Override
> public Optional<DataType> inferType(CallContext callContext) {
> List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
> if (argumentDataTypes.size() != 2) {
>   

[jira] [Commented] (FLINK-34301) Release Testing Instructions: Verify FLINK-20281 Window aggregation supports changelog stream input

2024-02-02 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813595#comment-17813595
 ] 

xuyang commented on FLINK-34301:


Window TVF aggregation with changelog stream input is ready for testing. Users 
can add a window TVF aggregation downstream of a CDC source or other nodes 
that produce CDC records.

Someone can verify this feature with:
 # Prepare a MySQL table and insert some data first.
 # Start sql-client and prepare the DDL for this MySQL table as a CDC source.
 # Verify the plan with `EXPLAIN PLAN_ADVICE` to check that there is a 
window aggregate node and that the changelog in its upstream contains "UA", 
"UB" or "D". 
 # Use different kinds of window TVF to test window TVF aggregation while 
updating the source data to check the data correctness.

> Release Testing Instructions: Verify FLINK-20281 Window aggregation supports 
> changelog stream input
> ---
>
> Key: FLINK-34301
> URL: https://issues.apache.org/jira/browse/FLINK-34301
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF

2024-02-02 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813590#comment-17813590
 ] 

xuyang edited comment on FLINK-34300 at 2/2/24 9:29 AM:


Session window TVF is ready. Users can use session window TVF aggregation 
instead of the legacy session group window aggregation.

Someone can verify this feature by following the 
[doc](https://github.com/apache/flink/pull/24250) although it is still being 
reviewed. 

Furthermore, although session window join, session window rank and session 
window deduplicate are in an experimental state, if someone finds bugs in 
them, you could also open a Jira linked to this one to report them.


was (Author: xuyangzhong):
Session window TVF is ready. Users can use session window TVF aggregation 
instead of the legacy session group window aggregation.

Someone can verify this feature by the 
[doc](https://github.com/apache/flink/pull/24250) although it is still in 
preparation. 

Furthermore, although session window join, session window rank and session 
window deduplicate are in an experimental state, if someone finds bugs in 
them, you could also open a Jira linked to this one to report it.

> Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
> ---
>
> Key: FLINK-34300
> URL: https://issues.apache.org/jira/browse/FLINK-34300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34300) Release Testing Instructions: Verify FLINK-24024 Support session Window TVF

2024-02-02 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813590#comment-17813590
 ] 

xuyang commented on FLINK-34300:


Session window TVF is ready. Users can use session window TVF aggregation 
instead of the legacy session group window aggregation.

Someone can verify this feature by the 
[doc](https://github.com/apache/flink/pull/24250) although it is still in 
preparation. 

Furthermore, although session window join, session window rank and session 
window deduplicate are in an experimental state, if someone finds bugs in 
them, you could also open a Jira linked to this one to report it.

> Release Testing Instructions: Verify FLINK-24024 Support session Window TVF
> ---
>
> Key: FLINK-34300
> URL: https://issues.apache.org/jira/browse/FLINK-34300
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.19.0
>Reporter: lincoln lee
>Assignee: xuyang
>Priority: Blocker
>  Labels: release-testing
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34338) An exception is thrown when some named params change order when using window tvf

2024-02-01 Thread xuyang (Jira)
xuyang created FLINK-34338:
--

 Summary: An exception is thrown when some named params change 
order when using window tvf
 Key: FLINK-34338
 URL: https://issues.apache.org/jira/browse/FLINK-34338
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.15.0
Reporter: xuyang
 Fix For: 1.19.0


This bug can be reproduced with the following SQL in `WindowTableFunctionTest`:

 
{code:java}
@Test
def test(): Unit = {
  val sql =
"""
  |SELECT *
  |FROM TABLE(TUMBLE(
  |   DATA => TABLE MyTable,
  |   SIZE => INTERVAL '15' MINUTE,
  |   TIMECOL => DESCRIPTOR(rowtime)
  |   ))
  |""".stripMargin
  util.verifyRelPlan(sql)
}{code}
In FLIP-145 and the user doc, we can find that `the DATA param must be the first`, but 
it seems that we also can't change the order of the other params.
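
For contrast, a sketch with the named params in the documented order 
(DATA, TIMECOL, SIZE), which should pass:
{code:java}
@Test
def testDocumentedOrder(): Unit = {
  val sql =
    """
      |SELECT *
      |FROM TABLE(TUMBLE(
      |   DATA => TABLE MyTable,
      |   TIMECOL => DESCRIPTOR(rowtime),
      |   SIZE => INTERVAL '15' MINUTE
      |   ))
      |""".stripMargin
  util.verifyRelPlan(sql)
}{code}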

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34335) Query hints in RexSubQuery could not be printed

2024-02-01 Thread xuyang (Jira)
xuyang created FLINK-34335:
--

 Summary: Query hints in RexSubQuery could not be printed
 Key: FLINK-34335
 URL: https://issues.apache.org/jira/browse/FLINK-34335
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: xuyang
 Fix For: 1.19.0


That is because `RelTreeWriterImpl` does not look into `RexSubQuery`, and 
`RexSubQuery` uses `RelOptUtil.toString(rel)` to print itself instead of 
adding extra information such as query hints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34335) Query hints in RexSubQuery could not be printed

2024-02-01 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813470#comment-17813470
 ] 

xuyang commented on FLINK-34335:


I'll try to fix it.

> Query hints in RexSubQuery could not be printed
> ---
>
> Key: FLINK-34335
> URL: https://issues.apache.org/jira/browse/FLINK-34335
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>
> That is because `RelTreeWriterImpl` does not look into `RexSubQuery`, and 
> `RexSubQuery` uses `RelOptUtil.toString(rel)` to print itself instead of 
> adding extra information such as query hints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34272) AdaptiveSchedulerClusterITCase failure due to MiniCluster not running

2024-01-31 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17813001#comment-17813001
 ] 

xuyang commented on FLINK-34272:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57161=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8

> AdaptiveSchedulerClusterITCase failure due to MiniCluster not running
> -
>
> Key: FLINK-34272
> URL: https://issues.apache.org/jira/browse/FLINK-34272
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.19.0
>Reporter: Matthias Pohl
>Assignee: David Morávek
>Priority: Blocker
>  Labels: pull-request-available, test-stability
>
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=57073=logs=0da23115-68bb-5dcd-192c-bd4c8adebde1=24c3384f-1bcb-57b3-224f-51bf973bbee8=9543]
> {code:java}
>  Jan 29 17:21:29 17:21:29.465 [ERROR] Tests run: 3, Failures: 0, Errors: 2, 
> Skipped: 0, Time elapsed: 12.48 s <<< FAILURE! -- in 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase
> Jan 29 17:21:29 17:21:29.465 [ERROR] 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testAutomaticScaleUp
>  -- Time elapsed: 8.599 s <<< ERROR!
> Jan 29 17:21:29 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 29 17:21:29   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1118)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:991)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getArchivedExecutionGraph(MiniCluster.java:840)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.lambda$waitUntilParallelismForVertexReached$3(AdaptiveSchedulerClusterITCase.java:270)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.waitUntilParallelismForVertexReached(AdaptiveSchedulerClusterITCase.java:265)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testAutomaticScaleUp(AdaptiveSchedulerClusterITCase.java:146)
> Jan 29 17:21:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 29 17:21:29   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
> Jan 29 17:21:29 
> Jan 29 17:21:29 17:21:29.466 [ERROR] 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale
>  -- Time elapsed: 2.036 s <<< ERROR!
> Jan 29 17:21:29 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 29 17:21:29   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1118)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:991)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getExecutionGraph(MiniCluster.java:969)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.lambda$testCheckpointStatsPersistedAcrossRescale$1(AdaptiveSchedulerClusterITCase.java:183)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:151)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition(CommonTestUtils.java:145)
> Jan 29 17:21:29   at 
> org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerClusterITCase.testCheckpointStatsPersistedAcrossRescale(AdaptiveSchedulerClusterITCase.java:180)
> Jan 29 17:21:29   at java.lang.reflect.Method.invoke(Method.java:498)
> Jan 29 17:21:29   at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34323) Session window tvf failed when using named params

2024-01-31 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812698#comment-17812698
 ] 

xuyang commented on FLINK-34323:


I'll try to fix it.

> Session window tvf failed when using named params
> -
>
> Key: FLINK-34323
> URL: https://issues.apache.org/jira/browse/FLINK-34323
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API, Table SQL / Planner
>Affects Versions: 1.19.0
>Reporter: xuyang
>Priority: Major
> Fix For: 1.19.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34323) Session window tvf failed when using named params

2024-01-31 Thread xuyang (Jira)
xuyang created FLINK-34323:
--

 Summary: Session window tvf failed when using named params
 Key: FLINK-34323
 URL: https://issues.apache.org/jira/browse/FLINK-34323
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.19.0
Reporter: xuyang
 Fix For: 1.19.0






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34271) Fix the potential failure test about GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT

2024-01-31 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812664#comment-17812664
 ] 

xuyang commented on FLINK-34271:


[~dwysakowicz] I understand and agree with you. That's why I just worked around 
it in my test.

> Fix the potential failure test about 
> GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT
> --
>
> Key: FLINK-34271
> URL: https://issues.apache.org/jira/browse/FLINK-34271
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> The underlying reason is that a previous PR introduced a test with state TTL 
> as follows in the SQL: 
> {code:java}
> .runSql(
> "INSERT INTO sink_t SELECT /*+ STATE_TTL('source_t' = '4d') */"
> + "b, "
> + "COUNT(*) AS cnt, "
> + "AVG(a) FILTER (WHERE a > 1) AS avg_a, "
> + "MIN(c) AS min_c "
> + "FROM source_t GROUP BY b"){code}
> When the savepoint metadata was generated for the first time, the metadata 
> recorded the time when a certain key was accessed. If the test is rerun after 
> the TTL has expired, the state of this key in the metadata will be cleared, 
> resulting in an incorrect test outcome.
> To rectify this issue, I think the current tests in RestoreTestBase could be 
> modified to regenerate a new savepoint metadata as needed every time. 
> However, this seems to deviate from the original design purpose of 
> RestoreTestBase.
> For my test, I will work around this by removing the data 
> "consumedBeforeRestore", as I am only interested in testing the generation of 
> an expected JSON plan.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

