[jira] [Commented] (FLINK-33748) Remove legacy TableSource/TableSink API in 2.0

2024-10-08 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17887507#comment-17887507
 ] 

lincoln lee commented on FLINK-33748:
-

Split into two steps:

First, the package move was done in FLINK-36369, so this jira is not a blocker 
for 2.0-preview.

Second, do the actual internal code removal.

> Remove legacy TableSource/TableSink API in 2.0
> --
>
> Key: FLINK-33748
> URL: https://issues.apache.org/jira/browse/FLINK-33748
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Weijie Guo
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0-preview
>
>
> {{TableSource}} and {{TableSink}} were already marked as deprecated in 
> FLINK-19453 and can be removed in 2.0.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-33748) Remove legacy TableSource/TableSink API in 2.0

2024-10-08 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-33748:

Fix Version/s: (was: 2.0-preview)

> Remove legacy TableSource/TableSink API in 2.0
> --
>
> Key: FLINK-33748
> URL: https://issues.apache.org/jira/browse/FLINK-33748
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Weijie Guo
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
>
> {{TableSource}} and {{TableSink}} were already marked as deprecated in 
> FLINK-19453 and can be removed in 2.0.





[jira] [Closed] (FLINK-36369) Move deprecated user-visible classes in table modules to the legacy package to make it easier to delete them later

2024-09-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36369.
---
  Assignee: xuyang
Resolution: Fixed

fixed in master: 10037e39b5cbcdbf292d769bcaceb912abf6782c

> Move deprecated user-visible classes in table modules to the legacy package 
> to make it easier to delete them later
> --
>
> Key: FLINK-36369
> URL: https://issues.apache.org/jira/browse/FLINK-36369
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>






[jira] [Closed] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

2024-09-25 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-34702.
---
Fix Version/s: 2.0-preview
   (was: 2.0.0)
   Resolution: Fixed

fixed in master: 2f8b2d81a97a681d67964ef17932ea0abf35730f

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> 
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT *,
>   |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as 
> rowNum
>   |  FROM (select a, count(b) as b from MyTable group by a)
>   |)
>   |WHERE rowNum = 1
> """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> Because StreamPhysicalDeduplicate cannot consume update changes now while 
> StreamExecRank can, we should not convert the FlinkLogicalRank to 
> StreamPhysicalDeduplicate in this case. We can defer checking whether the 
> input contains update changes to the "optimize the physical plan" phase, 
> so we can add an option to solve it; and when StreamPhysicalDeduplicate 
> supports consuming update changes, we can deprecate it.





[jira] [Closed] (FLINK-36273) Remove deprecated Table/SQL configuration in 2.0

2024-09-25 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36273.
---
  Assignee: xuyang
Resolution: Fixed

Fixed in master: 733a56682bcceabab6625b621547538c6d993976

> Remove deprecated Table/SQL configuration in 2.0
> 
>
> Key: FLINK-36273
> URL: https://issues.apache.org/jira/browse/FLINK-36273
> Project: Flink
>  Issue Type: Sub-task
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>






[jira] [Assigned] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

2024-09-24 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-34702:
---

Assignee: lincoln lee

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> 
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT *,
>   |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as 
> rowNum
>   |  FROM (select a, count(b) as b from MyTable group by a)
>   |)
>   |WHERE rowNum = 1
> """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> Because StreamPhysicalDeduplicate cannot consume update changes now while 
> StreamExecRank can, we should not convert the FlinkLogicalRank to 
> StreamPhysicalDeduplicate in this case. We can defer checking whether the 
> input contains update changes to the "optimize the physical plan" phase, 
> so we can add an option to solve it; and when StreamPhysicalDeduplicate 
> supports consuming update changes, we can deprecate it.





[jira] [Closed] (FLINK-36358) to_timestamp result is not correct when the string precision is longer than the date format

2024-09-24 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36358.
---

Fixed in master: 74b9a480317b3b2d93bf6953ba8934dedab1375f

> to_timestamp result is not correct when the string precision is longer than 
> the date format
> -
>
> Key: FLINK-36358
> URL: https://issues.apache.org/jira/browse/FLINK-36358
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Jacky Lau
>Assignee: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> tEnv.executeSql("select to_timestamp('2017-09-15 00:00:00.12345', 'yyyy-MM-dd 
> HH:mm:ss.SSS')").print()  
> 2017-09-15 00:00:00.123  (correct)
>  
> tEnv.executeSql("select cast(to_timestamp('2017-09-15 00:00:00.12345', 
> 'yyyy-MM-dd HH:mm:ss.SSS') as string)").print() 
> 2017-09-15 00:00:00.12345  (not correct)
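The expected behavior in the issue above is that the fractional seconds of the input are truncated to the precision declared by the format pattern ('SSS' is 3 digits), even when the result is later cast to string. A minimal illustrative sketch of that truncation rule follows; the helper below is hypothetical and is not Flink's actual parsing code:

```java
public class FractionTruncation {
    // Hypothetical helper illustrating the expected TO_TIMESTAMP behavior:
    // the fractional seconds of the input string are truncated to the
    // precision declared by the pattern (3 digits for 'SSS'), rather than
    // keeping the full fraction of the input.
    static String truncateFraction(String timestamp, int patternPrecision) {
        int dot = timestamp.lastIndexOf('.');
        if (dot < 0) {
            return timestamp; // no fractional part at all
        }
        String fraction = timestamp.substring(dot + 1);
        if (fraction.length() <= patternPrecision) {
            return timestamp; // already within the pattern's precision
        }
        return timestamp.substring(0, dot + 1 + patternPrecision);
    }

    public static void main(String[] args) {
        // 'SSS' declares millisecond precision, so the extra digits are dropped.
        System.out.println(truncateFraction("2017-09-15 00:00:00.12345", 3));
        // -> 2017-09-15 00:00:00.123
    }
}
```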





[jira] [Assigned] (FLINK-36358) to_timestamp result is not correct when the string precision is longer than the date format

2024-09-24 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36358:
---

Assignee: Jacky Lau

> to_timestamp result is not correct when the string precision is longer than 
> the date format
> -
>
> Key: FLINK-36358
> URL: https://issues.apache.org/jira/browse/FLINK-36358
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0, 1.18.0, 1.19.0
>Reporter: Jacky Lau
>Assignee: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> tEnv.executeSql("select to_timestamp('2017-09-15 00:00:00.12345', 'yyyy-MM-dd 
> HH:mm:ss.SSS')").print()  
> 2017-09-15 00:00:00.123  (correct)
>  
> tEnv.executeSql("select cast(to_timestamp('2017-09-15 00:00:00.12345', 
> 'yyyy-MM-dd HH:mm:ss.SSS') as string)").print() 
> 2017-09-15 00:00:00.12345  (not correct)





[jira] [Closed] (FLINK-36327) Remove the dependencies of the flink-scala and flink-streaming-scala modules from the table module.

2024-09-24 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36327?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36327.
---
  Assignee: xuyang
Resolution: Fixed

fixed in master: 7a06c3ba00ef0840ad59707cece2b9d3fcbd2409

> Remove the dependencies of the flink-scala and flink-streaming-scala modules 
> from the table module.
> ---
>
> Key: FLINK-36327
> URL: https://issues.apache.org/jira/browse/FLINK-36327
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>






[jira] [Closed] (FLINK-36248) Introduce new Join Operator with Async State API

2024-09-24 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36248.
---
Resolution: Fixed

Fixed in master: 5a63c56ca0b38ebae9fc475708917c35a4168844

> Introduce new Join Operator with Async State API
> 
>
> Key: FLINK-36248
> URL: https://issues.apache.org/jira/browse/FLINK-36248
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>






[jira] [Commented] (FLINK-29739) [FLIP-265] Deprecate and remove Scala API support

2024-09-24 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884193#comment-17884193
 ] 

lincoln lee commented on FLINK-29739:
-

Thanks [~xuyangzhong] for your input and hard work on the related 
cleanup/migration tasks, such a large number of changes!

In addition to the user impact of removing the scala table api module, there 
are a large number of test dependencies within table-planner, and the 
associated modification effort is huge. Meanwhile, the ongoing removal of 
flink-scala can't be blocked (since both the scala table api and the table 
planner depend on it).

Also, time is running out for the 2.0-preview release, so we've taken the 
intermediate step of making the scala table api module self-contained and no 
longer dependent on flink-scala, in order to avoid a lot of immediate changes 
in table-planner (refer to #25364).

Whether or not we ultimately decide to remove the scala table api, this is 
overall a step forward.

cc [~martijnvisser] [~twalthr] [~jark] 

> [FLIP-265] Deprecate and remove Scala API support
> -
>
> Key: FLINK-29739
> URL: https://issues.apache.org/jira/browse/FLINK-29739
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Scala
>Reporter: Martijn Visser
>Priority: Major
>  Labels: 2.0-related
>
> FLIP: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support





[jira] [Created] (FLINK-36354) Enable FORSTDB_BACKEND in async related tests

2024-09-23 Thread lincoln lee (Jira)
lincoln lee created FLINK-36354:
---

 Summary: Enable FORSTDB_BACKEND in async related tests
 Key: FLINK-36354
 URL: https://issues.apache.org/jira/browse/FLINK-36354
 Project: Flink
  Issue Type: Sub-task
Reporter: lincoln lee
Assignee: xuyang
 Fix For: 2.0.0


[We should enable forstdb statebackend in StreamingWithStateTestBase in 2.0 
release|https://github.com/apache/flink/pull/25320/files/6874b0dd02c2e93f2e8f86959fc10f1ea80a5bda#diff-681275426abcccb91c7063c933ed32b104da24c6102a87b3196b56553856bf09]





[jira] [Commented] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

2024-09-23 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884085#comment-17884085
 ] 

lincoln lee commented on FLINK-34702:
-

[~AlexeyLV] Don't worry about losing any performance optimizations: we're only 
removing a physical deduplication node that no longer seems necessary, and 
we're still retaining the deduplication execution node 
'StreamExecDeduplicate'.

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> 
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT *,
>   |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as 
> rowNum
>   |  FROM (select a, count(b) as b from MyTable group by a)
>   |)
>   |WHERE rowNum = 1
> """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> Because StreamPhysicalDeduplicate cannot consume update changes now while 
> StreamExecRank can, we should not convert the FlinkLogicalRank to 
> StreamPhysicalDeduplicate in this case. We can defer checking whether the 
> input contains update changes to the "optimize the physical plan" phase, 
> so we can add an option to solve it; and when StreamPhysicalDeduplicate 
> supports consuming update changes, we can deprecate it.





[jira] [Commented] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

2024-09-23 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884084#comment-17884084
 ] 

lincoln lee commented on FLINK-34702:
-

A draft pr was created: #25380. If we agree on this solution, I'll finish the 
work (including some cleanup of the now-useless node `StreamPhysicalDeduplicate`).

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> 
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT *,
>   |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as 
> rowNum
>   |  FROM (select a, count(b) as b from MyTable group by a)
>   |)
>   |WHERE rowNum = 1
> """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> Because StreamPhysicalDeduplicate cannot consume update changes now while 
> StreamExecRank can, we should not convert the FlinkLogicalRank to 
> StreamPhysicalDeduplicate in this case. We can defer checking whether the 
> input contains update changes to the "optimize the physical plan" phase, 
> so we can add an option to solve it; and when StreamPhysicalDeduplicate 
> supports consuming update changes, we can deprecate it.





[jira] [Commented] (FLINK-34702) Rank should not convert to StreamExecDuplicate when the input is not insert only

2024-09-23 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17883911#comment-17883911
 ] 

lincoln lee commented on FLINK-34702:
-

Thanks for bringing this up again!

First of all, I think we can all agree that deduplicate is a special case of 
rank.
Second, the options we've discussed so far are not ideal (whether it's 
physical dedup -> exec or logical -> physical dedup, doing something special to 
change the conversion).

Going back to the problem itself, something new came to mind: maybe we should 
consider removing the physical dedup node and instead keeping only the 
physical rank node (because dedup is just one specialized execution/operator 
that satisfies several conditions: the input changelog mode, rank range, sort 
spec, and other traits).

I'm preparing a draft pr based on this idea; it may take some time (probably a 
day or two), but I think it can completely solve the problem.
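The proposal above, keeping a single physical rank node and treating deduplicate as a specialized execution path chosen from the node's traits (input changelog mode, rank range, sort spec), can be sketched as a simple predicate. All names below are hypothetical illustrations, not the planner's actual API:

```java
public class DedupCheck {
    // Hypothetical predicate sketching the idea from the comment above:
    // rather than choosing a separate physical deduplicate node during
    // logical-to-physical conversion, keep one physical rank node and decide
    // at execution-plan time whether it can run as the specialized
    // deduplicate operator.
    static boolean canRunAsDeduplicate(
            boolean inputIsInsertOnly,  // changelog mode of the input
            int rankStart, int rankEnd, // rank range, e.g. WHERE rowNum = 1
            boolean orderKeyIsTimeAttribute) {
        return inputIsInsertOnly
                && rankStart == 1
                && rankEnd == 1
                && orderKeyIsTimeAttribute;
    }

    public static void main(String[] args) {
        // The query from the issue: the input is a group aggregate that
        // produces updates, so the rank must NOT specialize to deduplicate.
        System.out.println(canRunAsDeduplicate(false, 1, 1, true)); // false
        // An insert-only input with rowNum = 1 ordered by proctime may.
        System.out.println(canRunAsDeduplicate(true, 1, 1, true));  // true
    }
}
```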

> Rank should not convert to StreamExecDuplicate when the input is not insert 
> only
> 
>
> Key: FLINK-34702
> URL: https://issues.apache.org/jira/browse/FLINK-34702
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Jacky Lau
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> {code:java}
> @Test
> def testSimpleFirstRowOnBuiltinProctime1(): Unit = {
>   val sqlQuery =
> """
>   |SELECT *
>   |FROM (
>   |  SELECT *,
>   |ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as 
> rowNum
>   |  FROM (select a, count(b) as b from MyTable group by a)
>   |)
>   |WHERE rowNum = 1
> """.stripMargin
>   util.verifyExecPlan(sqlQuery)
> } {code}
> Exception:
> org.apache.flink.table.api.TableException: StreamPhysicalDeduplicate doesn't 
> support consuming update changes which is produced by node 
> GroupAggregate(groupBy=[a], select=[a, COUNT(b) AS b])
> Because StreamPhysicalDeduplicate cannot consume update changes now while 
> StreamExecRank can, we should not convert the FlinkLogicalRank to 
> StreamPhysicalDeduplicate in this case. We can defer checking whether the 
> input contains update changes to the "optimize the physical plan" phase, 
> so we can add an option to solve it; and when StreamPhysicalDeduplicate 
> supports consuming update changes, we can deprecate it.





[jira] [Closed] (FLINK-36284) StreamTableEnvironment#toDataStream(Table table, Class targetClass) is not suitable for setting targetClass as a class generated by Avro.

2024-09-20 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36284.
---
  Assignee: xuyang
Resolution: Fixed

Fixed in master: 1cd9110f858a809ea640a23e872303be02015db9

> StreamTableEnvironment#toDataStream(Table table, Class targetClass) is not 
> suitable for setting targetClass as a class generated by Avro.
> 
>
> Key: FLINK-36284
> URL: https://issues.apache.org/jira/browse/FLINK-36284
> Project: Flink
>  Issue Type: Improvement
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / API
>Reporter: xuyang
>Assignee: xuyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
> Attachments: image-2024-09-14-17-39-16-698.png
>
>
> This issue can be reproduced by updating the {{testAvroToAvro}} method in the 
> {{org.apache.flink.table.runtime.batch.AvroTypesITCase}} class.
>  
> {code:java}
> @Test
> public void testAvroToAvro() {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>     DataStream<User> ds = testData(env);
>     // before: using the deprecated method
>     // Table t = tEnv.fromDataStream(ds, selectFields(ds));
>     // after: using the recommended new method
>     Table t = tEnv.fromDataStream(ds);
>     Table result = t.select($("*"));
>     // before: using the deprecated method
>     // List<User> results =
>     //     CollectionUtil.iteratorToList(
>     //         DataStreamUtils.collect(tEnv.toAppendStream(result, User.class)));
>     // after: using the recommended new method
>     List<User> results =
>             CollectionUtil.iteratorToList(
>                     DataStreamUtils.collect(tEnv.toDataStream(result, User.class)));
>     List<User> expected = Arrays.asList(USER_1, USER_2, USER_3);
>     assertThat(results).isEqualTo(expected);
> } {code}
> An exception will be thrown:
> !image-2024-09-14-17-39-16-698.png|width=1049,height=594!
>  
>  
>  
>  





[jira] [Closed] (FLINK-36283) The new PERCENTILE function doesn't follow user function first behavior when it encounters a user function with same name

2024-09-17 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36283.
---
Fix Version/s: 2.0-preview
   (was: 2.0.0)
   Resolution: Fixed

Fixed in master: cfd532d552008fb10d158707da9e5538d5938a05

> The new PERCENTILE function doesn't follow user function first behavior when 
> it encounters a user function with same name
> -
>
> Key: FLINK-36283
> URL: https://issues.apache.org/jira/browse/FLINK-36283
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0-preview
>Reporter: lincoln lee
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> The new PERCENTILE function doesn't follow the user-function-first behavior 
> when it encounters a user function with the same name. E.g., if a user 
> creates a temporary function named `percentile`, the following query should 
> use the user's function instead of the builtin one:
> {code}
> select percentile(...)
> {code}
> This was mentioned during the review but missed the final check; we should 
> fix it.
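The expected "user function first" semantics can be sketched as a two-level lookup in which temporary user functions shadow built-in ones. This is an illustrative model only; Flink's actual function resolution order has more levels (temporary/catalog/system functions), and the class below is hypothetical:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class FunctionResolution {
    // Hypothetical two-level lookup sketching "user function first":
    // a temporary user function registered under the same name must shadow
    // the built-in one with that name.
    final Map<String, String> temporaryFunctions = new HashMap<>();
    final Map<String, String> builtinFunctions = new HashMap<>();

    String resolve(String name) {
        String key = name.toLowerCase(Locale.ROOT); // function names are case-insensitive
        String userFn = temporaryFunctions.get(key);
        return userFn != null ? userFn : builtinFunctions.get(key);
    }

    public static void main(String[] args) {
        FunctionResolution catalog = new FunctionResolution();
        catalog.builtinFunctions.put("percentile", "builtin PERCENTILE");
        System.out.println(catalog.resolve("percentile")); // builtin PERCENTILE
        // After the user registers a temporary function with the same name,
        // it must win over the built-in one.
        catalog.temporaryFunctions.put("percentile", "user percentile");
        System.out.println(catalog.resolve("percentile")); // user percentile
    }
}
```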





[jira] [Updated] (FLINK-36283) The new PERCENTILE function doesn't follow user function first behavior when it encounters a user function with same name

2024-09-13 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-36283:

Issue Type: Bug  (was: Improvement)

> The new PERCENTILE function doesn't follow user function first behavior when 
> it encounters a user function with same name
> -
>
> Key: FLINK-36283
> URL: https://issues.apache.org/jira/browse/FLINK-36283
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0-preview
>Reporter: lincoln lee
>Assignee: Dylan He
>Priority: Major
> Fix For: 2.0.0
>
>
> The new PERCENTILE function doesn't follow the user-function-first behavior 
> when it encounters a user function with the same name. E.g., if a user 
> creates a temporary function named `percentile`, the following query should 
> use the user's function instead of the builtin one:
> {code}
> select percentile(...)
> {code}
> This was mentioned during the review but missed the final check; we should 
> fix it.





[jira] [Created] (FLINK-36283) The new PERCENTILE function doesn't follow user function first behavior when it encounters a user function with same name

2024-09-13 Thread lincoln lee (Jira)
lincoln lee created FLINK-36283:
---

 Summary: The new PERCENTILE function doesn't follow user function 
first behavior when it encounters a user function with same name
 Key: FLINK-36283
 URL: https://issues.apache.org/jira/browse/FLINK-36283
 Project: Flink
  Issue Type: Improvement
Affects Versions: 2.0-preview
Reporter: lincoln lee
Assignee: Dylan He
 Fix For: 2.0.0


The new PERCENTILE function doesn't follow the user-function-first behavior 
when it encounters a user function with the same name. E.g., if a user creates 
a temporary function named `percentile`, the following query should use the 
user's function instead of the builtin one:

{code}

select percentile(...)

{code}

This was mentioned during the review but missed the final check; we should fix it.





[jira] [Commented] (FLINK-29739) [FLIP-265] Deprecate and remove Scala API support

2024-09-13 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881486#comment-17881486
 ] 

lincoln lee commented on FLINK-29739:
-

We've been working on code cleanup for the deprecated api related to FLIP-265 
(including the scala table api), and it's expected to be done by 9/30, in time 
for the planned 2.0-preview release.

After reading this early message, my first thought is that if we can keep the 
scala table api at a lower maintenance cost, it will be helpful for scala api 
users.

However, my colleague encountered some problems when trying to do the related 
migration, which involves flink-scala code that will be deleted. One way to 
deal with this is to move all the dependent code into the scala table api, but 
that may make the module less lightweight and increase the maintenance cost 
afterwards. [~yyhx] [~xuyangzhong] can you help add more details?

Since this message is from almost 2 years ago and we haven't found any further 
follow-up discussions, I'd like to ask you both what you think now. 
[~twalthr] [~martijnvisser] 

> [FLIP-265] Deprecate and remove Scala API support
> -
>
> Key: FLINK-29739
> URL: https://issues.apache.org/jira/browse/FLINK-29739
> Project: Flink
>  Issue Type: Technical Debt
>  Components: API / Scala
>Reporter: Martijn Visser
>Priority: Major
>  Labels: 2.0-related
>
> FLIP: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support





[jira] [Closed] (FLINK-36230) REGEXP_EXTRACT Python Table API fails

2024-09-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36230.
---
Fix Version/s: 2.0-preview
 Assignee: Dylan He
   Resolution: Fixed

fixed in master: 8c6f13cead9e982f568c36135a350c45baf90d6c

> REGEXP_EXTRACT Python Table API fails
> -
>
> Key: FLINK-36230
> URL: https://issues.apache.org/jira/browse/FLINK-36230
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
> Attachments: image-2024-09-06-10-37-12-147.png
>
>
> Invalid call when extract_index is None.
> !image-2024-09-06-10-37-12-147.png|width=514,height=168!





[jira] [Closed] (FLINK-35827) Equality between a row field and a row constant is wrong in SQL

2024-09-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35827.
---
Fix Version/s: 2.0-preview
 Assignee: xuyang
   Resolution: Fixed

fixed in master: 54b0d6a858cd18e57fde60966a01cd673cb6da7a

> Equality between a row field and a row constant is wrong in SQL
> ---
>
> Key: FLINK-35827
> URL: https://issues.apache.org/jira/browse/FLINK-35827
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: yisha zhou
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> To reproduce the issue, you can add codes below in RowTypeTest
> {code:java}
> testAllApis(
>   'f2 === row(2, "foo", true),
>   "f2 = row(2, 'foo', true)",
>   "true"
> ) {code}
> f2 is actually the same as the constant `row(2, "foo", true)`; however, the 
> result of the expression `f2 = row(2, 'foo', true)` is false.
> The root cause is that `ScalarOperatorGens.generateEquals` generates code 
> like `$leftTerm.equals($rightTerm)` for row types. However, f2 may be a 
> GenericRowData while the constant may be a BinaryRowData, so the equality 
> between them is false.
>  
> After investigating the code, I believe the logic in 
> `EqualiserCodeGenerator.generateEqualsCode` can handle the issue here. 
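The root-cause pattern above, where container equality fails across two different representations of the same logical row, can be illustrated with a field-wise comparison sketch. The row classes below are hypothetical stand-ins for GenericRowData and BinaryRowData, not Flink's actual types:

```java
import java.util.Objects;

public class RowEquality {
    interface Row { Object field(int i); int arity(); }

    // Two hypothetical row representations with identical logical content,
    // standing in for GenericRowData vs. BinaryRowData.
    static final class GenericRow implements Row {
        final Object[] fields;
        GenericRow(Object... fields) { this.fields = fields; }
        public Object field(int i) { return fields[i]; }
        public int arity() { return fields.length; }
    }

    static final class CompactRow implements Row {
        final Object[] fields; // pretend this is a binary layout
        CompactRow(Object... fields) { this.fields = fields; }
        public Object field(int i) { return fields[i]; }
        public int arity() { return fields.length; }
    }

    // Field-wise equality, the approach attributed above to
    // EqualiserCodeGenerator: compare logical fields instead of relying on
    // Object#equals of the container objects.
    static boolean fieldWiseEquals(Row left, Row right) {
        if (left.arity() != right.arity()) return false;
        for (int i = 0; i < left.arity(); i++) {
            if (!Objects.equals(left.field(i), right.field(i))) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Row generic = new GenericRow(2, "foo", true);
        Row compact = new CompactRow(2, "foo", true);
        System.out.println(generic.equals(compact));           // false: container equality
        System.out.println(fieldWiseEquals(generic, compact)); // true: logical equality
    }
}
```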





[jira] [Closed] (FLINK-36142) Remove TestTableSourceWithTime and its subclasses to prepare to remove TableEnvironmentInternal#registerTableSourceInternal

2024-09-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36142.
---
Resolution: Fixed

fixed in master: 8b8e695188114c7087d3a9ed83fc9f2ee2776339

> Remove TestTableSourceWithTime and its subclasses to prepare to remove 
> TableEnvironmentInternal#registerTableSourceInternal
> ---
>
> Key: FLINK-36142
> URL: https://issues.apache.org/jira/browse/FLINK-36142
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner, Table SQL / Runtime
>Reporter: xuyang
>Assignee: xuyang
>Priority: Blocker
>  Labels: 2.0-related, pull-request-available
> Fix For: 2.0-preview
>
>
> This work will involve removing the existing UT and IT tests for 
> LegacyTableSource and LegacyTableSink.





[jira] [Closed] (FLINK-36123) Add PERCENTILE function

2024-09-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36123.
---
  Assignee: Dylan He
Resolution: Fixed

fixed in master: c749a1b61c9c154f32cb877b96274804a950bfd0

> Add PERCENTILE function
> ---
>
> Key: FLINK-36123
> URL: https://issues.apache.org/jira/browse/FLINK-36123
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
> Attachments: image-2024-09-06-10-23-42-094.png
>
>
> Add PERCENTILE function.
> 
> Return a percentile value based on a continuous distribution of the input 
> column. If no input row lies exactly at the desired percentile, the result is 
> calculated using linear interpolation of the two nearest input values. NULL 
> values are ignored in the calculation.
> Example:
> {code:sql}
> > SELECT PERCENTILE(col, 0.3) FROM VALUES (0), (10), (10) AS tab(col);
>  6.0
> > SELECT PERCENTILE(col, 0.3, freq) FROM VALUES (0, 1), (10, 2) AS tab(col, 
> > freq);
>  6.0
> > SELECT PERCENTILE(col, array(0.25, 0.75)) FROM VALUES (0), (10) AS tab(col);
>  [2.5,7.5]
> {code}
> Syntax:
> {code:sql}
> PERCENTILE(expr, percentage[, frequency])
> {code}
> Arguments:
>  * {{expr}}: A NUMERIC expression.
>  * {{percentage}}: A NUMERIC expression between 0 and 1 or an ARRAY of 
> NUMERIC expressions, each between 0 and 1.
>  * {{frequency}}: An optional integral number greater than 0.
> Returns:
> DOUBLE if percentage is numeric, or an ARRAY of DOUBLE if percentage is an 
> ARRAY.
> Frequency describes the number of times expr must be counted. The default 
> value is 1.
> See also:
>  * 
> [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-Built-inAggregateFunctions(UDAF)]
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#mathematical-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/percentile.html]
>  * [PostgreSQL|https://www.postgresql.org/docs/16/functions-aggregate.html] 
> percentile_cont
>  * 
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/percentile_cont]
>  * 
> [Oracle|https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/PERCENTILE_CONT.html]
>  * [Wiki|https://en.wikipedia.org/wiki/Percentile]
> 
> Currently our implementation is inspired by PERCENTILE of Spark, which offers 
> an additional parameter frequency compared to SQL standard function 
> PERCENTILE_CONT.
> Based on this function, we can easily extend support for PERCENTILE_CONT and 
> PERCENTILE_DISC in the SQL standard with little modification in the future.
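For illustration, the linear-interpolation rule described above can be sketched as a plain Java helper. This is a hypothetical standalone sketch, not Flink's actual implementation; it assumes NULL values are already filtered out and a frequency of 1:

```java
import java.util.Arrays;

public class PercentileSketch {

    // Continuous percentile with linear interpolation between the two
    // nearest input values, mirroring the semantics described above.
    static double percentile(double[] values, double p) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        // Position of the desired percentile on the sorted input,
        // 0-based: p = 0 maps to the first value, p = 1 to the last.
        double pos = p * (sorted.length - 1);
        int lower = (int) Math.floor(pos);
        int upper = (int) Math.ceil(pos);
        double frac = pos - lower;
        return sorted[lower] + frac * (sorted[upper] - sorted[lower]);
    }

    public static void main(String[] args) {
        // Matches the first example: PERCENTILE of (0, 10, 10) at 0.3 is 6.0.
        System.out.println(percentile(new double[] {0, 10, 10}, 0.3));
    }
}
```

With input (0, 10, 10) and percentage 0.3, the desired position is 0.6 on the sorted values, so the result interpolates between 0 and 10 to give 6.0, matching the first example.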



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36123) Add PERCENTILE function

2024-09-11 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-36123:

Fix Version/s: 2.0-preview

> Add PERCENTILE function
> ---
>
> Key: FLINK-36123
> URL: https://issues.apache.org/jira/browse/FLINK-36123
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
> Attachments: image-2024-09-06-10-23-42-094.png
>
>
> Add PERCENTILE function.
> 
> Return a percentile value based on a continuous distribution of the input 
> column. If no input row lies exactly at the desired percentile, the result is 
> calculated using linear interpolation of the two nearest input values. NULL 
> values are ignored in the calculation.
> Example:
> {code:sql}
> > SELECT PERCENTILE(col, 0.3) FROM VALUES (0), (10), (10) AS tab(col);
>  6.0
> > SELECT PERCENTILE(col, 0.3, freq) FROM VALUES (0, 1), (10, 2) AS tab(col, 
> > freq);
>  6.0
> > SELECT PERCENTILE(col, array(0.25, 0.75)) FROM VALUES (0), (10) AS tab(col);
>  [2.5,7.5]
> {code}
> Syntax:
> {code:sql}
> PERCENTILE(expr, percentage[, frequency])
> {code}
> Arguments:
>  * {{expr}}: A NUMERIC expression.
>  * {{percentage}}: A NUMERIC expression between 0 and 1 or an ARRAY of 
> NUMERIC expressions, each between 0 and 1.
>  * {{frequency}}: An optional integral number greater than 0.
> Returns:
> DOUBLE if percentage is numeric, or an ARRAY of DOUBLE if percentage is an 
> ARRAY.
> Frequency describes the number of times expr must be counted. The default 
> value is 1.
> See also:
>  * 
> [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-Built-inAggregateFunctions(UDAF)]
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#mathematical-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/percentile.html]
>  * [PostgreSQL|https://www.postgresql.org/docs/16/functions-aggregate.html] 
> percentile_cont
>  * 
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/percentile_cont]
>  * 
> [Oracle|https://docs.oracle.com/en/database/oracle/oracle-database/23/sqlrf/PERCENTILE_CONT.html]
>  * [Wiki|https://en.wikipedia.org/wiki/Percentile]
> 
> Currently our implementation is inspired by PERCENTILE of Spark, which offers 
> an additional parameter frequency compared to SQL standard function 
> PERCENTILE_CONT.
> Based on this function, we can easily extend support for PERCENTILE_CONT and 
> PERCENTILE_DISC in the SQL standard with little modification in the future.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36214) Error log when building flink-cdc-pipeline-udf-examples from source code

2024-09-04 Thread lincoln lee (Jira)
lincoln lee created FLINK-36214:
---

 Summary: Error log when building flink-cdc-pipeline-udf-examples 
from source code
 Key: FLINK-36214
 URL: https://issues.apache.org/jira/browse/FLINK-36214
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: lincoln lee


There's an error log when building from source code (encountered on the 3.2.0 RC and 
master branches), but it does not fail the build. 

{code}

[INFO] --< org.apache.flink:flink-cdc-pipeline-udf-examples >--
[INFO] Building flink-cdc-pipeline-udf-examples 3.2.0                    [3/42]
[INFO] [ jar ]-
[INFO]
[INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @ 
flink-cdc-pipeline-udf-examples ---
[INFO] Deleting 
/Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/target
[INFO]
[INFO] --- flatten-maven-plugin:1.5.0:clean (flatten.clean) @ 
flink-cdc-pipeline-udf-examples ---
[INFO] Deleting 
/Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/.flattened-pom.xml
[INFO]
[INFO] --- spotless-maven-plugin:2.4.2:check (spotless-check) @ 
flink-cdc-pipeline-udf-examples ---
[INFO]
[INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ 
flink-cdc-pipeline-udf-examples ---
[INFO]
[INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ 
flink-cdc-pipeline-udf-examples ---
[INFO]
[INFO] --- maven-remote-resources-plugin:1.5:process (process-resource-bundles) 
@ flink-cdc-pipeline-udf-examples ---
[INFO]
[INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ 
flink-cdc-pipeline-udf-examples ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory 
/Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/main/resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- flatten-maven-plugin:1.5.0:flatten (flatten) @ 
flink-cdc-pipeline-udf-examples ---
[INFO] Generating flattened POM of project 
org.apache.flink:flink-cdc-pipeline-udf-examples:jar:3.2.0...
[INFO]
[INFO] --- scala-maven-plugin:4.9.2:add-source (scala-compile-first) @ 
flink-cdc-pipeline-udf-examples ---
[INFO] Add Source directory: 
/Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/main/scala
[INFO] Add Test Source directory: 
/Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/test/scala
[INFO]
[INFO] --- scala-maven-plugin:4.9.2:compile (scala-compile-first) @ 
flink-cdc-pipeline-udf-examples ---
[INFO] Compiler bridge file: 
/Users/lilin/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.10.0-bin_2.12.16__52.0-1.10.0_20240505T232140.jar
[INFO] compiling 8 Scala sources and 8 Java sources to 
/Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/target/classes
 ...
[ERROR] -release is only supported on Java 9 and higher
[INFO] done compiling
[INFO] compile in 8.2 s

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35988) Reduce the number of state queries in the AppendOnlyFirstNFunction.

2024-08-29 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35988.
---
Resolution: Fixed

Fixed in master: d9931c8af05d0f1f721be9fe920690fe122507ad

> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> ---
>
> Key: FLINK-35988
> URL: https://issues.apache.org/jira/browse/FLINK-35988
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: luolei
>Assignee: luolei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> In the AppendOnlyFirstNFunction, there are two data reads from the state 
> within a single processElement operation. This has a significant impact on 
> performance, especially when the state size is large.
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // check message should be insert only.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = state.value() == null ? 0 : state.value();
> // ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank += 1;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }{code}
>  
> Remedial measure: optimize the code to save one state query invocation.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // Ensure the message is an insert-only operation.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = getCurrentRank();
> // Ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank++;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }
> private int getCurrentRank() throws IOException {
> Integer value = state.value();
> return value == null ? 0 : value;
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35988) Reduce the number of state queries in the AppendOnlyFirstNFunction.

2024-08-28 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35988:

Issue Type: Improvement  (was: New Feature)

> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> ---
>
> Key: FLINK-35988
> URL: https://issues.apache.org/jira/browse/FLINK-35988
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: luolei
>Assignee: luolei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> In the AppendOnlyFirstNFunction, there are two data reads from the state 
> within a single processElement operation. This has a significant impact on 
> performance, especially when the state size is large.
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // check message should be insert only.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = state.value() == null ? 0 : state.value();
> // ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank += 1;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }{code}
>  
> Remedial measure: optimize the code to save one state query invocation.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // Ensure the message is an insert-only operation.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = getCurrentRank();
> // Ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank++;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }
> private int getCurrentRank() throws IOException {
> Integer value = state.value();
> return value == null ? 0 : value;
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35988) Reduce the number of state queries in the AppendOnlyFirstNFunction.

2024-08-28 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35988:

Fix Version/s: 2.0.0

> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> ---
>
> Key: FLINK-35988
> URL: https://issues.apache.org/jira/browse/FLINK-35988
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: luolei
>Assignee: luolei
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> In the AppendOnlyFirstNFunction, there are two data reads from the state 
> within a single processElement operation. This has a significant impact on 
> performance, especially when the state size is large.
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // check message should be insert only.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = state.value() == null ? 0 : state.value();
> // ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank += 1;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }{code}
>  
> Remedial measure: optimize the code to save one state query invocation.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // Ensure the message is an insert-only operation.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = getCurrentRank();
> // Ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank++;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }
> private int getCurrentRank() throws IOException {
> Integer value = state.value();
> return value == null ? 0 : value;
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35988) Reduce the number of state queries in the AppendOnlyFirstNFunction.

2024-08-28 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35988?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35988:
---

Assignee: luolei

> Reduce the number of state queries in the AppendOnlyFirstNFunction.
> ---
>
> Key: FLINK-35988
> URL: https://issues.apache.org/jira/browse/FLINK-35988
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: luolei
>Assignee: luolei
>Priority: Major
>  Labels: pull-request-available
>
> In the AppendOnlyFirstNFunction, there are two data reads from the state 
> within a single processElement operation. This has a significant impact on 
> performance, especially when the state size is large.
> {code:java}
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // check message should be insert only.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = state.value() == null ? 0 : state.value();
> // ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank += 1;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }{code}
>  
> Remedial measure: optimize the code to save one state query invocation.
> {code:java}
> @Override
> public void processElement(RowData input, Context context, Collector<RowData> out)
> throws Exception {
> initRankEnd(input);
> // Ensure the message is an insert-only operation.
> Preconditions.checkArgument(input.getRowKind() == RowKind.INSERT);
> int currentRank = getCurrentRank();
> // Ignore record if it does not belong to the first-n rows
> if (currentRank >= rankEnd) {
> return;
> }
> currentRank++;
> state.update(currentRank);
> if (outputRankNumber || hasOffset()) {
> collectInsert(out, input, currentRank);
> } else {
> collectInsert(out, input);
> }
> }
> private int getCurrentRank() throws IOException {
> Integer value = state.value();
> return value == null ? 0 : value;
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36157) FLIP-473 Introduce New SQL Operators Based on Asynchronous State APIs

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36157:
---

Assignee: xuyang

> FLIP-473 Introduce New SQL Operators Based on Asynchronous State APIs
> -
>
> Key: FLINK-36157
> URL: https://issues.apache.org/jira/browse/FLINK-36157
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>
> This is an umbrella Jira for FLIP-473.
> See more at 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-473+Introduce+New+SQL+Operators+Based+on+Asynchronous+State+APIs



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35964) Add STARTSWITH function

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35964.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: 4c14c763ca0f9c71d642953228b364dce3eb3bf4

> Add STARTSWITH function
> ---
>
> Key: FLINK-35964
> URL: https://issues.apache.org/jira/browse/FLINK-35964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add STARTSWITH function.
> 
> Returns whether {{expr}} begins with {{startExpr}}.
> Example:
> {code:sql}
> > SELECT STARTSWITH('SparkSQL', 'Spark');
>  true
> > SELECT STARTSWITH('SparkSQL', 'spark');
>  false
> {code}
> Syntax:
> {code:sql}
> STARTSWITH(expr, startExpr)
> {code}
> Arguments:
>  * {{expr}}: A STRING or BINARY expression.
>  * {{startExpr}}: A STRING or BINARY expression.
> Returns:
> A BOOLEAN.
> {{expr}} and {{startExpr}} should have the same type.
> If {{expr}} or {{startExpr}} is NULL, the result is NULL.
> If {{startExpr}} is empty, the result is true.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/startswith.html]
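The NULL and empty-prefix semantics above can be sketched with a hypothetical Java helper that uses a nullable Boolean to mirror SQL three-valued logic (an illustration, not Flink's actual built-in implementation):

```java
public class StartsWithSketch {

    // A NULL input yields a NULL result; an empty startExpr yields true,
    // matching the semantics described in the issue.
    static Boolean startsWith(String expr, String startExpr) {
        if (expr == null || startExpr == null) {
            return null;
        }
        return expr.startsWith(startExpr); // empty prefix => true
    }

    public static void main(String[] args) {
        System.out.println(startsWith("SparkSQL", "Spark")); // true
        System.out.println(startsWith("SparkSQL", "spark")); // false
        System.out.println(startsWith("SparkSQL", ""));      // true
    }
}
```

The same shape applies to the sibling ENDSWITH function with `String.endsWith`.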



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35965) Add ENDSWITH function

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35965.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: 5bbdbb2889268e879b6aa06d2cdab02fd6154cb6

> Add ENDSWITH function
> -
>
> Key: FLINK-35965
> URL: https://issues.apache.org/jira/browse/FLINK-35965
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add ENDSWITH function.
> 
> Returns whether {{expr}} ends with {{endExpr}}.
> Example:
> {code:sql}
> > SELECT ENDSWITH('SparkSQL', 'SQL');
>  true
> > SELECT ENDSWITH('SparkSQL', 'sql');
>  false
> {code}
> Syntax:
> {code:sql}
> ENDSWITH(expr, endExpr)
> {code}
> Arguments:
>  * {{expr}}: A STRING or BINARY expression.
>  * {{endExpr}}: A STRING or BINARY expression.
> Returns:
> A BOOLEAN.
> {{expr}} and {{endExpr}} should have the same type.
> If {{expr}} or {{endExpr}} is NULL, the result is NULL.
> If {{endExpr}} is empty, the result is true.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/endswith.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35965) Add ENDSWITH function

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35965:
---

Assignee: Dylan He

> Add ENDSWITH function
> -
>
> Key: FLINK-35965
> URL: https://issues.apache.org/jira/browse/FLINK-35965
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ENDSWITH function.
> 
> Returns whether {{expr}} ends with {{endExpr}}.
> Example:
> {code:sql}
> > SELECT ENDSWITH('SparkSQL', 'SQL');
>  true
> > SELECT ENDSWITH('SparkSQL', 'sql');
>  false
> {code}
> Syntax:
> {code:sql}
> ENDSWITH(expr, endExpr)
> {code}
> Arguments:
>  * {{expr}}: A STRING or BINARY expression.
>  * {{endExpr}}: A STRING or BINARY expression.
> Returns:
> A BOOLEAN.
> {{expr}} and {{endExpr}} should have the same type.
> If {{expr}} or {{endExpr}} is NULL, the result is NULL.
> If {{endExpr}} is empty, the result is true.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/endswith.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35964) Add STARTSWITH function

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35964:
---

Assignee: Dylan He

> Add STARTSWITH function
> ---
>
> Key: FLINK-35964
> URL: https://issues.apache.org/jira/browse/FLINK-35964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add STARTSWITH function.
> 
> Returns whether {{expr}} begins with {{startExpr}}.
> Example:
> {code:sql}
> > SELECT STARTSWITH('SparkSQL', 'Spark');
>  true
> > SELECT STARTSWITH('SparkSQL', 'spark');
>  false
> {code}
> Syntax:
> {code:sql}
> STARTSWITH(expr, startExpr)
> {code}
> Arguments:
>  * {{expr}}: A STRING or BINARY expression.
>  * {{startExpr}}: A STRING or BINARY expression.
> Returns:
> A BOOLEAN.
> {{expr}} and {{startExpr}} should have the same type.
> If {{expr}} or {{startExpr}} is NULL, the result is NULL.
> If {{startExpr}} is empty, the result is true.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/startswith.html]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36100) Support ESCAPE in built-in function LIKE formally

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36100.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: ae4eb7dc3ae399659e52da39f7ac74242aaca3b5

> Support ESCAPE in built-in function LIKE formally
> -
>
> Key: FLINK-36100
> URL: https://issues.apache.org/jira/browse/FLINK-36100
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Flink does not formally support ESCAPE in built-in function LIKE, but in some 
> cases we do need it because '_' and '%' are interpreted as special wildcard 
> characters, preventing their use in their literal sense.
> Currently, if we use an ESCAPE character anyway, we get unexpected results 
> like the cases below.
> {code:SQL}
> > SELECT 'TE_ST' LIKE '%E\_S%';
>  FALSE
> > SELECT 'TE_ST' LIKE '%E\_S%' ESCAPE '\';
>  ERROR
> {code}
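For illustration, the intended ESCAPE semantics can be sketched as a LIKE-to-regex translation in Java. This is a hypothetical helper, not the planner's actual code generation; an escaped '_' or '%' matches literally instead of acting as a wildcard:

```java
import java.util.regex.Pattern;

public class LikeEscapeSketch {

    // Translate a SQL LIKE pattern into a Java regex, honoring an
    // ESCAPE character: the character following it is matched literally.
    static boolean like(String input, String pattern, char escape) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c == escape && i + 1 < pattern.length()) {
                // Escaped character: match it literally.
                regex.append(Pattern.quote(String.valueOf(pattern.charAt(++i))));
            } else if (c == '%') {
                regex.append(".*"); // '%' matches any sequence of characters
            } else if (c == '_') {
                regex.append(".");  // '_' matches exactly one character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(input).matches();
    }

    public static void main(String[] args) {
        // With ESCAPE '\', '\_' matches a literal underscore.
        System.out.println(like("TE_ST", "%E\\_S%", '\\')); // true
        System.out.println(like("TEXST", "%E\\_S%", '\\')); // false
    }
}
```

Under these semantics, `'TE_ST' LIKE '%E\_S%' ESCAPE '\'` would return true, since the escaped underscore only matches a literal '_'.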



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36100) Support ESCAPE in built-in function LIKE formally

2024-08-26 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-36100:

Affects Version/s: 1.19.1
   1.20.0

> Support ESCAPE in built-in function LIKE formally
> -
>
> Key: FLINK-36100
> URL: https://issues.apache.org/jira/browse/FLINK-36100
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Flink does not formally support ESCAPE in built-in function LIKE, but in some 
> cases we do need it because '_' and '%' are interpreted as special wildcard 
> characters, preventing their use in their literal sense.
> Currently, if we use an ESCAPE character anyway, we get unexpected results 
> like the cases below.
> {code:SQL}
> > SELECT 'TE_ST' LIKE '%E\_S%';
>  FALSE
> > SELECT 'TE_ST' LIKE '%E\_S%' ESCAPE '\';
>  ERROR
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36149) Support cleaning up expired states to prevent the continuous increase of states and add RocksDB state cleanup configuration.

2024-08-26 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876663#comment-17876663
 ] 

lincoln lee commented on FLINK-36149:
-

[~lexluo09] Thanks for reporting this! It seems a useful optimization for 
expired data in stateful operations.

But the problem should exist in all stateful operators, so this seems to be a 
general optimization, not limited to a specific SQL operator (e.g., Rank). From 
this perspective, should we introduce a job-level parameter to control this 
optimization as a whole?

Further, could the parameter 
(StateTtlConfig.RocksdbCompactFilterCleanupStrategy.DEFAULT_QUERY_TIME_AFTER_NUM_ENTRIES)
 take effect directly at the state backend level (without introducing new options 
for the SQL module), so that not only SQL but also DataStream jobs could benefit 
from it? 
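For context, the compaction-filter cleanup strategy discussed here is configured today through StateTtlConfig on Flink's state API; a sketch (the TTL duration and the query-count threshold of 1000 are illustrative values, not recommendations):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Enable cleanup of expired entries in RocksDB's compaction filter,
// re-querying the current timestamp after every 1000 processed entries.
StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(Time.days(1))
                .cleanupInRocksdbCompactFilter(1000)
                .build();
```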

> Support cleaning up expired states to prevent the continuous increase of 
> states and add RocksDB state cleanup configuration.
> 
>
> Key: FLINK-36149
> URL: https://issues.apache.org/jira/browse/FLINK-36149
> Project: Flink
>  Issue Type: Improvement
>Reporter: luolei
>Priority: Major
>  Labels: pull-request-available
> Attachments: 1724512324453.jpg, 1724512362249.jpg
>
>
> 1、Problem description:
> {code:java}
> select *
> from 
> ( 
>     SELECT *, ROW_NUMBER() OVER (PARTITION BY song_id, user_id, 
> FLOOR(proc_time TO day) order by proc_time asc ) as row_num 
>     from  tableA
>     where  cmd = 1 and user_id > 0
> ) 
> where row_num <=10  {code}
> Currently, the deduplication operator uses the Flink State TTL mechanism. The 
> default behavior of this mechanism is that expired states are only cleaned up 
> when they are accessed again. In our case, the key in the Flink state 
> includes the FLOOR(proc_time TO day) timestamp. For example, if today is 
> December 28th, the new keys in the Flink state will include December 28th. 
> When it becomes December 29th, the keys for new records will include December 
> 29th, and the keys from December 28th will never be accessed again. Since 
> they are not accessed, they will not be cleaned up by the Flink State TTL 
> mechanism. As a result, the state in Flink will increase indefinitely.
>  
>  
> {code:java}
> 2021-02-25 06:49:25,593 WARN  akka.remote.transport.netty.NettyTransport  
>  [] - Remote connection to [null] failed with 
> java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:608992021-02-25 06:49:25,593 WARN  
> akka.remote.ReliableDeliverySupervisor   [] - Association 
> with remote system [akka.tcp://fl...@hadoop02.tcd.com:60899] has failed, 
> address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://fl...@hadoop02.tcd.com:60899]] Caused by: 
> [java.net.ConnectException: Connection refused: 
> hadoop02.tcd.com/9.44.33.8:60899]2021-02-25 06:49:32,762 INFO  
> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
> Worker container_e26_1614150721877_0021_01_04 is terminated. Diagnostics: 
> [2021-02-25 06:49:31.879]Container 
> [pid=24324,containerID=container_e26_1614150721877_0021_01_04] is running 
> 103702528B beyond the 'PHYSICAL' memory limit. Current usage: 4.1 GB of 4 GB 
> physical memory used; 6.3 GB of 8.4 GB virtual memory used. Killing 
> container.Dump of the process-tree for 
> container_e26_1614150721877_0021_01_04 :|- PID PPID PGRPID SESSID 
> CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) 
> RSSMEM_USAGE(PAGES) FULL_CMD_LINE|- 24551 24324 24324 24324 (java) 
> 1130639 94955 6799687680 1073522 /usr/java/jdk1.8.0_131/bin/java 
> -Xmx1664299798 -Xms1664299798 -XX:MaxDirectMemorySize=493921243 
> -XX:MaxMetaspaceSize=268435456 
> -Dlog.file=/data4/yarn/container-logs/application_1614150721877_0021/container_e26_1614150721877_0021_01_04/taskmanager.log
>  -Dlog4j.configuration=file:./log4j.properties 
> -Dlog4j.configurationFile=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner -D 
> taskmanager.memory.framework.off-heap.size=134217728b -D 
> taskmanager.memory.network.max=359703515b -D 
> taskmanager.memory.network.min=359703515b -D 
> taskmanager.memory.framework.heap.size=134217728b -D 
> taskmanager.memory.managed.size=1438814063b -D taskmanager.cpu.cores=1.0 -D 
> taskmanager.memory.task.heap.size=1530082070b -D 
> taskmanager.memory.task.off-heap.size=0b --configDir . 
> -Djobmanager.rpc.address=hadoop02.tcd.com 
> -Djobmanager.memory.jvm-overhead.min=201326592b -Dpipeline.classpaths= 
> -Dtaskmanager.resource-id=container_e26_1614150721877_0021_01_04 
> -Dweb.port=0 -Djobmanager.memory.off-heap.size=134217728b 
> -Dexecutio

[jira] [Assigned] (FLINK-36100) Support ESCAPE in built-in function LIKE formally

2024-08-25 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36100:
---

Assignee: Dylan He

> Support ESCAPE in built-in function LIKE formally
> -
>
> Key: FLINK-36100
> URL: https://issues.apache.org/jira/browse/FLINK-36100
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Flink does not formally support ESCAPE in the built-in function LIKE, but in 
> some cases we do need it because '_' and '%' are interpreted as special 
> wildcard characters, preventing their use in their literal sense.
> And currently, if we use an ESCAPE character anyway, we get unexpected 
> results like the cases below.
> {code:SQL}
> > SELECT 'TE_ST' LIKE '%E\_S%';
>  FALSE
> > SELECT 'TE_ST' LIKE '%E\_S%' ESCAPE '\';
>  ERROR
> {code}
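To make the intended ESCAPE semantics concrete, here is a minimal Python sketch that translates a LIKE pattern (with an optional escape character) into a regular expression. This only illustrates the SQL-standard behavior; the function name `sql_like` and the regex-based translation are assumptions for illustration, not Flink's implementation:

```python
import re

def sql_like(value, pattern, escape=None):
    """Approximate SQL LIKE matching with an optional ESCAPE character.

    Illustrative sketch only -- not Flink's implementation."""
    regex = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if escape is not None and ch == escape and i + 1 < len(pattern):
            # The character following the escape is taken literally.
            regex.append(re.escape(pattern[i + 1]))
            i += 2
            continue
        if ch == '%':
            regex.append('.*')   # % matches any sequence of characters
        elif ch == '_':
            regex.append('.')    # _ matches exactly one character
        else:
            regex.append(re.escape(ch))
        i += 1
    return re.fullmatch(''.join(regex), value) is not None

# With ESCAPE '\', the '_' in the pattern is a literal underscore:
print(sql_like('TE_ST', '%E\\_S%', escape='\\'))  # True
print(sql_like('TEXST', '%E\\_S%', escape='\\'))  # False
```

With the escape declared, '%E\_S%' matches only a literal underscore, which is exactly the behavior the issue asks for.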



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36136) Remove TableSource & TableSink impl and utils, docs and etc.

2024-08-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36136:
---

Assignee: xuyang

> Remove TableSource & TableSink impl and utils, docs and etc.
> 
>
> Key: FLINK-36136
> URL: https://issues.apache.org/jira/browse/FLINK-36136
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> This Jira is a sub-task in https://issues.apache.org/jira/browse/FLINK-33748





[jira] [Assigned] (FLINK-36135) Remove the TableSource and TableSink ecosystem APIs like TableFactory, DefinedFieldMapping, FilterableTableSource, etc.

2024-08-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36135:
---

Assignee: xuyang

> Remove the TableSource and TableSink ecosystem APIs like TableFactory, 
> DefinedFieldMapping, FilterableTableSource, etc.
> ---
>
> Key: FLINK-36135
> URL: https://issues.apache.org/jira/browse/FLINK-36135
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> This Jira is a sub-task in https://issues.apache.org/jira/browse/FLINK-33748
> Remove apis such as 
>  * DefinedRowtimeAttributes
>  * ProjectableTableSource
>  * FilterableTableSource
>  * ...





[jira] [Assigned] (FLINK-36133) Remove subclass implementations of TableSource and TableSink, along with their corresponding Factory classes.

2024-08-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36133:
---

Assignee: xuyang

> Remove subclass implementations of TableSource and TableSink, along with 
> their corresponding Factory classes.
> -
>
> Key: FLINK-36133
> URL: https://issues.apache.org/jira/browse/FLINK-36133
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> This Jira is a sub-task in https://issues.apache.org/jira/browse/FLINK-33748
> Remove subclasses such as 
>  * ArrowTableSource
>  * CsvTableSource
>  * ...





[jira] [Assigned] (FLINK-36134) Remove the classes associated with TableSource and TableSink as part of the optimization process.

2024-08-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36134:
---

Assignee: xuyang

> Remove the classes associated with TableSource and TableSink as part of the 
> optimization process.
> -
>
> Key: FLINK-36134
> URL: https://issues.apache.org/jira/browse/FLINK-36134
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> This Jira is a sub-task in https://issues.apache.org/jira/browse/FLINK-33748
> Remove related rules and nodes





[jira] [Assigned] (FLINK-36132) Remove codes that references TableSource and TableSink directly at the API level.

2024-08-22 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36132:
---

Assignee: xuyang

> Remove codes that references TableSource and TableSink directly at the API 
> level.
> -
>
> Key: FLINK-36132
> URL: https://issues.apache.org/jira/browse/FLINK-36132
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> This Jira is a sub-task in https://issues.apache.org/jira/browse/FLINK-33748
> This Jira is responsible for the following removal:
> ● Remove APIs related to TableEnvironmentInternal
>     ○ #fromTableSource
>     ○ #registerTableSourceInternal
>     ○ #registerTableSinkInternal
> ● Remove APIs related to StreamTableEnvironment
>     ○ #toAppendStream
>     ○ #toRetractStream
> ● Update subclasses of Operation
>     ○ Remove TableSourceQueryOperation
>     ○ Update QueryOperationCatalogViewTable
>     ○ Remove UnregisteredSinkModifyOperation
>     ○ Remove RichTableSourceQueryOperation (heavily tested, consider a 
> separate PR)
>     ○ Remove OutputConversionModifyOperation (this part is pending, depends 
> on Python)
> ...





[jira] [Closed] (FLINK-35932) Add REGEXP_COUNT function

2024-08-21 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35932.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: 8189bba195326d899a9830d3a19b50cafa092666

> Add REGEXP_COUNT function
> -
>
> Key: FLINK-35932
> URL: https://issues.apache.org/jira/browse/FLINK-35932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add REGEXP_COUNT function.
> 
> Returns the number of times {{str}} matches the {{regex}} pattern.
> Example:
> {code:sql}
> > SELECT REGEXP_COUNT('Steven Jones and Stephen Smith are the best players', 
> > 'Ste(v|ph)en');
>  2
> > SELECT REGEXP_COUNT('Mary had a little lamb', 'Ste(v|ph)en');
>  0
> {code}
> Syntax:
> {code:sql}
> REGEXP_COUNT(str, regex)
> {code}
> Arguments:
>  * {{str}}: A STRING expression to be matched.
>  * {{regex}}: A STRING expression with a matching pattern.
> Returns:
> An INTEGER.
> {{regex}} must be a Java regular expression.
> If either argument is {{NULL}}, the result is {{NULL}}.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/regexp_count.html]
>  * [PostgreSQL|https://www.postgresql.org/docs/16/functions-string.html]
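The described semantics can be illustrated with a small Python sketch (an approximation using Python's `re` module; Flink uses Java regular expressions, which differ in some corner cases):

```python
import re

def regexp_count(s, regex):
    """Sketch of REGEXP_COUNT: number of non-overlapping matches of regex in s.

    Returns None when either argument is None, mirroring SQL NULL handling."""
    if s is None or regex is None:
        return None
    return sum(1 for _ in re.finditer(regex, s))

print(regexp_count('Steven Jones and Stephen Smith are the best players',
                   'Ste(v|ph)en'))  # 2
print(regexp_count('Mary had a little lamb', 'Ste(v|ph)en'))  # 0
```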





[jira] [Closed] (FLINK-35962) Add REGEXP_INSTR function

2024-08-21 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35962.
---
Fix Version/s: 2.0.0
 Assignee: Dylan He
   Resolution: Fixed

Fixed in master: cbbb1cd178a5b53200ee793f77bac73a0d49ff14

> Add REGEXP_INSTR function
> -
>
> Key: FLINK-35962
> URL: https://issues.apache.org/jira/browse/FLINK-35962
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add REGEXP_INSTR function.
> 
> Returns the position of the first substring in {{str}} that matches {{regex}}.
> Example:
> {code:sql}
> > SELECT REGEXP_INSTR('Steven Jones and Stephen Smith are the best players', 
> > 'Ste(v|ph)en');
>  1
> > SELECT REGEXP_INSTR('Mary had a little lamb', 'Ste(v|ph)en');
>  0
> {code}
> Syntax:
> {code:sql}
> REGEXP_INSTR(str, regex)
> {code}
> Arguments:
>  * {{str}}: A STRING expression to be matched.
>  * {{regex}}: A STRING expression with a pattern.
> Returns:
> An INTEGER.
> Result indexes begin at 1, 0 if there is no match.
> In case of a malformed {{regex}} the function returns an error. 
> If either argument is NULL, the result is NULL.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/regexp_instr.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/regexp.html#function_regexp-instr]
>  * 
> [PostgreSQL|https://www.postgresql.org/docs/16/functions-matching.html#FUNCTIONS-POSIX-REGEXP]
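A hedged Python sketch of the described behavior (1-based match position, 0 when there is no match; Python's `re` stands in for Java regular expressions here):

```python
import re

def regexp_instr(s, regex):
    """Sketch of REGEXP_INSTR: 1-based position of the first match, 0 if none.

    Returns None for NULL inputs, mirroring SQL NULL handling."""
    if s is None or regex is None:
        return None
    m = re.search(regex, s)
    return m.start() + 1 if m else 0

print(regexp_instr('Steven Jones and Stephen Smith are the best players',
                   'Ste(v|ph)en'))  # 1
print(regexp_instr('Mary had a little lamb', 'Ste(v|ph)en'))  # 0
```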





[jira] [Closed] (FLINK-35931) Add REGEXP_EXTRACT_ALL function

2024-08-21 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35931.
---
Fix Version/s: 2.0.0
   Resolution: Fixed

Fixed in master: 7fa4ebad057bca765b661a553f4ccc2df7861a05

> Add REGEXP_EXTRACT_ALL function
> ---
>
> Key: FLINK-35931
> URL: https://issues.apache.org/jira/browse/FLINK-35931
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add REGEXP_EXTRACT_ALL function.
> 
> Extracts all of the strings in {{str}} that match the {{regex}} expression 
> and correspond to the regex group {{extractIndex}}.
> Example:
> {code:sql}
> > SELECT REGEXP_EXTRACT_ALL('100-200, 300-400', '(\\d+)-(\\d+)', 1);
>  [100, 300]
> {code}
> Syntax:
> {code:sql}
> REGEXP_EXTRACT_ALL(str, regex[, extractIndex])
> {code}
> Arguments:
>  * {{str}}: A STRING expression to be matched.
>  * {{regex}}: A STRING expression with a matching pattern.
>  * {{extractIndex}}: An optional INTEGER expression with default 0.
> Returns:
> An ARRAY.
> {{regex}} must be a Java regular expression.
> {{regex}} may contain multiple groups. {{extractIndex}} indicates which regex 
> group to extract and starts from 1. 0 means matching the entire regular 
> expression, also the default value if not specified.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/regexp_extract_all.html]
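The group-extraction semantics can be sketched in Python (group 0 is the whole match, group 1 and up are the parenthesized subgroups, matching the description above; Python's `re` approximates Java regular expressions):

```python
import re

def regexp_extract_all(s, regex, extract_index=0):
    """Sketch of REGEXP_EXTRACT_ALL: for each match of regex in s,
    return the text captured by group extract_index (0 = whole match)."""
    if s is None or regex is None:
        return None
    return [m.group(extract_index) for m in re.finditer(regex, s)]

print(regexp_extract_all('100-200, 300-400', r'(\d+)-(\d+)', 1))  # ['100', '300']
```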





[jira] [Closed] (FLINK-35963) Add REGEXP_SUBSTR function

2024-08-21 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35963.
---
Fix Version/s: 2.0.0
 Assignee: Dylan He
   Resolution: Fixed

Fixed in master: e8c1d1b9d411e17130a57851c0bbaecb2e816048

> Add REGEXP_SUBSTR function
> --
>
> Key: FLINK-35963
> URL: https://issues.apache.org/jira/browse/FLINK-35963
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add REGEXP_SUBSTR function.
> 
> Returns the first substring in {{str}} that matches {{regex}}.
> Example:
> {code:sql}
> > SELECT REGEXP_SUBSTR('Steven Jones and Stephen Smith are the best players', 
> > 'Ste(v|ph)en');
>  Steven
> > SELECT REGEXP_SUBSTR('Mary had a little lamb', 'Ste(v|ph)en');
>  NULL
> {code}
> Syntax:
> {code:sql}
> REGEXP_SUBSTR(str, regex)
> {code}
> Arguments:
>  * {{str}}: A STRING expression to be matched.
>  * {{regex}}: A STRING expression with a pattern.
> Returns:
> A STRING.
> In case of a malformed {{regex}} the function returns an error. 
> If either argument is NULL or the pattern is not found, the result is NULL.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/regexp_substr.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/regexp.html#function_regexp-substr]
>  * 
> [PostgreSQL|https://www.postgresql.org/docs/16/functions-matching.html#FUNCTIONS-POSIX-REGEXP]
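An illustrative Python sketch of the described behavior (first matching substring, NULL when the pattern is not found; Python's `re` stands in for Java regular expressions):

```python
import re

def regexp_substr(s, regex):
    """Sketch of REGEXP_SUBSTR: first substring of s matching regex,
    or None when there is no match or either argument is None."""
    if s is None or regex is None:
        return None
    m = re.search(regex, s)
    return m.group(0) if m else None

print(regexp_substr('Steven Jones and Stephen Smith are the best players',
                    'Ste(v|ph)en'))  # Steven
print(regexp_substr('Mary had a little lamb', 'Ste(v|ph)en'))  # None
```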





[jira] [Closed] (FLINK-36095) KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key to avoid changelog disordering

2024-08-20 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-36095.
---
Resolution: Fixed

fixed in master: 624bc5001c831d3d04a5680c03910c634a0c988b

> KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key 
> to avoid changelog disordering
> ---
>
> Key: FLINK-36095
> URL: https://issues.apache.org/jira/browse/FLINK-36095
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> If a user encounters the NDU issue caused by a lookup join and enables the 
> {{TRY_RESOLVE}} mode ( 
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/#33-how-to-eliminate-the-impact-of-non-deterministic-update-in-streaming
>  ), the current KeyedLookupJoinWrapper implementation uses the join key as 
> the shuffle key, which may lead to a changelog disordering issue. It should 
> be fixed to use the input upsertKey (or the complete row if the upsertKey is 
> empty) instead of the join key.





[jira] [Assigned] (FLINK-33748) Remove legacy TableSource/TableSink API in 2.0

2024-08-20 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-33748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-33748:
---

Assignee: xuyang

> Remove legacy TableSource/TableSink API in 2.0
> --
>
> Key: FLINK-33748
> URL: https://issues.apache.org/jira/browse/FLINK-33748
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Weijie Guo
>Assignee: xuyang
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> {{TableSource}} and {{TableSink}} were already marked as deprecated in 
> FLINK-19453 and can be removed in 2.0.





[jira] [Commented] (FLINK-33748) Remove legacy TableSource/TableSink API in 2.0

2024-08-20 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17875157#comment-17875157
 ] 

lincoln lee commented on FLINK-33748:
-

[~xuyangzhong] Thanks for volunteering! Assigned to you.

Looks like this is going to involve a lot of changes, and we might consider 
splitting up some of the more detailed subtasks so that more people can 
contribute too, WDYT?

> Remove legacy TableSource/TableSink API in 2.0
> --
>
> Key: FLINK-33748
> URL: https://issues.apache.org/jira/browse/FLINK-33748
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Table SQL / API
>Reporter: Weijie Guo
>Priority: Major
>  Labels: 2.0-related
> Fix For: 2.0.0
>
>
> {{TableSource}} and {{TableSink}} were already marked as deprecated in 
> FLINK-19453 and can be removed in 2.0.





[jira] [Created] (FLINK-36095) KeyedLookupJoinWrapper should shuffle by input upsertKey instead of join key to avoid changelog disordering

2024-08-19 Thread lincoln lee (Jira)
lincoln lee created FLINK-36095:
---

 Summary: KeyedLookupJoinWrapper should shuffle by input upsertKey 
instead of join key to avoid changelog disordering
 Key: FLINK-36095
 URL: https://issues.apache.org/jira/browse/FLINK-36095
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.20.0
Reporter: lincoln lee
Assignee: lincoln lee
 Fix For: 2.0.0


If a user encounters the NDU issue caused by a lookup join and enables the 
{{TRY_RESOLVE}} mode ( 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/#33-how-to-eliminate-the-impact-of-non-deterministic-update-in-streaming
 ), the current KeyedLookupJoinWrapper implementation uses the join key as the 
shuffle key, which may lead to a changelog disordering issue. It should be 
fixed to use the input upsertKey (or the complete row if the upsertKey is 
empty) instead of the join key.





[jira] [Closed] (FLINK-35997) Incomplete Table API & Python API of built-in function LTRIM & RTRIM

2024-08-16 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35997.
---
  Assignee: Dylan He
Resolution: Fixed

Fixed in master: 500d274265d100e4fe091e27fa1d96cebd812c2d

> Incomplete Table API & Python API of built-in function LTRIM & RTRIM
> 
>
> Key: FLINK-35997
> URL: https://issues.apache.org/jira/browse/FLINK-35997
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the Table API & Python API for the built-in function LTRIM support 
> only one syntax:
> {code:java}
> expr.ltrim()
> {code}
> While the SQL API supports two syntaxes:
> {code:sql}
> LTRIM(expr)
> LTRIM(expr, trimStr)
> {code}
> The missing syntax should be incorporated, in the Python API as well.
> The same applies to the function RTRIM.
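For reference, the two syntaxes can be sketched in Python, assuming the two-argument form strips any leading characters drawn from the set {{trimStr}} (the set-of-characters semantics is an assumption here, matching Python's `str.lstrip`, not a statement about Flink's exact behavior):

```python
def sql_ltrim(expr, trim_str=None):
    """Sketch of the two LTRIM syntaxes: one-argument form strips leading
    whitespace; two-argument form strips leading characters found in trim_str
    (assumption: set-of-characters semantics, as in str.lstrip)."""
    if expr is None:
        return None
    return expr.lstrip() if trim_str is None else expr.lstrip(trim_str)

print(sql_ltrim('  hello'))       # hello
print(sql_ltrim('xxhello', 'x'))  # hello
```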





[jira] [Closed] (FLINK-35993) Add UNHEX function

2024-08-15 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35993.
---
Resolution: Fixed

Fixed in master: 215ad96248c989405177704a8eb63c6ff3215792

> Add UNHEX function
> --
>
> Key: FLINK-35993
> URL: https://issues.apache.org/jira/browse/FLINK-35993
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add UNHEX function.
> 
> Converts hexadecimal {{expr}} to BINARY.
> Example:
> {code:sql}
> > SELECT DECODE(UNHEX('466C696E6B'), 'UTF-8');
> Flink
> {code}
> Syntax:
> {code:sql}
> UNHEX(expr)
> {code}
> Arguments:
>  * {{expr}}: A STRING expression of hexadecimal characters.
> Returns:
> A BINARY.
> If the length of {{expr}} is odd, the first character is discarded and the 
> result is left padded with a null byte. 
> If {{expr}} contains non-hex characters, the result is NULL.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#mathematical-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/unhex.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_unhex]
>  * 
> [PostgreSQL|https://www.postgresql.org/docs/16/functions-binarystring.html] 
> decode(expr, 'hex')
>  * 
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/hex_decode_binary]
>  * [Calcite|https://calcite.apache.org/docs/reference.html] from_hex
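A hedged Python sketch that follows the rules quoted above verbatim (the odd-length and non-hex handling is implemented exactly as described in this ticket; this is an illustration, not Flink's implementation):

```python
import binascii

def unhex(expr):
    """Sketch of UNHEX per the description above: hex string -> bytes.

    Odd-length input: the first character is discarded and the result is
    left-padded with a null byte. Non-hex characters yield None (SQL NULL)."""
    if expr is None:
        return None
    prefix = b''
    if len(expr) % 2 == 1:
        expr = expr[1:]
        prefix = b'\x00'
    try:
        return prefix + binascii.unhexlify(expr)
    except binascii.Error:
        return None

print(unhex('466C696E6B').decode('utf-8'))  # Flink
```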





[jira] [Updated] (FLINK-35993) Add UNHEX function

2024-08-15 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35993:

Fix Version/s: 2.0.0

> Add UNHEX function
> --
>
> Key: FLINK-35993
> URL: https://issues.apache.org/jira/browse/FLINK-35993
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Add UNHEX function.
> 
> Converts hexadecimal {{expr}} to BINARY.
> Example:
> {code:sql}
> > SELECT DECODE(UNHEX('466C696E6B'), 'UTF-8');
> Flink
> {code}
> Syntax:
> {code:sql}
> UNHEX(expr)
> {code}
> Arguments:
>  * {{expr}}: A STRING expression of hexadecimal characters.
> Returns:
> A BINARY.
> If the length of {{expr}} is odd, the first character is discarded and the 
> result is left padded with a null byte. 
> If {{expr}} contains non-hex characters, the result is NULL.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#mathematical-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/unhex.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_unhex]
>  * 
> [PostgreSQL|https://www.postgresql.org/docs/16/functions-binarystring.html] 
> decode(expr, 'hex')
>  * 
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/hex_decode_binary]
>  * [Calcite|https://calcite.apache.org/docs/reference.html] from_hex





[jira] [Closed] (FLINK-35920) Add PRINTF function

2024-08-15 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35920.
---
Resolution: Fixed

Fixed in master: a91085ebdbcaf7784f9af78f6f4f40079f310399

> Add PRINTF function
> ---
>
> Key: FLINK-35920
> URL: https://issues.apache.org/jira/browse/FLINK-35920
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add PRINTF function as the same in Spark & Hive.
> 
> Returns a formatted string from printf-style format strings.
> Example:
> {code:sql}
> > SELECT PRINTF('Hello World %d %s', 100, 'days');
>  Hello World 100 days
> {code}
> Syntax:
> {code:sql}
> PRINTF(strfmt, obj...)
> {code}
> Arguments:
> * {{strfmt}}: A STRING expression.
> * {{obj}}: ANY expression.
> Returns:
> A STRING.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/printf.html]
> * 
> [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf#LanguageManualUDF-StringFunctions]
> * [PostgreSQL|https://www.postgresql.org/docs/16/functions-string.html] format
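The printf-style formatting can be illustrated in Python with %-formatting (Spark and Hive delegate to Java's Formatter; the %d and %s conversions shown here behave the same way, but other conversions may differ):

```python
def sql_printf(strfmt, *objs):
    """Sketch of PRINTF: apply printf-style format codes in strfmt
    to the remaining arguments."""
    return strfmt % objs

print(sql_printf('Hello World %d %s', 100, 'days'))  # Hello World 100 days
```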





[jira] [Closed] (FLINK-35935) CREATE TABLE AS doesn't work with LIMIT

2024-08-15 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35935.
---
Resolution: Fixed

> CREATE TABLE AS doesn't work with LIMIT
> ---
>
> Key: FLINK-35935
> URL: https://issues.apache.org/jira/browse/FLINK-35935
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> {code:java}
> CREATE TABLE WITH (foo) AS (SELECT * FROM bar LIMIT 5){code}
> The above statement throws "Caused by: java.lang.AssertionError: not a query: 
> " exception.
> A workaround is to wrap the query with CTE.
> {code:java}
> CREATE TABLE WITH (foo) AS (WITH R AS (SELECT * FROM bar LIMIT 5) SELECT * 
> FROM R){code}





[jira] [Comment Edited] (FLINK-35935) CREATE TABLE AS doesn't work with LIMIT

2024-08-15 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873763#comment-17873763
 ] 

lincoln lee edited comment on FLINK-35935 at 8/15/24 11:24 AM:
---

Fixed in master: 729b8b81a77ba6c32711216b88a1bf57ccddfadc

1.19: 9c3c82835422e68441e936b378045b76aad894f9

1.20: c9729dc9201787b980c1d8b0ad05909fbf716bfb


was (Author: lincoln.86xy):
Fixed in master: 729b8b81a77ba6c32711216b88a1bf57ccddfadc

> CREATE TABLE AS doesn't work with LIMIT
> ---
>
> Key: FLINK-35935
> URL: https://issues.apache.org/jira/browse/FLINK-35935
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> {code:java}
> CREATE TABLE WITH (foo) AS (SELECT * FROM bar LIMIT 5){code}
> The above statement throws "Caused by: java.lang.AssertionError: not a query: 
> " exception.
> A workaround is to wrap the query with CTE.
> {code:java}
> CREATE TABLE WITH (foo) AS (WITH R AS (SELECT * FROM bar LIMIT 5) SELECT * 
> FROM R){code}





[jira] [Commented] (FLINK-35935) CREATE TABLE AS doesn't work with LIMIT

2024-08-14 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873762#comment-17873762
 ] 

lincoln lee commented on FLINK-35935:
-

[~xuyangzhong] Could you also backport this fix to 1.19 & 1.20 branches?

> CREATE TABLE AS doesn't work with LIMIT
> ---
>
> Key: FLINK-35935
> URL: https://issues.apache.org/jira/browse/FLINK-35935
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> {code:java}
> CREATE TABLE WITH (foo) AS (SELECT * FROM bar LIMIT 5){code}
> The above statement throws "Caused by: java.lang.AssertionError: not a query: 
> " exception.
> A workaround is to wrap the query with CTE.
> {code:java}
> CREATE TABLE WITH (foo) AS (WITH R AS (SELECT * FROM bar LIMIT 5) SELECT * 
> FROM R){code}





[jira] [Commented] (FLINK-35935) CREATE TABLE AS doesn't work with LIMIT

2024-08-14 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873763#comment-17873763
 ] 

lincoln lee commented on FLINK-35935:
-

Fixed in master: 729b8b81a77ba6c32711216b88a1bf57ccddfadc

> CREATE TABLE AS doesn't work with LIMIT
> ---
>
> Key: FLINK-35935
> URL: https://issues.apache.org/jira/browse/FLINK-35935
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> {code:java}
> CREATE TABLE WITH (foo) AS (SELECT * FROM bar LIMIT 5){code}
> The above statement throws "Caused by: java.lang.AssertionError: not a query: 
> " exception.
> A workaround is to wrap the query with CTE.
> {code:java}
> CREATE TABLE WITH (foo) AS (WITH R AS (SELECT * FROM bar LIMIT 5) SELECT * 
> FROM R){code}





[jira] [Updated] (FLINK-35935) CREATE TABLE AS doesn't work with LIMIT

2024-08-14 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35935:

Fix Version/s: 2.0.0
   1.19.2
   1.20.1

> CREATE TABLE AS doesn't work with LIMIT
> ---
>
> Key: FLINK-35935
> URL: https://issues.apache.org/jira/browse/FLINK-35935
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.1
>Reporter: Xingcan Cui
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> {code:java}
> CREATE TABLE WITH (foo) AS (SELECT * FROM bar LIMIT 5){code}
> The above statement throws "Caused by: java.lang.AssertionError: not a query: 
> " exception.
> A workaround is to wrap the query with CTE.
> {code:java}
> CREATE TABLE WITH (foo) AS (WITH R AS (SELECT * FROM bar LIMIT 5) SELECT * 
> FROM R){code}





[jira] [Assigned] (FLINK-35993) Add UNHEX function

2024-08-14 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35993:
---

Assignee: Dylan He

> Add UNHEX function
> --
>
> Key: FLINK-35993
> URL: https://issues.apache.org/jira/browse/FLINK-35993
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add UNHEX function.
> 
> Converts hexadecimal {{expr}} to BINARY.
> Example:
> {code:sql}
> > SELECT DECODE(UNHEX('466C696E6B'), 'UTF-8');
> Flink
> {code}
> Syntax:
> {code:sql}
> UNHEX(expr)
> {code}
> Arguments:
>  * {{expr}}: A STRING expression of hexadecimal characters.
> Returns:
> A BINARY.
> If the length of {{expr}} is odd, the first character is discarded and the 
> result is left padded with a null byte. 
> If {{expr}} contains non-hex characters, the result is NULL.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#mathematical-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/unhex.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_unhex]
>  * 
> [PostgreSQL|https://www.postgresql.org/docs/16/functions-binarystring.html] 
> decode(expr, 'hex')
>  * 
> [Snowflake|https://docs.snowflake.com/en/sql-reference/functions/hex_decode_binary]
>  * [Calcite|https://calcite.apache.org/docs/reference.html] from_hex





[jira] [Commented] (FLINK-36043) [SQL] Unexpected SinkMaterializer operator generated when use coalesce function on upsert key fields

2024-08-13 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36043?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873230#comment-17873230
 ] 

lincoln lee commented on FLINK-36043:
-

[~catyee] [~yunta] Intuitively this is a possible optimization point, but in 
practice it is not that simple.

To give a counterexample with the `coalesce` function, consider two rows of 
data like this:
a, b
0, 1
null, 1

After `coalesce(a, 0), b` we get:
0, 1
0, 1

Clearly, this no longer satisfies the semantic requirements of upsertKey.
I've considered similar derivation optimizations, but on reflection found that 
the scope for such extended derivation is extremely limited (the derivation of 
upsertKey is similar to that of a primary key, and needs to fulfill the 
functional dependency: 
https://opentextbc.ca/dbdesign01/chapter/chapter-11-functional-dependencies/).

But for your example, a workaround is to rewrite the SQL to apply coalesce 
before the group by:

{code}

Flink SQL> explain insert into sink_table
> select
>   col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from (select coalesce(col1, 0) col1, col2, col3, col4 from source_table)
> group by col1,col2;

{code}
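The counterexample above can be replayed in a few lines of Python to show why the derivation is unsafe (illustrative only):

```python
def coalesce(*args):
    """Return the first non-None argument, or None if all are None."""
    for a in args:
        if a is not None:
            return a
    return None

# Two rows with distinct upsert keys on column a: 0 and NULL.
rows = [(0, 1), (None, 1)]

# After projecting coalesce(a, 0), b the two distinct keys collapse into one,
# so the projected columns can no longer serve as an upsertKey.
keys_after = [(coalesce(a, 0), b) for a, b in rows]
print(keys_after)  # [(0, 1), (0, 1)]
```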

 

> [SQL] Unexpected SinkMaterializer operator generated when use coalesce 
> function on upsert key fields
> 
>
> Key: FLINK-36043
> URL: https://issues.apache.org/jira/browse/FLINK-36043
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0
>Reporter: Yuan Kui
>Priority: Major
> Attachments: image-2024-08-13-16-40-27-929.png
>
>
> As designed, the SinkMaterializer operator should not be generated when the 
> upsert keys are the same as the primary keys. Example:
> {code:java}
> -- source table
> create table source_table (
>col1 int,
>col2 int,
>col3 int,
>col4 int
> ) with (
>'connector' = 'datagen',
>    'rows-per-second'='10'
> );
> -- sink table
> create table sink_table(
>col1 int,
>col2 int,
>col3_cnt bigint,
>col4_max int,
>primary key(col1, col2) not enforced
> ) with (
>'connector' = 'blackhole'
> );
> -- sql
> insert into sink_table
> select
>   col1,
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2;{code}
> It works well, and the excution plan has no SinkMaterializer operator:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, 
> col3_cnt, col4_max])
> +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS 
> col3_cnt, MAX(col4) AS col4_max])
>    +- Exchange(distribution=[hash[col1, col2]])
>       +- TableSourceScan(table=[[default_catalog, default_database, 
> source_table]], fields=[col1, col2, col3, col4])  {code}
> however, if we use coalesce function on upsert keys, such as:
> {code:java}
> insert into sink_table
> select
>   -- use coalesce
>   coalesce(col1, 0) as col1, 
>   col2,
>   count(col3) as col3_cnt,
>   max(col4) as col4_max
> from source_table
> group by col1,col2; {code}
> the SinkMaterializer operator will be generated:
> {code:java}
> == Optimized Execution Plan ==
> Sink(table=[default_catalog.default_database.sink_table], fields=[col1, col2, 
> col3_cnt, col4_max], upsertMaterialize=[true])
> +- Calc(select=[coalesce(col1, 0) AS col1, col2, col3_cnt, col4_max])
>    +- GroupAggregate(groupBy=[col1, col2], select=[col1, col2, COUNT(col3) AS 
> col3_cnt, MAX(col4) AS col4_max])
>       +- Exchange(distribution=[hash[col1, col2]])
>          +- TableSourceScan(table=[[default_catalog, default_database, 
> source_table]], fields=[col1, col2, col3, col4]) {code}
> Changing `coalesce(col1, 0)` to `if(col1 is null, 0, col1)` hits the same 
> problem.
>  
> The code for determining whether a SinkMaterializer operator should be 
> generated is in 
> `org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.SatisfyUpdateKindTraitVisitor#analyzeUpsertMaterializeStrategy`.
>  If we set a breakpoint at line 881 like this:
> !image-2024-08-13-16-40-27-929.png!
> I found that changeLogUpsertKeys is empty, which leads to the 'whether to 
> generate SinkMaterializer' decision always being true.
> Is that by design or a bug?
>  
>  





[jira] [Commented] (FLINK-35987) Add ELT function

2024-08-13 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17873206#comment-17873206
 ] 

lincoln lee commented on FLINK-35987:
-

The final implementation includes the compile-phase check.

> Add ELT function
> 
>
> Key: FLINK-35987
> URL: https://issues.apache.org/jira/browse/FLINK-35987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ELT function.
> 
> Returns the {{index}}-th expression.
> Example:
> {code:sql}
> > SELECT ELT(1, 'scala', 'java');
>  scala
> {code}
> Syntax:
> {code:sql}
> ELT(index, expr...)
> {code}
> Arguments:
>  * {{index}}: An INTEGER expression.
>  * {{expr}}: A STRING or BINARY expression. At least one expr is required.
> Returns:
> The result has the type of the least common type of all {{expr}}.
> {{index}} must be between 1 and the number of {{expr}}. Otherwise, the 
> function returns an error.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/elt.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_elt]





[jira] [Closed] (FLINK-35987) Add ELT function

2024-08-13 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35987.
---
Resolution: Fixed

Fixed in master: e33028db6a79fad8c7fa3b4d93760112fd91ca7a

> Add ELT function
> 
>
> Key: FLINK-35987
> URL: https://issues.apache.org/jira/browse/FLINK-35987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ELT function.
> 
> Returns the {{index}}-th expression.
> Example:
> {code:sql}
> > SELECT ELT(1, 'scala', 'java');
>  scala
> {code}
> Syntax:
> {code:sql}
> ELT(index, expr...)
> {code}
> Arguments:
>  * {{index}}: An INTEGER expression.
>  * {{expr}}: A STRING or BINARY expression. At least one expr is required.
> Returns:
> The result has the type of the least common type of all {{expr}}.
> {{index}} must be between 1 and the number of {{expr}}. Otherwise, the 
> function returns an error.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/elt.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_elt]





[jira] [Closed] (FLINK-26840) [minor refactor] Add a general TopNBufferCacheRemovalListener for topN function

2024-08-12 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-26840.
---
Resolution: Fixed

Fixed in master: 227e61d2e010ec8123dcda78675f18deadf7df1b

> [minor refactor] Add a general TopNBufferCacheRemovalListener for topN 
> function
> ---
>
> Key: FLINK-26840
> URL: https://issues.apache.org/jira/browse/FLINK-26840
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.15.0
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently the FastTop1Function and UpdatableTopNFunction each create a private 
> CacheRemovalListener which can be generalized. So do a minor refactor that adds 
> a new TopNBufferCacheRemovalListener instead.





[jira] [Assigned] (FLINK-26840) [minor refactor] Add a general TopNBufferCacheRemovalListener for topN function

2024-08-12 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-26840:
---

Assignee: lincoln lee

> [minor refactor] Add a general TopNBufferCacheRemovalListener for topN 
> function
> ---
>
> Key: FLINK-26840
> URL: https://issues.apache.org/jira/browse/FLINK-26840
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.15.0
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently the FastTop1Function and UpdatableTopNFunction each create a private 
> CacheRemovalListener which can be generalized. So do a minor refactor that adds 
> a new TopNBufferCacheRemovalListener instead.





[jira] [Commented] (FLINK-35987) Add ELT function

2024-08-12 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872952#comment-17872952
 ] 

lincoln lee commented on FLINK-35987:
-

[~dylanhz] I'm OK with the current implementation of the PR, which supports all 
string inputs and doesn't have the compile-phase negative index check yet (it's 
a nice-to-have feature and can be done later).

> Add ELT function
> 
>
> Key: FLINK-35987
> URL: https://issues.apache.org/jira/browse/FLINK-35987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ELT function.
> 
> Returns the {{index}}-th expression.
> Example:
> {code:sql}
> > SELECT ELT(1, 'scala', 'java');
>  scala
> {code}
> Syntax:
> {code:sql}
> ELT(index, expr...)
> {code}
> Arguments:
>  * {{index}}: An INTEGER expression.
>  * {{expr}}: A STRING or BINARY expression. At least one expr is required.
> Returns:
> The result has the type of the least common type of all {{expr}}.
> {{index}} must be between 1 and the number of {{expr}}. Otherwise, the 
> function returns an error.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/elt.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_elt]





[jira] [Commented] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-11 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872773#comment-17872773
 ] 

lincoln lee commented on FLINK-27519:
-

Some updates: after debugging with the code, the nested over agg query works 
fine in Calcite.

Verified the related case in
`RelOptRulesTest`:
{code}
@Test void testProjectToWindowRuleForNestedOver() {
  HepProgramBuilder builder = new HepProgramBuilder();
  builder.addRuleClass(ProjectToWindowRule.class);
  HepPlanner hepPlanner = new HepPlanner(builder.build());
  hepPlanner.addRule(CoreRules.PROJECT_TO_LOGICAL_PROJECT_AND_WINDOW);

  final String sql = "select deptno, f1, f2 from (select *, last_value(deptno) over (order by empno) f2\n"
      + "from (select *, first_value(deptno) over (order by empno) f1 from emp))\n";
  sql(sql).withPlanner(hepPlanner)
      .check();
}
 
LogicalProject(DEPTNO=[$7], F1=[$9], $2=[$10])
  LogicalWindow(window#0=[window(order by [0] aggs [LAST_VALUE($7)])])
    LogicalWindow(window#0=[window(order by [0] aggs [FIRST_VALUE($7)])])
      LogicalTableScan(table=[[CATALOG, SALES, EMP]])
 
{code}
 

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Assignee: lincoln lee
>Priority: Major
> Fix For: 2.0.0
>
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]
>     at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid duplicate 
> names in the output rowType.
>  
>  
>  
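One conceivable shape for such dedup logic, sketched as a standalone Java snippet (hypothetical helper names, not the actual planner fix): rename colliding output field names by appending a numeric suffix until the row type is unique, the way a planner could disambiguate a rowType like [w0$o0, w0$o0]:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: make a list of output field names unique by suffixing duplicates.
class FieldDedupDemo {
    static List<String> uniquify(List<String> names) {
        Set<String> seen = new HashSet<>();
        List<String> result = new ArrayList<>();
        for (String name : names) {
            String candidate = name;
            int suffix = 0;
            // keep appending "_n" until the candidate has not been seen yet
            while (!seen.add(candidate)) {
                candidate = name + "_" + suffix++;
            }
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        // the second w0$o0 is renamed so RowType validation would pass
        System.out.println(uniquify(List.of("pv", "w0$o0", "w0$o0")));
    }
}
```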





[jira] [Closed] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-09 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35885.
---

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  supports advancing progress by the watermark. When the watermark suddenly 
> exceeds the next window end timestamp, a result with count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}
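The effect can be reproduced outside of Flink with a toy model (all names invented, not the SlicingWindowOperator itself): once the watermark jumps past the end of a tumbling window that never received an element, that window fires with a count of 0:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of watermark-driven window firing, including empty windows.
class WindowFireDemo {
    static final long WINDOW_SIZE = 30_000L; // 30-second tumbling windows

    // Returns counts of all windows whose end <= watermark, starting at firstStart.
    static List<Long> fireWindows(long firstStart, long watermark, long[] elementTimestamps) {
        List<Long> counts = new ArrayList<>();
        for (long start = firstStart; start + WINDOW_SIZE <= watermark; start += WINDOW_SIZE) {
            long count = 0;
            for (long ts : elementTimestamps) {
                if (ts >= start && ts < start + WINDOW_SIZE) {
                    count++;
                }
            }
            counts.add(count); // fired even when count == 0
        }
        return counts;
    }

    public static void main(String[] args) {
        // One element lands in the first window; the watermark then jumps past
        // the end of the second window, which fires with count 0.
        System.out.println(fireWindows(0L, 60_000L, new long[] {5_000L})); // [1, 0]
    }
}
```

This is the source of the zero-count rows the query's `total_count = 0` filter then surfaces.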





[jira] [Comment Edited] (FLINK-36000) DynamicTableSink#Context's getTargetColumns should not return an array of zero length

2024-08-09 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871820#comment-17871820
 ] 

lincoln lee edited comment on FLINK-36000 at 8/9/24 8:12 AM:
-

fixed in master: fc5f3b5290dfb0b39682f29d0fc8e851dda5dd31

1.20: 42cc2a04459bb4365e6a4b444ccd8e6b328a369f

1.19: 81de170ddc9311ef1bc74d10841530a6eb60e5bd


was (Author: lincoln.86xy):
fixed in master: fc5f3b5290dfb0b39682f29d0fc8e851dda5dd31

> DynamicTableSink#Context's getTargetColumns should not return an array of 
> zero length
> -
>
> Key: FLINK-36000
> URL: https://issues.apache.org/jira/browse/FLINK-36000
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> The interface [`Optional getTargetColumns()` 
> |https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java#L192]
> is expected to return Optional#empty() for an SQL statement that does not 
> specify a column list, e.g., 'insert into t1 select ...',
> but the current implementation returns an int[0] in such cases, which is 
> unexpected.
>  
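A minimal sketch of the intended contract (hypothetical helper, not Flink's implementation): the absence of a column list should surface as Optional.empty(), never as a present Optional wrapping a zero-length array:

```java
import java.util.Optional;

// Sketch only: map "no declared column list" to Optional.empty().
class TargetColumnsDemo {
    // declaredColumns == null models "INSERT INTO t1 SELECT ..." (no column list)
    static Optional<int[]> targetColumns(int[] declaredColumns) {
        if (declaredColumns == null || declaredColumns.length == 0) {
            return Optional.empty(); // never a present zero-length array
        }
        return Optional.of(declaredColumns);
    }

    public static void main(String[] args) {
        System.out.println(targetColumns(null).isPresent());             // false
        System.out.println(targetColumns(new int[] {0, 2}).isPresent()); // true
    }
}
```

With this shape, a connector can simply branch on `isPresent()` instead of also checking the array length.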





[jira] [Commented] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-09 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872236#comment-17872236
 ] 

lincoln lee commented on FLINK-35885:
-

Fixed in 1.20: 01c0a24c9152721a6fe976797e197cfa72cea97d

1.19: 08649fd6981655bcd131c76ef36f6dd074566dd0

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  supports advancing progress by the watermark. When the watermark suddenly 
> exceeds the next window end timestamp, a result with count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}





[jira] [Commented] (FLINK-35974) PyFlink YARN per-job on Docker test failed because docker-compose command not found

2024-08-08 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871919#comment-17871919
 ] 

lincoln lee commented on FLINK-35974:
-

[~renqs] [~Weijie Guo] the 'docker-compose: command not found' error also 
exists in both the 1.19 & 1.20 branches; are you planning to do the backports? 
(If not, I can help with the work.)

1.19

[https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61289&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=5164]

1.20

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61288&view=logs&j=bea52777-eaf8-5663-8482-18fbc3630e81&t=43ba8ce7-ebbf-57cd-9163-444305d74117&l=5412

 

 

 

> PyFlink YARN per-job on Docker test failed because docker-compose command not 
> found
> ---
>
> Key: FLINK-35974
> URL: https://issues.apache.org/jira/browse/FLINK-35974
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Weijie Guo
>Assignee: Qingsheng Ren
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> {code:java}
> Aug 02 09:20:55 
> ==
> Aug 02 09:20:55 Running 'PyFlink YARN per-job on Docker test'
> Aug 02 09:20:55 
> ==
> Aug 02 09:20:55 TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-55842252037
> Aug 02 09:20:56 Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT
> Aug 02 09:20:57 Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT
> Aug 02 09:20:57 Docker version 26.1.3, build b72abbb
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common_docker.sh: 
> line 24: docker-compose: command not found
> Aug 02 09:20:57 [FAIL] Test script contains errors.
> Aug 02 09:20:57 Checking of logs skipped.
> Aug 02 09:20:57 
> Aug 02 09:20:57 [FAIL] 'PyFlink YARN per-job on Docker test' failed after 0 
> minutes and 1 seconds! Test exited with exit code 1
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61174&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=9544





[jira] [Commented] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface

2024-08-08 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871916#comment-17871916
 ] 

lincoln lee commented on FLINK-35887:
-

I just merged this fix because it makes sense to prevent the NPE.

For [~dylanhz]'s question about the reasoning for extracting TypeInfo from an 
interface, we can continue the discussion here or open a new thread.
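The root cause and the guard can be shown with plain reflection (a sketch, not Flink's exact isRecord code): Class#getSuperclass() returns null for interfaces, so any record check must test for null before dereferencing the superclass:

```java
// Sketch only: a null-safe variant of a "is this class a record?" check.
class RecordCheckDemo {
    interface Foo {} // interfaces have no superclass at all

    static boolean isRecordLike(Class<?> clazz) {
        Class<?> superclass = clazz.getSuperclass();
        // guard: interfaces (and Object itself) return null here
        return superclass != null && "java.lang.Record".equals(superclass.getName());
    }

    public static void main(String[] args) {
        System.out.println(Foo.class.getSuperclass());  // null for interfaces
        System.out.println(isRecordLike(Foo.class));    // false, no NPE
        System.out.println(isRecordLike(String.class)); // false
    }
}
```

Dereferencing the superclass unconditionally, as the pre-fix code did, is exactly what produced the reported NullPointerException.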

> Null Pointer Exception in TypeExtractor.isRecord when trying to provide type 
> info for interface
> ---
>
> Key: FLINK-35887
> URL: https://issues.apache.org/jira/browse/FLINK-35887
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.19.1
>Reporter: Jacob Jona Fahlenkamp
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> The following code
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.types.PojoTestUtils;
> import org.junit.jupiter.api.Test;
> import java.lang.reflect.Type;
> import java.util.Map;
> public class DebugTest {
>     @TypeInfo(FooFactory.class)
>     public interface Foo {}
> 
>     public static class FooFactory extends TypeInfoFactory<Foo> {
>         @Override
>         public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
>             return Types.POJO(Foo.class, Map.of());
>         }
>     }
> 
>     @Test
>     void test() {
>         PojoTestUtils.assertSerializedAsPojo(Foo.class);
>     }
> } {code}
> throws this exception:
> {code:java}
> java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" 
> because the return value of "java.lang.Class.getSuperclass()" is null
>     at org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
>  {code}





[jira] [Closed] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface

2024-08-08 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35887.
---
Resolution: Fixed

fixed in master: 9457ae8c07b1a0dc036aa88735d36420009a7eb9

> Null Pointer Exception in TypeExtractor.isRecord when trying to provide type 
> info for interface
> ---
>
> Key: FLINK-35887
> URL: https://issues.apache.org/jira/browse/FLINK-35887
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.19.1
>Reporter: Jacob Jona Fahlenkamp
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> The following code
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.types.PojoTestUtils;
> import org.junit.jupiter.api.Test;
> import java.lang.reflect.Type;
> import java.util.Map;
> public class DebugTest {
>     @TypeInfo(FooFactory.class)
>     public interface Foo {}
> 
>     public static class FooFactory extends TypeInfoFactory<Foo> {
>         @Override
>         public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
>             return Types.POJO(Foo.class, Map.of());
>         }
>     }
> 
>     @Test
>     void test() {
>         PojoTestUtils.assertSerializedAsPojo(Foo.class);
>     }
> } {code}
> throws this exception:
> {code:java}
> java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" 
> because the return value of "java.lang.Class.getSuperclass()" is null
>     at org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
>  {code}





[jira] [Updated] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface

2024-08-08 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35887:

Fix Version/s: 2.0.0

> Null Pointer Exception in TypeExtractor.isRecord when trying to provide type 
> info for interface
> ---
>
> Key: FLINK-35887
> URL: https://issues.apache.org/jira/browse/FLINK-35887
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.19.1
>Reporter: Jacob Jona Fahlenkamp
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> The following code
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.types.PojoTestUtils;
> import org.junit.jupiter.api.Test;
> import java.lang.reflect.Type;
> import java.util.Map;
> public class DebugTest {
>     @TypeInfo(FooFactory.class)
>     public interface Foo {}
> 
>     public static class FooFactory extends TypeInfoFactory<Foo> {
>         @Override
>         public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
>             return Types.POJO(Foo.class, Map.of());
>         }
>     }
> 
>     @Test
>     void test() {
>         PojoTestUtils.assertSerializedAsPojo(Foo.class);
>     }
> } {code}
> throws this exception:
> {code:java}
> java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" 
> because the return value of "java.lang.Class.getSuperclass()" is null
>     at org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
>  {code}





[jira] [Assigned] (FLINK-35887) Null Pointer Exception in TypeExtractor.isRecord when trying to provide type info for interface

2024-08-08 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35887:
---

Assignee: Dylan He

> Null Pointer Exception in TypeExtractor.isRecord when trying to provide type 
> info for interface
> ---
>
> Key: FLINK-35887
> URL: https://issues.apache.org/jira/browse/FLINK-35887
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.19.1
>Reporter: Jacob Jona Fahlenkamp
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> The following code
> {code:java}
> import org.apache.flink.api.common.typeinfo.TypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.types.PojoTestUtils;
> import org.junit.jupiter.api.Test;
> import java.lang.reflect.Type;
> import java.util.Map;
> public class DebugTest {
>     @TypeInfo(FooFactory.class)
>     public interface Foo {}
> 
>     public static class FooFactory extends TypeInfoFactory<Foo> {
>         @Override
>         public TypeInformation<Foo> createTypeInfo(Type type, Map<String, TypeInformation<?>> map) {
>             return Types.POJO(Foo.class, Map.of());
>         }
>     }
> 
>     @Test
>     void test() {
>         PojoTestUtils.assertSerializedAsPojo(Foo.class);
>     }
> } {code}
> throws this exception:
> {code:java}
> java.lang.NullPointerException: Cannot invoke "java.lang.Class.getName()" 
> because the return value of "java.lang.Class.getSuperclass()" is null
>     at org.apache.flink.api.java.typeutils.TypeExtractor.isRecord(TypeExtractor.java:2227)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:125)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createPojoSerializer(PojoTypeInfo.java:359)
>     at org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:347)
>     at org.apache.flink.types.PojoTestUtils.assertSerializedAsPojo(PojoTestUtils.java:48)
>  {code}





[jira] [Commented] (FLINK-35987) Add ELT function

2024-08-08 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871888#comment-17871888
 ] 

lincoln lee commented on FLINK-35987:
-

[~dylanhz] You can take `ItemAtIndexArgumentTypeStrategy` as an example and 
validate literal index values that are actually negative integers.

> Add ELT function
> 
>
> Key: FLINK-35987
> URL: https://issues.apache.org/jira/browse/FLINK-35987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ELT function.
> 
> Returns the {{index}}-th expression.
> Example:
> {code:sql}
> > SELECT ELT(1, 'scala', 'java');
>  scala
> {code}
> Syntax:
> {code:sql}
> ELT(index, expr...)
> {code}
> Arguments:
>  * {{index}}: An INTEGER expression.
>  * {{expr}}: A STRING or BINARY expression. At least one expr is required.
> Returns:
> The result has the type of the least common type of all {{expr}}.
> {{index}} must be between 1 and the number of {{expr}}. Otherwise, the 
> function returns an error.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/elt.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_elt]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35987) Add ELT function

2024-08-07 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871835#comment-17871835
 ] 

lincoln lee commented on FLINK-35987:
-

[~dylanhz] I propose some adjustments to the behavior/semantics of this new 
function:

First, define the `index` to be explicitly 'a positive integer starting from 1' 
(during the compile phase, we can report errors for clearly illegal integer 
values, such as negative numbers).

At runtime, when an invalid index value is encountered, the function doesn't 
throw any exceptions and returns null.

WDYT?
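The proposed semantics could look roughly like the following standalone sketch (`elt` here is a plain Java stand-in with made-up names, not the actual Flink built-in):

```java
public class EltSketch {
    // Hypothetical runtime semantics for ELT as proposed above:
    // index is 1-based; any non-positive or out-of-range index yields null
    // instead of throwing at runtime.
    static String elt(int index, String... exprs) {
        if (index < 1 || index > exprs.length) {
            return null; // invalid index: return null rather than raise an error
        }
        return exprs[index - 1];
    }

    public static void main(String[] args) {
        System.out.println(elt(1, "scala", "java")); // scala
        System.out.println(elt(0, "scala", "java")); // null
    }
}
```

Clearly illegal literal indexes (e.g. negative constants) would still be rejected at compile time; the null fallback only covers values discovered at runtime.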

> Add ELT function
> 
>
> Key: FLINK-35987
> URL: https://issues.apache.org/jira/browse/FLINK-35987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ELT function.
> 
> Returns the {{index}}-th expression.
> Example:
> {code:sql}
> > SELECT ELT(1, 'scala', 'java');
>  scala
> {code}
> Syntax:
> {code:sql}
> ELT(index, expr...)
> {code}
> Arguments:
>  * {{index}}: An INTEGER expression.
>  * {{expr}}: A STRING or BINARY expression. At least one expr is required.
> Returns:
> The result has the type of the least common type of all {{expr}}.
> {{index}} must be between 1 and the number of {{expr}}. Otherwise, the 
> function returns an error.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/elt.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_elt]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-35987) Add ELT function

2024-08-07 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35987:
---

Assignee: Dylan He

> Add ELT function
> 
>
> Key: FLINK-35987
> URL: https://issues.apache.org/jira/browse/FLINK-35987
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dylan He
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
>
> Add ELT function.
> 
> Returns the {{index}}-th expression.
> Example:
> {code:sql}
> > SELECT ELT(1, 'scala', 'java');
>  scala
> {code}
> Syntax:
> {code:sql}
> ELT(index, expr...)
> {code}
> Arguments:
>  * {{index}}: An INTEGER expression.
>  * {{expr}}: A STRING or BINARY expression. At least one expr is required.
> Returns:
> The result has the type of the least common type of all {{expr}}.
> {{index}} must be between 1 and the number of {{expr}}. Otherwise, the 
> function returns an error.
> See also:
>  * 
> [Spark|https://spark.apache.org/docs/3.5.1/sql-ref-functions-builtin.html#string-functions]
>  * 
> [Databricks|https://docs.databricks.com/en/sql/language-manual/functions/elt.html]
>  * 
> [MySQL|https://dev.mysql.com/doc/refman/8.4/en/string-functions.html#function_elt]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36000) DynamicTableSink#Context's getTargetColumns should not return an array of zero length

2024-08-07 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-36000:

Fix Version/s: 1.19.2
   1.20.1

> DynamicTableSink#Context's getTargetColumns should not return an array of 
> zero length
> -
>
> Key: FLINK-36000
> URL: https://issues.apache.org/jira/browse/FLINK-36000
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> The interface [`Optional<int[][]> getTargetColumns()` 
> |https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java#L192]
> is expected to return an Optional#empty() for a sql statement without 
> specifying a column list, e.g., 'insert into t1 select ...',
> but the current implementation returns an int[0] in such cases, which is 
> unexpected.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-07 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35885:

Fix Version/s: 1.20.1
   1.19.2

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  allows watermarks to advance progress. When the watermark suddenly exceeds 
> the next window end timestamp, a result of count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}
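One way to picture the discussed behavior is a toy model of the operator's progress clock, where only event-time windows may be advanced by watermarks (an illustrative simulation with made-up names, not Flink's actual operator):

```java
public class WindowProgressSketch {
    // Minimal model of a slicing window operator's progress clock.
    // eventTime == false models a PROCTIME window: in that case a watermark
    // should not be allowed to advance (and thus fire) the window early.
    long currentProgress = Long.MIN_VALUE;
    final boolean eventTime;

    WindowProgressSketch(boolean eventTime) {
        this.eventTime = eventTime;
    }

    void processWatermark(long ts) {
        if (eventTime && ts > currentProgress) {
            currentProgress = ts; // only event-time windows advance on watermarks
        }
    }

    public static void main(String[] args) {
        WindowProgressSketch procTime = new WindowProgressSketch(false);
        procTime.processWatermark(1_000L);
        // proctime window is unaffected, so no empty (count 0) pane can fire
        System.out.println(procTime.currentProgress == Long.MIN_VALUE);
    }
}
```

In the buggy path, the watermark advances the clock regardless of time attribute, which is what lets a proctime window fire with zero elements.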



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-07 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871704#comment-17871704
 ] 

lincoln lee commented on FLINK-35885:
-

[~xuyangzhong] Could you also do the backport to both the 1.19 & 1.20 branches?

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  allows watermarks to advance progress. When the watermark suddenly exceeds 
> the next window end timestamp, a result of count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-07 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee resolved FLINK-35885.
-
Resolution: Fixed

Fixed in master: b3b709f46ca7b12d9a26192a60cdde790d8523b9

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  allows watermarks to advance progress. When the watermark suddenly exceeds 
> the next window end timestamp, a result of count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-07 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-35885:

Fix Version/s: 2.0.0

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  allows watermarks to advance progress. When the watermark suddenly exceeds 
> the next window end timestamp, a result of count 0 will appear.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36000) DynamicTableSink#Context's getTargetColumns should not return an array of zero length

2024-08-07 Thread lincoln lee (Jira)
lincoln lee created FLINK-36000:
---

 Summary: DynamicTableSink#Context's getTargetColumns should not 
return an array of zero length
 Key: FLINK-36000
 URL: https://issues.apache.org/jira/browse/FLINK-36000
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.1, 1.20.0
Reporter: lincoln lee
Assignee: lincoln lee
 Fix For: 2.0.0


The interface [`Optional<int[][]> getTargetColumns()` 
|https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java#L192]

is expected to return an Optional#empty() for a sql statement without 
specifying a column list, e.g., 'insert into t1 select ...',

but the current implementation returns an int[0] in such cases, which is 
unexpected.
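Until the fix lands, connector code can normalize both shapes defensively; below is a sketch using a plain `Optional<int[][]>` in place of the real `DynamicTableSink.Context` (names are illustrative):

```java
import java.util.Optional;

public class TargetColumnsSketch {
    // Defensive normalization: per the intended contract, an absent column list
    // is Optional.empty(), but affected versions may surface a zero-length
    // array; treat both as "full insert, no explicit column list".
    static boolean isFullInsert(Optional<int[][]> targetColumns) {
        return targetColumns.map(cols -> cols.length == 0).orElse(true);
    }

    public static void main(String[] args) {
        System.out.println(isFullInsert(Optional.empty()));              // true
        System.out.println(isFullInsert(Optional.of(new int[0][])));     // true (buggy shape)
        System.out.println(isFullInsert(Optional.of(new int[][]{{0}}))); // false
    }
}
```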

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-06 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee resolved FLINK-27519.
-
Resolution: Fixed

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Assignee: lincoln lee
>Priority: Major
> Fix For: 2.0.0
>
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-06 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871303#comment-17871303
 ] 

lincoln lee commented on FLINK-27519:
-

[~paul8263] The current fix uses the unified renaming introduced by FLINK-22121.

But as you mentioned above, something goes wrong during the plan rewriting. I 
spent some time debugging it: the root cause is in Calcite's 
`CalcRelSplitter#execute`. When it splits a project containing an over agg call 
into an over window and a calc, the new over window node loses the original 
alias name. Semantically this is not wrong, because the following new calc's 
output uses the expected alias name. So if the nested over agg query works fine 
in Calcite, we can keep the current renaming code. (I hit some unexpected build 
errors in my local Calcite project, so it will take some time to verify this 
later.)
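The unified renaming mentioned above boils down to suffixing duplicate field names; a hypothetical standalone sketch of that idea (not the planner's actual code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FieldRenameSketch {
    // Hypothetical version of the unified renaming: append "_N" to any field
    // name already seen, so e.g. [a, w0$o0, w0$o0] -> [a, w0$o0, w0$o0_0],
    // making the row type's field names unique again.
    static List<String> dedupe(List<String> names) {
        Map<String, Integer> seen = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String name : names) {
            Integer count = seen.get(name);
            if (count == null) {
                seen.put(name, 0);
                out.add(name);
            } else {
                String candidate;
                do {
                    candidate = name + "_" + count++;
                } while (seen.containsKey(candidate)); // skip names already taken
                seen.put(name, count);
                seen.put(candidate, 0);
                out.add(candidate);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(dedupe(List.of("a", "w0$o0", "w0$o0")));
    }
}
```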

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Assignee: lincoln lee
>Priority: Major
> Fix For: 2.0.0
>
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-06 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-27519:

Fix Version/s: 2.0.0

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Assignee: lincoln lee
>Priority: Major
> Fix For: 2.0.0
>
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-06 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871297#comment-17871297
 ] 

lincoln lee commented on FLINK-27519:
-

Fixed in master: 475f45ba0fb78f81adaa627ed2e8fbdcd71b83f6

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Assignee: lincoln lee
>Priority: Major
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-06 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-27519:
---

Assignee: lincoln lee

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Assignee: lincoln lee
>Priority: Major
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121]
> And can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35976) StreamPhysicalOverAggregate should handle column name confliction

2024-08-06 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-35976.
---
Fix Version/s: (was: 2.0.0)
   Resolution: Duplicate

> StreamPhysicalOverAggregate should handle column name confliction
> -
>
> Key: FLINK-35976
> URL: https://issues.apache.org/jira/browse/FLINK-35976
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
>
> A duplicate column name exception occurs when using a nested over aggregate 
> query,
> e.g., a repro case:
> {code}
> @Test
> def testNestedOverAgg(): Unit = {
> util.addTable(s"""
> |CREATE TEMPORARY TABLE src (
> | a STRING,
> | b STRING,
> | ts TIMESTAMP_LTZ(3),
> | watermark FOR ts as ts
> |) WITH (
> | 'connector' = 'values'
> |)
> |""".stripMargin)
> util.verifyExecPlan(s"""
> |SELECT *
> |FROM (
> | SELECT
> | *, count(*) OVER (PARTITION BY a ORDER BY ts) AS c2
> | FROM (
> | SELECT
> | *, count(*) OVER (PARTITION BY a,b ORDER BY ts) AS c1
> | FROM src
> | )
> |)
> |""".stripMargin)
> }
> {code}
>  
> {code}
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]
>  
> at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
> at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
> at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
> at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:678)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel.translateToExecNode(FlinkPhysicalRel.scala:53)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel.translateToExecNode$(FlinkPhysicalRel.scala:52)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregateBase.translateToExecNode(StreamPhysicalOverAggregateBase.scala:35)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:54)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToExecNodeGraph(PlannerBase.scala:407)
> at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1076)
> at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:920)
> at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:675)
> at 
> org.apache.flink.table.planner.plan.stream.sql.agg.OverAggregateTest.testNestedOverAgg(OverAggregateTest.scala:460)
> {code}
>  
> This is a similar case to https://issues.apache.org/jira/browse/FLINK-22121,
> but the fix missed the streaming over aggregate scenario.
>  
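The planner needs to de-duplicate the generated over-window field names (both nested aggregates emit a column named `w0$o0`). A minimal sketch of such a renamer, assuming a simple numeric-suffix scheme (an illustration, not the actual Flink fix):

```python
def make_unique(names):
    """Append a numeric suffix to any repeated field name."""
    seen = {}
    out = []
    for name in names:
        if name not in seen:
            seen[name] = 0
            out.append(name)
        else:
            seen[name] += 1
            out.append(f"{name}_{seen[name]}")
    return out

# Two nested OVER aggregates both emit a column named "w0$o0":
print(make_unique(["a", "b", "ts", "w0$o0", "w0$o0"]))
# → ['a', 'b', 'ts', 'w0$o0', 'w0$o0_1']
```

With unique names, `RowType.validateFields` no longer rejects the derived row type.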



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35974) PyFlink YARN per-job on Docker test failed because docker-compose command not found

2024-08-06 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871295#comment-17871295
 ] 

lincoln lee commented on FLINK-35974:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61223&view=logs&j=81be5d54-0dc6-5130-d390-233dd2956037&t=cfb9de70-be4e-5162-887e-653276e3edee

> PyFlink YARN per-job on Docker test failed because docker-compose command not 
> found
> ---
>
> Key: FLINK-35974
> URL: https://issues.apache.org/jira/browse/FLINK-35974
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Weijie Guo
>Priority: Blocker
>  Labels: pull-request-available
>
> {code:java}
> Aug 02 09:20:55 
> ==
> Aug 02 09:20:55 Running 'PyFlink YARN per-job on Docker test'
> Aug 02 09:20:55 
> ==
> Aug 02 09:20:55 TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-55842252037
> Aug 02 09:20:56 Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT
> Aug 02 09:20:57 Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT
> Aug 02 09:20:57 Docker version 26.1.3, build b72abbb
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common_docker.sh: 
> line 24: docker-compose: command not found
> Aug 02 09:20:57 [FAIL] Test script contains errors.
> Aug 02 09:20:57 Checking of logs skipped.
> Aug 02 09:20:57 
> Aug 02 09:20:57 [FAIL] 'PyFlink YARN per-job on Docker test' failed after 0 
> minutes and 1 seconds! Test exited with exit code 1
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61174&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=9544
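Newer Docker installs ship Compose V2 only as a CLI plugin invoked as `docker compose`, so the standalone `docker-compose` binary may be absent. The usual fix is to detect which form is available; a sketch of that detection (shown in Python for illustration; the actual end-to-end test scripts are bash):

```python
import shutil

def compose_command():
    """Return the Compose invocation available on this machine."""
    # Prefer the legacy standalone binary; fall back to the
    # Docker CLI plugin form (`docker compose`) when it is missing.
    if shutil.which("docker-compose"):
        return ["docker-compose"]
    return ["docker", "compose"]
```

The returned list can be prepended to the usual Compose arguments (e.g. `up -d`) via `subprocess.run`.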





[jira] [Commented] (FLINK-35974) PyFlink YARN per-job on Docker test failed because docker-compose command not found

2024-08-06 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17871294#comment-17871294
 ] 

lincoln lee commented on FLINK-35974:
-

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61223&view=logs&j=ef799394-2d67-5ff4-b2e5-410b80c9c0af&t=9e5768bc-daae-5f5f-1861-e58617922c7a&l=10571

> PyFlink YARN per-job on Docker test failed because docker-compose command not 
> found
> ---
>
> Key: FLINK-35974
> URL: https://issues.apache.org/jira/browse/FLINK-35974
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Weijie Guo
>Priority: Blocker
>  Labels: pull-request-available
>
> {code:java}
> Aug 02 09:20:55 
> ==
> Aug 02 09:20:55 Running 'PyFlink YARN per-job on Docker test'
> Aug 02 09:20:55 
> ==
> Aug 02 09:20:55 TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-55842252037
> Aug 02 09:20:56 Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT
> Aug 02 09:20:57 Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-2.0-SNAPSHOT-bin/flink-2.0-SNAPSHOT
> Aug 02 09:20:57 Docker version 26.1.3, build b72abbb
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common_docker.sh: 
> line 24: docker-compose: command not found
> Aug 02 09:20:57 [FAIL] Test script contains errors.
> Aug 02 09:20:57 Checking of logs skipped.
> Aug 02 09:20:57 
> Aug 02 09:20:57 [FAIL] 'PyFlink YARN per-job on Docker test' failed after 0 
> minutes and 1 seconds! Test exited with exit code 1
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=61174&view=logs&j=af184cdd-c6d8-5084-0b69-7e9c67b35f7a&t=0f3adb59-eefa-51c6-2858-3654d9e0749d&l=9544





[jira] [Commented] (FLINK-27519) Fix duplicates names when there are multiple levels of over window aggregate

2024-08-05 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870999#comment-17870999
 ] 

lincoln lee commented on FLINK-27519:
-

Thanks [~hackergin] for the tips and the discussion here! It looks like this issue 
has been on hold for a while.
I recently encountered this issue in an internal version and made a quick fix 
after some investigation (I didn't search for the jira directly, but it 
looks like everyone is basically on the same page). So [~hackergin] [~paul8263], 
could one of you help review the PR 
[#25152|https://github.com/apache/flink/pull/25152]? 

> Fix duplicates names when there are multiple levels of over window aggregate
> 
>
> Key: FLINK-27519
> URL: https://issues.apache.org/jira/browse/FLINK-27519
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.15.0
>Reporter: Feng Jin
>Priority: Major
>
> A similar issue to 
> [FLINK-22121|https://issues.apache.org/jira/browse/FLINK-22121].
> It can be reproduced by adding this unit test 
> org.apache.flink.table.planner.plan.stream.sql.agg.GroupWindowTest#testWindowAggregateWithAnotherWindowAggregate
> {code:java}
> // code placeholder
> @Test
> def testWindowAggregateWithAnotherWindowAggregate(): Unit = {
>   val sql =
> """
>   |SELECT CAST(pv AS INT) AS pv, CAST(uv AS INT) AS uv FROM (
>   |  SELECT *, count(distinct(c)) over (partition by a order by b desc) 
> AS uv
>   |  FROM (
>   |SELECT *, count(*) over (partition by a, c order by b desc) AS pv
>   |FROM MyTable
>   |  )
>   |)
>   |""".stripMargin
>   util.verifyExecPlan(sql)
> } {code}
> The error message : 
>  
>  
> {code:java}
> // code placeholder
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]    at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
>     at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
>     at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
>     at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:663)
>     at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
>     at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:71)
>  {code}
>  
> I think we can add some logic in FlinkLogicalOverAggregate to avoid 
> duplicate names in the output rowType. 
>  
>  
>  





[jira] [Commented] (FLINK-35976) StreamPhysicalOverAggregate should handle column name confliction

2024-08-05 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35976?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870989#comment-17870989
 ] 

lincoln lee commented on FLINK-35976:
-

Thanks [~hackergin] for your tips! Let's move the discussion there.

> StreamPhysicalOverAggregate should handle column name confliction
> -
>
> Key: FLINK-35976
> URL: https://issues.apache.org/jira/browse/FLINK-35976
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> A duplicate column name exception occurs when using a nested over aggregate
> query, e.g., a repro case:
> {code}
> @Test
> def testNestedOverAgg(): Unit = {
> util.addTable(s"""
> |CREATE TEMPORARY TABLE src (
> | a STRING,
> | b STRING,
> | ts TIMESTAMP_LTZ(3),
> | watermark FOR ts as ts
> |) WITH (
> | 'connector' = 'values'
> |)
> |""".stripMargin)
> util.verifyExecPlan(s"""
> |SELECT *
> |FROM (
> | SELECT
> | *, count(*) OVER (PARTITION BY a ORDER BY ts) AS c2
> | FROM (
> | SELECT
> | *, count(*) OVER (PARTITION BY a,b ORDER BY ts) AS c1
> | FROM src
> | )
> |)
> |""".stripMargin)
> }
> {code}
>  
> {code}
> org.apache.flink.table.api.ValidationException: Field names must be unique. 
> Found duplicates: [w0$o0]
>  
> at 
> org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
> at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
> at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
> at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
> at 
> org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:678)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel.translateToExecNode(FlinkPhysicalRel.scala:53)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel.translateToExecNode$(FlinkPhysicalRel.scala:52)
> at 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregateBase.translateToExecNode(StreamPhysicalOverAggregateBase.scala:35)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
> at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:54)
> at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToExecNodeGraph(PlannerBase.scala:407)
> at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1076)
> at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:920)
> at 
> org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:675)
> at 
> org.apache.flink.table.planner.plan.stream.sql.agg.OverAggregateTest.testNestedOverAgg(OverAggregateTest.scala:460)
> {code}
>  
> This is a similar case to https://issues.apache.org/jira/browse/FLINK-22121,
> but the fix missed the streaming over aggregate scenario.
>  





[jira] [Created] (FLINK-35976) StreamPhysicalOverAggregate should handle column name confliction

2024-08-05 Thread lincoln lee (Jira)
lincoln lee created FLINK-35976:
---

 Summary: StreamPhysicalOverAggregate should handle column name 
confliction
 Key: FLINK-35976
 URL: https://issues.apache.org/jira/browse/FLINK-35976
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.1, 1.20.0
Reporter: lincoln lee
Assignee: lincoln lee
 Fix For: 2.0.0


A duplicate column name exception occurs when using a nested over aggregate query,

e.g., a repro case:

{code}
@Test
def testNestedOverAgg(): Unit = {
  util.addTable(s"""
    |CREATE TEMPORARY TABLE src (
    |  a STRING,
    |  b STRING,
    |  ts TIMESTAMP_LTZ(3),
    |  WATERMARK FOR ts AS ts
    |) WITH (
    |  'connector' = 'values'
    |)
    |""".stripMargin)

  util.verifyExecPlan(s"""
    |SELECT *
    |FROM (
    |  SELECT
    |    *, count(*) OVER (PARTITION BY a ORDER BY ts) AS c2
    |  FROM (
    |    SELECT
    |      *, count(*) OVER (PARTITION BY a,b ORDER BY ts) AS c1
    |    FROM src
    |  )
    |)
    |""".stripMargin)
}
{code}

 

{code}
org.apache.flink.table.api.ValidationException: Field names must be unique. 
Found duplicates: [w0$o0]
 
at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:273)
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:158)
at org.apache.flink.table.types.logical.RowType.of(RowType.java:298)
at org.apache.flink.table.types.logical.RowType.of(RowType.java:290)
at 
org.apache.flink.table.planner.calcite.FlinkTypeFactory$.toLogicalRowType(FlinkTypeFactory.scala:678)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregate.translateToExecNode(StreamPhysicalOverAggregate.scala:57)
at 
org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel.translateToExecNode(FlinkPhysicalRel.scala:53)
at 
org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel.translateToExecNode$(FlinkPhysicalRel.scala:52)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalOverAggregateBase.translateToExecNode(StreamPhysicalOverAggregateBase.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:74)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraphGenerator.generate(ExecNodeGraphGenerator.java:54)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToExecNodeGraph(PlannerBase.scala:407)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.assertPlanEquals(TableTestBase.scala:1076)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.doVerifyPlan(TableTestBase.scala:920)
at 
org.apache.flink.table.planner.utils.TableTestUtilBase.verifyExecPlan(TableTestBase.scala:675)
at 
org.apache.flink.table.planner.plan.stream.sql.agg.OverAggregateTest.testNestedOverAgg(OverAggregateTest.scala:460)
{code}

 

This is a similar case to https://issues.apache.org/jira/browse/FLINK-22121,
but the fix missed the streaming over aggregate scenario.

 





[jira] [Closed] (FLINK-26939) Add TRANSLATE supported in SQL & Table API

2024-08-03 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-26939.
---
Fix Version/s: 2.0.0
 Assignee: Dylan He
   Resolution: Fixed

Fixed in master: 41b68c298a6f99b57de04e27e64567f914aab638

> Add TRANSLATE supported in SQL & Table API
> --
>
> Key: FLINK-26939
> URL: https://issues.apache.org/jira/browse/FLINK-26939
> Project: Flink
>  Issue Type: Sub-task
>Reporter: dalongliu
>Assignee: Dylan He
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: image-2022-05-11-09-51-21-400.png
>
>
> Syntax:
> translate(expr, from, to)
>  
> Arguments:
>  * {{{}expr{}}}: A STRING expression.
>  * {{{}from{}}}: A STRING expression consisting of a set of characters to be 
> replaced.
>  * {{{}to{}}}: A STRING expression consisting of a matching set of characters 
> to replace {{{}from{}}}.
> Return: 
> A STRING
>  
> Examples:
> {code:java}
> > SELECT translate('AaBbCc', 'abc', '123');
>  A1B2C3
> > SELECT translate('AaBbCc', 'abc', '1');
>  A1BC
> > SELECT translate('AaBbCc', 'abc', '');
>  ABC {code}
> See more
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]
>  * 
> [Databricks|https://docs.databricks.com/sql/language-manual/functions/translate.html]





[jira] [Updated] (FLINK-26939) Add TRANSLATE supported in SQL & Table API

2024-08-03 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee updated FLINK-26939:

Priority: Major  (was: Critical)

> Add TRANSLATE supported in SQL & Table API
> --
>
> Key: FLINK-26939
> URL: https://issues.apache.org/jira/browse/FLINK-26939
> Project: Flink
>  Issue Type: Sub-task
>Reporter: dalongliu
>Assignee: Dylan He
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: image-2022-05-11-09-51-21-400.png
>
>
> Syntax:
> translate(expr, from, to)
>  
> Arguments:
>  * {{{}expr{}}}: A STRING expression.
>  * {{{}from{}}}: A STRING expression consisting of a set of characters to be 
> replaced.
>  * {{{}to{}}}: A STRING expression consisting of a matching set of characters 
> to replace {{{}from{}}}.
> Return: 
> A STRING
>  
> Examples:
> {code:java}
> > SELECT translate('AaBbCc', 'abc', '123');
>  A1B2C3
> > SELECT translate('AaBbCc', 'abc', '1');
>  A1BC
> > SELECT translate('AaBbCc', 'abc', '');
>  ABC {code}
> See more
>  * [Hive|https://cwiki.apache.org/confluence/display/hive/languagemanual+udf]
>  * 
> [Databricks|https://docs.databricks.com/sql/language-manual/functions/translate.html]





[jira] [Assigned] (FLINK-35885) proctime aggregate window triggered by watermark

2024-08-01 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-35885:
---

Assignee: xuyang

> proctime aggregate window triggered by watermark
> 
>
> Key: FLINK-35885
> URL: https://issues.apache.org/jira/browse/FLINK-35885
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.13.6, 1.17.2
> Environment: flink 1.13.6 with blink or flink 1.17.2
>Reporter: Baozhu Zhao
>Assignee: xuyang
>Priority: Major
>  Labels: pull-request-available
>
> We have discovered an unexpected case where abnormal data with a count of 0 
> occurs when performing proctime window aggregation on data with a watermark.
> The SQL is as follows
> {code:sql}
> CREATE TABLE s1 (
> id INT,
> event_time TIMESTAMP(3),
> name string,
> proc_time AS PROCTIME (),
> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> )
> WITH
> ('connector' = 'my-source')
> ;
> SELECT
> *
> FROM
> (
> SELECT
> name,
> COUNT(id) AS total_count,
> window_start,
> window_end
> FROM
> TABLE (
> TUMBLE (
> TABLE s1,
> DESCRIPTOR (proc_time),
> INTERVAL '30' SECONDS
> )
> )
> GROUP BY
> window_start,
> window_end,
> name
> )
> WHERE
> total_count = 0;
> {code}
> For detailed test code, please refer to 
> [https://github.com/xingsuo-zbz/flink/blob/zbz/117/proc-agg-window-process-watermark-bug-test/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/bug/WindowBugTest.java]
> 
> The root cause is that 
> https://github.com/apache/flink/blob/release-1.17.2/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SlicingWindowOperator.java#L229
>  supports advancing progress by the watermark. When the watermark suddenly 
> exceeds the next window end timestamp, a result with count 0 appears.
> {code:java}
>   public void processWatermark(Watermark mark) throws Exception {
> if (mark.getTimestamp() > currentWatermark) {
> windowProcessor.advanceProgress(mark.getTimestamp());
> super.processWatermark(mark);
> } else {
> super.processWatermark(new Watermark(currentWatermark));
> }
> }
> {code}
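The effect can be sketched as follows: once progress is advanced by an event-time watermark that jumps past one or more processing-time window ends, those windows fire without ever having accumulated a record, yielding the count-0 rows. A hypothetical simulation (not Flink code; timestamps in seconds):

```python
def advance_progress(next_window_end, watermark, window_size):
    """Return the window ends fired when progress jumps to `watermark`."""
    fired = []
    while watermark >= next_window_end:
        # The window fires even if no record ever landed in it → count 0.
        fired.append(next_window_end)
        next_window_end += window_size
    return fired

# 30s tumbling proctime windows; an event-time watermark of 95s
# fires three windows regardless of whether any records arrived:
print(advance_progress(30, 95, 30))  # → [30, 60, 90]
```

This matches the reported symptom: the `total_count = 0` filter in the query starts returning rows as soon as the event-time watermark outruns the processing-time window ends.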





  1   2   3   4   5   6   7   8   9   10   >