[jira] [Commented] (FLINK-27212) Failed to CAST('abcde', VARBINARY)

2022-04-19 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17524693#comment-17524693
 ] 

Wenlong Lyu commented on FLINK-27212:
-

+1 for printing binary in hexadecimal, thanks for the quick update!

> Failed to CAST('abcde', VARBINARY)
> --
>
> Key: FLINK-27212
> URL: https://issues.apache.org/jira/browse/FLINK-27212
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Shengkai Fang
>Assignee: Marios Trivyzas
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Please add test in the CalcITCase
> {code:scala}
> @Test
>   def testCalc(): Unit = {
> val sql =
>   """
> |SELECT CAST('abcde' AS VARBINARY(6))
> |""".stripMargin
> val result = tEnv.executeSql(sql)
> print(result.getResolvedSchema)
> result.print()
>   }
> {code}
> The exception is 
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Odd number of 
> characters.
>   at 
> org.apache.flink.table.utils.EncodingUtils.decodeHex(EncodingUtils.java:203)
>   at StreamExecCalc$33.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-27212) Failed to CAST('abcde', VARBINARY)

2022-04-14 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17522618#comment-17522618
 ] 

Wenlong Lyu commented on FLINK-27212:
-

[~matriv] I think you may have some misunderstanding here. Regarding x'XXX', it 
means a hexdecimal 
literal(https://dev.mysql.com/doc/refman/8.0/en/hexadecimal-literals.html), it 
requires even number of values, the error in calcite means that the literal is 
illegal. it is irrelevant to the casting behavior I think.

BTW, we may need a FLIP for such kind of change to collect more feedbacks from 
devs and users. I think it is better to keep it the same as former versions, 
and make the decision later.

> Failed to CAST('abcde', VARBINARY)
> --
>
> Key: FLINK-27212
> URL: https://issues.apache.org/jira/browse/FLINK-27212
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.16.0
>Reporter: Shengkai Fang
>Assignee: Marios Trivyzas
>Priority: Blocker
> Fix For: 1.16.0
>
>
> Please add test in the CalcITCase
> {code:scala}
> @Test
>   def testCalc(): Unit = {
> val sql =
>   """
> |SELECT CAST('abcde' AS VARBINARY(6))
> |""".stripMargin
> val result = tEnv.executeSql(sql)
> print(result.getResolvedSchema)
> result.print()
>   }
> {code}
> The exception is 
> {code:java}
> Caused by: org.apache.flink.table.api.TableException: Odd number of 
> characters.
>   at 
> org.apache.flink.table.utils.EncodingUtils.decodeHex(EncodingUtils.java:203)
>   at StreamExecCalc$33.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
>   at 
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
>   at 
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
>   at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-26770) Nullable ArrayData should not be Object[]

2022-03-21 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-26770:

Description: 
sql:
 
"INSERT INTO %s "
+ " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
+ "1,'dim',cast(20.2007 as 
double),false,652482,cast('2020-07-08' as date),'source_test',cast('2020-07-10 
16:28:07.737' as timestamp),"
+ "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
+ "ARRAY[cast(8.58967 as float),cast(96.4667 as 
float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
double),cast(792343.646446 as double),cast(76.46464 as double)],"
+ "cast(ARRAY [true,true,false,true] as 
array),cast(ARRAY ['monday','saturday','sunday'] as 
array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
16:28:07.737' as timestamp)"
+ ")";

error:

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Ljava.lang.Integer;
at 
org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
 ~[flink-table-common]


related codegen result:

  objArray$81 = new Object[result$76.size()];
  for ( i$82 = 0; i$82 < result$76.size(); i$82++) {
  if (!result$76.isNullAt(i$82)) {
  objArray$81[i$82] = result$76.getBoolean(i$82);

cause:
  ArrayToArrayCastRule#arrayElementType use Object when a column is 
nullable, but GenericArrayData only accepts array with specific
 types, like Integer[], I think we should follow 
CodeGenUtils#boxedTypeTermForType 

[~slinkydeveloper]


  was:
sql:
 
"INSERT INTO %s "
+ " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
+ "1,'dim',cast(20.2007 as 
double),false,652482,cast('2020-07-08' as date),'source_test',cast('2020-07-10 
16:28:07.737' as timestamp),"
+ "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
+ "ARRAY[cast(8.58967 as float),cast(96.4667 as 
float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
double),cast(792343.646446 as double),cast(76.46464 as double)],"
+ "cast(ARRAY [true,true,false,true] as 
array),cast(ARRAY ['monday','saturday','sunday'] as 
array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
16:28:07.737' as timestamp)"
+ ")";

error:

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Ljava.lang.Integer;
at 
org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
 ~[flink-table-common-1.15-vvr-6.0-SNAPSHOT.jar:1.15-vvr-6.0-SNAPSHOT]


related codegen result:

  objArray$81 = new Object[result$76.size()];
  for ( i$82 = 0; i$82 < result$76.size(); i$82++) {
  if (!result$76.isNullAt(i$82)) {
  objArray$81[i$82] = result$76.getBoolean(i$82);

cause:
  ArrayToArrayCastRule#arrayElementType use Object when a column is 
nullable, but GenericArrayData only accepts array with specific
 types, like Integer[], I think we should follow 
CodeGenUtils#boxedTypeTermForType 

[~slinkydeveloper]



> Nullable ArrayData should not be Object[]
> -
>
> Key: FLINK-26770
> URL: https://issues.apache.org/jira/browse/FLINK-26770
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.15.0
>Reporter: Wenlong Lyu
>Priority: Major
>
> sql:
>  
> "INSERT INTO %s "
> + " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
> + "1,'dim',cast(20.2007 as 
> double),false,652482,cast('2020-07-08' as 
> date),'source_test',cast('2020-07-10 16:28:07.737' as timestamp),"
> + "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
> array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
> + "ARRAY[cast(8.58967 as float),cast(96.4667 as 
> float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
> double),cast(792343.646446 as double),cast(76.46464 as double)],"
> + "cast(ARRAY [true,true,false,true] as 
> array),cast(ARRAY ['monday','saturday','sunday'] as 
> array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
> 16:28:07.737' as timestamp)"
> + ")";
> error:
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast 
> to [Ljava.lang.Integer;
>   at 
> org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
>  ~[flink-table-common]
>   
> related codegen result:
>   objArray$81 = new 

[jira] [Updated] (FLINK-26770) Nullable ArrayData should not be Object[]

2022-03-21 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-26770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-26770:

Description: 
sql:
 
"INSERT INTO %s "
+ " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
+ "1,'dim',cast(20.2007 as 
double),false,652482,cast('2020-07-08' as date),'source_test',cast('2020-07-10 
16:28:07.737' as timestamp),"
+ "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
+ "ARRAY[cast(8.58967 as float),cast(96.4667 as 
float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
double),cast(792343.646446 as double),cast(76.46464 as double)],"
+ "cast(ARRAY [true,true,false,true] as 
array),cast(ARRAY ['monday','saturday','sunday'] as 
array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
16:28:07.737' as timestamp)"
+ ")";

error:

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Ljava.lang.Integer;
at 
org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
 ~[flink-table-common-1.15-vvr-6.0-SNAPSHOT.jar:1.15-vvr-6.0-SNAPSHOT]


related codegen result:

  objArray$81 = new Object[result$76.size()];
  for ( i$82 = 0; i$82 < result$76.size(); i$82++) {
  if (!result$76.isNullAt(i$82)) {
  objArray$81[i$82] = result$76.getBoolean(i$82);

cause:
  ArrayToArrayCastRule#arrayElementType use Object when a column is 
nullable, but GenericArrayData only accepts array with specific
 types, like Integer[], I think we should follow 
CodeGenUtils#boxedTypeTermForType 

[~slinkydeveloper]


  was:

sql:
 
"INSERT INTO %s "
+ " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
+ "1,'dim',cast(20.2007 as 
double),false,652482,cast('2020-07-08' as date),'source_test',cast('2020-07-10 
16:28:07.737' as timestamp),"
+ "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
+ "ARRAY[cast(8.58967 as float),cast(96.4667 as 
float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
double),cast(792343.646446 as double),cast(76.46464 as double)],"
+ "cast(ARRAY [true,true,false,true] as 
array),cast(ARRAY ['monday','saturday','sunday'] as 
array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
16:28:07.737' as timestamp)"
+ ")";

error:

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Ljava.lang.Integer;
at 
org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
 ~[flink-table-common-1.15-vvr-6.0-SNAPSHOT.jar:1.15-vvr-6.0-SNAPSHOT]


related codegen result:

  objArray$81 = new Object[result$76.size()];
  for ( i$82 = 0; i$82 < result$76.size(); i$82++) {
  if (!result$76.isNullAt(i$82)) {
  objArray$81[i$82] = result$76.getBoolean(i$82);

cause:
  ArrayToArrayCastRule#arrayElementType use Object when a column is 
nullable, but GenericArrayData only accepts array with specified types, like 
Integer[], I think we should follow CodeGenUtils#boxedTypeTermForType 

[~slinkydeveloper]



> Nullable ArrayData should not be Object[]
> -
>
> Key: FLINK-26770
> URL: https://issues.apache.org/jira/browse/FLINK-26770
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.15.0
>Reporter: Wenlong Lyu
>Priority: Major
>
> sql:
>  
> "INSERT INTO %s "
> + " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
> + "1,'dim',cast(20.2007 as 
> double),false,652482,cast('2020-07-08' as 
> date),'source_test',cast('2020-07-10 16:28:07.737' as timestamp),"
> + "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
> array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
> + "ARRAY[cast(8.58967 as float),cast(96.4667 as 
> float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
> double),cast(792343.646446 as double),cast(76.46464 as double)],"
> + "cast(ARRAY [true,true,false,true] as 
> array),cast(ARRAY ['monday','saturday','sunday'] as 
> array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
> 16:28:07.737' as timestamp)"
> + ")";
> error:
> Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast 
> to [Ljava.lang.Integer;
>   at 
> org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
>  ~[flink-table-common-1.15-vvr-6.0-SNAPSHOT.jar:1.15-vvr-6.0-SNAPSHOT]
>   
> related codegen 

[jira] [Created] (FLINK-26770) Nullable ArrayData should not be Object[]

2022-03-21 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-26770:
---

 Summary: Nullable ArrayData should not be Object[]
 Key: FLINK-26770
 URL: https://issues.apache.org/jira/browse/FLINK-26770
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Wenlong Lyu



sql:
 
"INSERT INTO %s "
+ " (a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r) values ("
+ "1,'dim',cast(20.2007 as 
double),false,652482,cast('2020-07-08' as date),'source_test',cast('2020-07-10 
16:28:07.737' as timestamp),"
+ "cast(8.58965 as float),cast(ARRAY [464,98661,32489] as 
array),cast(ARRAY [8589934592,8589934593,8589934594] as array),"
+ "ARRAY[cast(8.58967 as float),cast(96.4667 as 
float),cast(9345.16 as float)], ARRAY [cast(587897.4646746 as 
double),cast(792343.646446 as double),cast(76.46464 as double)],"
+ "cast(ARRAY [true,true,false,true] as 
array),cast(ARRAY ['monday','saturday','sunday'] as 
array),true,cast(8119.21 as numeric(6,2)), cast('2020-07-10 
16:28:07.737' as timestamp)"
+ ")";

error:

Caused by: java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to 
[Ljava.lang.Integer;
at 
org.apache.flink.table.data.GenericArrayData.toIntArray(GenericArrayData.java:297)
 ~[flink-table-common-1.15-vvr-6.0-SNAPSHOT.jar:1.15-vvr-6.0-SNAPSHOT]


related codegen result:

  objArray$81 = new Object[result$76.size()];
  for ( i$82 = 0; i$82 < result$76.size(); i$82++) {
  if (!result$76.isNullAt(i$82)) {
  objArray$81[i$82] = result$76.getBoolean(i$82);

cause:
  ArrayToArrayCastRule#arrayElementType use Object when a column is 
nullable, but GenericArrayData only accepts array with specified types, like 
Integer[], I think we should follow CodeGenUtils#boxedTypeTermForType 

[~slinkydeveloper]




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (FLINK-25695) TemporalJoin cause state leak in some cases

2022-02-28 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17499317#comment-17499317
 ] 

Wenlong Lyu edited comment on FLINK-25695 at 3/1/22, 4:12 AM:
--

hi, [~zicat] thanks for sharing your investigation. Are you using heap state in 
your production? it seems that this is a bug for heap state backend?


was (Author: wenlong.lwl):
hi, [~wenlong.lwl] thanks for sharing your investigation. Are you using heap 
state in your production? it seems that this is a bug for heap state backend?

> TemporalJoin cause state leak in some cases
> ---
>
> Key: FLINK-25695
> URL: https://issues.apache.org/jira/browse/FLINK-25695
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Lyn Zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-29-11-11-11-705.png, 
> image-2022-01-29-11-11-27-550.png, image-2022-01-29-11-11-31-967.png, 
> image-2022-01-29-11-12-43-016.png, test.sql
>
>
> Last year, I reported the similar bug of TemporalJoin cause state leak. 
> Detail: FLINK-21833
> Recently, I found the fix code can reduce the the leak size but can not 
> resolve it completely.
> The code of line 213 cause it and the right fix is to invoke cleanUp() method.
> In FLINK-21833, we discussed when the code is running on line 213, that means 
> Left State, Right State, registeredTimerState is empty, actually the Left 
> State and Right State value(MapState) is empty but the key is still be in 
> state, So invoke state.clear() is necessary. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25695) TemporalJoin cause state leak in some cases

2022-02-28 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17499317#comment-17499317
 ] 

Wenlong Lyu commented on FLINK-25695:
-

hi, [~wenlong.lwl] thanks for sharing your investigation. Are you using heap 
state in your production? it seems that this is a bug for heap state backend?

> TemporalJoin cause state leak in some cases
> ---
>
> Key: FLINK-25695
> URL: https://issues.apache.org/jira/browse/FLINK-25695
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Lyn Zhang
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2022-01-29-11-11-11-705.png, 
> image-2022-01-29-11-11-27-550.png, image-2022-01-29-11-11-31-967.png, 
> image-2022-01-29-11-12-43-016.png, test.sql
>
>
> Last year, I reported the similar bug of TemporalJoin cause state leak. 
> Detail: FLINK-21833
> Recently, I found the fix code can reduce the the leak size but can not 
> resolve it completely.
> The code of line 213 cause it and the right fix is to invoke cleanUp() method.
> In FLINK-21833, we discussed when the code is running on line 213, that means 
> Left State, Right State, registeredTimerState is empty, actually the Left 
> State and Right State value(MapState) is empty but the key is still be in 
> state, So invoke state.clear() is necessary. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25932) Introduce ExecNodeContext.generateUid()

2022-02-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495479#comment-17495479
 ] 

Wenlong Lyu commented on FLINK-25932:
-

[~slinkydeveloper] when the sql is not changed, most of the sql can be 
recompiled with the plan not changed across releases, and users can reload 
state from savepoints when upgrading. for example, a simple insert: `insert 
into result  select sum(price) from history group by user_id`
but now, all of the state is not compatible because all operators are assigned 
with a uid.

> Introduce ExecNodeContext.generateUid()
> ---
>
> Key: FLINK-25932
> URL: https://issues.apache.org/jira/browse/FLINK-25932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> FLINK-25387 introduced an {{ExecNodeContext}} which contains all information 
> to generate unique and deterministic identifiers for all created 
> {{Transformation}}.
> This issue includes:
> - Add {{ExecNodeContext.generateUid(operatorName: String): String}}
> - Go through all ExecNodes and give transformations a uid. The name can be 
> constant static field within the ExecNode such that both annotation and 
> method can use it.
> - We give all transformations a uid, including stateless ones.
> - The final UID should look like: 
> {{13_stream-exec-sink_1_upsert-materializer}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25932) Introduce ExecNodeContext.generateUid()

2022-02-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17495392#comment-17495392
 ] 

Wenlong Lyu commented on FLINK-25932:
-

hi, [~twalthr] [~slinkydeveloper] we may need to have an option to fall back 
not to set uids by default. All of the jobs would be state incompatible when 
users upgrade to 1.15 with the new uids.

> Introduce ExecNodeContext.generateUid()
> ---
>
> Key: FLINK-25932
> URL: https://issues.apache.org/jira/browse/FLINK-25932
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Assignee: Francesco Guardiani
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> FLINK-25387 introduced an {{ExecNodeContext}} which contains all information 
> to generate unique and deterministic identifiers for all created 
> {{Transformation}}.
> This issue includes:
> - Add {{ExecNodeContext.generateUid(operatorName: String): String}}
> - Go through all ExecNodes and give transformations a uid. The name can be 
> constant static field within the ExecNode such that both annotation and 
> method can use it.
> - We give all transformations a uid, including stateless ones.
> - The final UID should look like: 
> {{13_stream-exec-sink_1_upsert-materializer}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25600) Support new statement set syntax in sql client and update docs

2022-02-13 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17491746#comment-17491746
 ] 

Wenlong Lyu commented on FLINK-25600:
-

[~slinkydeveloper] I am sorry that the title format of pr is wrong, I have 
fixed it.

> Support new statement set syntax in sql client and update docs
> --
>
> Key: FLINK-25600
> URL: https://issues.apache.org/jira/browse/FLINK-25600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
>
> this is a follow up of FLINK-25392, to finish adding the new statement set: 
> 1. the new statement set need multi line parsing support in sql client, which 
> is not supported currently:
> execute statement set begin
> insert xxx;
> insert xxx;
> end;
> 2. we need to update the doc to introduce the new syntax



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25600) Support new statement set syntax in sql client and update docs

2022-02-10 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17490661#comment-17490661
 ] 

Wenlong Lyu commented on FLINK-25600:
-

[~slinkydeveloper] the pr is waiting to be reviewed. I have asked [~godfreyhe] 
to help review it. It would great if you guys have time to help the reviewing 
too.

> Support new statement set syntax in sql client and update docs
> --
>
> Key: FLINK-25600
> URL: https://issues.apache.org/jira/browse/FLINK-25600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>
> this is a follow up of FLINK-25392, to finish adding the new statement set: 
> 1. the new statement set need multi line parsing support in sql client, which 
> is not supported currently:
> execute statement set begin
> insert xxx;
> insert xxx;
> end;
> 2. we need to update the doc to introduce the new syntax



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25730) Fix chaining strategy (and typo?) in RowTimeMiniBatchAssginerOperator

2022-01-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479782#comment-17479782
 ] 

Wenlong Lyu commented on FLINK-25730:
-

thanks for the reviewing. Yes, we should correct the typo, IMO

> Fix chaining strategy (and typo?) in RowTimeMiniBatchAssginerOperator
> -
>
> Key: FLINK-25730
> URL: https://issues.apache.org/jira/browse/FLINK-25730
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.2
>Reporter: Q Kang
>Priority: Minor
>
> The `RowTimeMiniBatchAssginerOperator` should have its chaining strategy set 
> to `ALWAYS`, like previously fixed `LocalSlicingWindowAggOperator` in 
> [FLINK-25614|https://issues.apache.org/jira/browse/FLINK-25614]. 
> BTW, there's a long-standing typo in its class name (`Assginer` -> 
> `Assigner`), can we combine the correction into this issue?
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25700) flinksql Cascading Window TVF Aggregation exception

2022-01-19 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17479110#comment-17479110
 ] 

Wenlong Lyu commented on FLINK-25700:
-

[~simen] it is not a bug actually, you should rename window_start, window_end 
in window_view2  to another names , to avoid duplicated columns in 
TABLE(TUMBLE(TABLE window_view2, DESCRIPTOR(rowtime), INTERVAL '10' SECOND))

> flinksql Cascading Window TVF Aggregation  exception
> 
>
> Key: FLINK-25700
> URL: https://issues.apache.org/jira/browse/FLINK-25700
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: simenliuxing
>Priority: Major
>
> flink version 1.14.0
> sql
>  
> {code:java}
> CREATE TABLE order_test
> (
> order_price int,
> order_item varchar,
> ts as localtimestamp,
> WATERMARK FOR ts as ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'datagen'
> );
> create view window_view2 as
> select window_start, window_end, window_time as rowtime, sum(order_price) 
> partial_price
> from TABLE(
> TUMBLE(
> TABLE order_test, DESCRIPTOR(ts), INTERVAL '2' SECOND)
> )
> group by order_price, window_start, window_end, window_time;
> select window_start, window_end, sum(partial_price) total_price
> from TABLE(
> TUMBLE(
> TABLE window_view2, DESCRIPTOR(rowtime), INTERVAL '10' SECOND)
> )
> group by window_start, window_end;{code}
> exception
>  
> {code:java}
> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 
> 'window_start' is ambiguous
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>     ... 41 more{code}
>  
>  
> I think it's a bug



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25695) TemporalJoin cause state leak in some cases

2022-01-19 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17478471#comment-17478471
 ] 

Wenlong Lyu commented on FLINK-25695:
-

According to the description in FLINK-21833, I think the root cause may be the 
event time timer registered not cleaned when retention happened.  the right way 
to fix maybe remove the event timer  when cleaning up the all of the state.

> TemporalJoin cause state leak in some cases
> ---
>
> Key: FLINK-25695
> URL: https://issues.apache.org/jira/browse/FLINK-25695
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Lyn Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Last year, I reported the similar bug of TemporalJoin cause state leak. 
> Detail: FLINK-21833
> Recently, I found the fix code can reduce the the leak size but can not 
> resolve it completely.
> The code of line 213 cause it and the right fix is to invoke cleanUp() method.
> In FLINK-21833, we discussed when the code is running on line 213, that means 
> Left State, Right State, registeredTimerState is empty, actually the Left 
> State and Right State value(MapState) is empty but the key is still be in 
> state, So invoke state.clear() is necessary. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25695) TemporalJoin cause state leak in some cases

2022-01-19 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17478465#comment-17478465
 ] 

Wenlong Lyu commented on FLINK-25695:
-

[~zicat] could you share your sql too?  it would be easier for others to 
understand your case with the sql and configuration you used.
On the other hand, have you dump the heap and analyze what is the exactly 
leaked?

> TemporalJoin cause state leak in some cases
> ---
>
> Key: FLINK-25695
> URL: https://issues.apache.org/jira/browse/FLINK-25695
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.3
>Reporter: Lyn Zhang
>Priority: Major
>  Labels: pull-request-available
>
> Last year, I reported the similar bug of TemporalJoin cause state leak. 
> Detail: FLINK-21833
> Recently, I found the fix code can reduce the the leak size but can not 
> resolve it completely.
> The code of line 213 cause it and the right fix is to invoke cleanUp() method.
> In FLINK-21833, we discussed when the code is running on line 213, that means 
> Left State, Right State, registeredTimerState is empty, actually the Left 
> State and Right State value(MapState) is empty but the key is still be in 
> state, So invoke state.clear() is necessary. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25070) FLIP-195: Improve the name and structure of vertex and operator name for job

2022-01-17 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17477534#comment-17477534
 ] 

Wenlong Lyu commented on FLINK-25070:
-

[~MartijnVisser] users cannot control operator name, but they choose use the 
simplified format or the original format.

> FLIP-195: Improve the name and structure of vertex and operator name for job
> 
>
> Key: FLINK-25070
> URL: https://issues.apache.org/jira/browse/FLINK-25070
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Web Frontend, Table SQL / 
> Runtime
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>
> this is an umbrella issue tracking the improvement of operator/vertex names 
> in flink: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-195%3A+Improve+the+name+and+structure+of+vertex+and+operator+name+for+job



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25614) Let LocalWindowAggregate be chained with upstream

2022-01-17 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17477160#comment-17477160
 ] 

Wenlong Lyu commented on FLINK-25614:
-

[~jark][~jingzhang],  we can not use getLong anymore actually when the operator 
is chained, input could be GenericRowData.

> Let LocalWindowAggregate be chained with upstream
> -
>
> Key: FLINK-25614
> URL: https://issues.apache.org/jira/browse/FLINK-25614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.2
>Reporter: Q Kang
>Assignee: Q Kang
>Priority: Minor
>  Labels: pull-request-available
>
> When enabling two-phase aggregation (local-global) strategy for Window TVF, 
> the physical plan is shown as follows:
> {code:java}
> TableSourceScan -> Calc -> WatermarkAssigner -> Calc
> ||
> || [FORWARD]
> ||
> LocalWindowAggregate
> ||
> || [HASH]
> ||
> GlobalWindowAggregate
> ||
> ||
> ...{code}
> We can let the `LocalWindowAggregate` node be chained with upstream operators 
> in order to improve efficiency, just like the non-windowing counterpart 
> `LocalGroupAggregate`.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25600) Support new statement set syntax in sql client and update docs

2022-01-12 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17475093#comment-17475093
 ] 

Wenlong Lyu commented on FLINK-25600:
-

[~monster#12] thanks for offering help, I am sorry that the subtask is followup 
issue, and is already developing, you can take and try some other issues.

> Support new statement set syntax in sql client and update docs
> --
>
> Key: FLINK-25600
> URL: https://issues.apache.org/jira/browse/FLINK-25600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client
>Reporter: Wenlong Lyu
>Priority: Major
>
> this is a follow up of FLINK-25392, to finish adding the new statement set: 
> 1. the new statement set need multi line parsing support in sql client, which 
> is not supported currently:
> execute statement set begin
> insert xxx;
> insert xxx;
> end;
> 2. we need to update the doc to introduce the new syntax



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25614) Let LocalWindowAggregate be chained with upstream

2022-01-12 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474394#comment-17474394
 ] 

Wenlong Lyu commented on FLINK-25614:
-

hi, I think this is a bug actually. [~lmagics] could you also help check other 
operators in the table, fixing the missing ChainingStrategy, AFAIK  
 it is all missed in RowTimeMiniBatchAssginerOperator.

> Let LocalWindowAggregate be chained with upstream
> -
>
> Key: FLINK-25614
> URL: https://issues.apache.org/jira/browse/FLINK-25614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.2
>Reporter: Q Kang
>Priority: Minor
>  Labels: pull-request-available
>
> When enabling two-phase aggregation (local-global) strategy for Window TVF, 
> the physical plan is shown as follows:
> {code:java}
> TableSourceScan -> Calc -> WatermarkAssigner -> Calc
> ||
> || [FORWARD]
> ||
> LocalWindowAggregate
> ||
> || [HASH]
> ||
> GlobalWindowAggregate
> ||
> ||
> ...{code}
> We can let the `LocalWindowAggregate` node be chained with upstream operators 
> in order to improve efficiency, just like the non-windowing counterpart 
> `LocalGroupAggregate`.
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25070) FLIP-195: Improve the name and structure of vertex and operator name for job

2022-01-12 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17474384#comment-17474384
 ] 

Wenlong Lyu commented on FLINK-25070:
-

[~MartijnVisser], yes, by default the sql operator name would be simplified in 
FLINK-1.15 by https://issues.apache.org/jira/browse/FLINK-25076, which is 
merged now.

> FLIP-195: Improve the name and structure of vertex and operator name for job
> 
>
> Key: FLINK-25070
> URL: https://issues.apache.org/jira/browse/FLINK-25070
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Web Frontend, Table SQL / 
> Runtime
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>
> this is an umbrella issue tracking the improvement of operator/vertex names 
> in flink: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-195%3A+Improve+the+name+and+structure+of+vertex+and+operator+name+for+job



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25600) Support new statement set syntax in sql client and update docs

2022-01-10 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25600:

Component/s: Table SQL / API
 Table SQL / Client

> Support new statement set syntax in sql client and update docs
> --
>
> Key: FLINK-25600
> URL: https://issues.apache.org/jira/browse/FLINK-25600
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Client
>Reporter: Wenlong Lyu
>Priority: Major
>
> this is a follow up of FLINK-25392, to finish adding the new statement set: 
> 1. the new statement set need multi line parsing support in sql client, which 
> is not supported currently:
> execute statement set begin
> insert xxx;
> insert xxx;
> end;
> 2. we need to update the doc to introduce the new syntax



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25600) Support new statement set syntax in sql client and update docs

2022-01-10 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25600:
---

 Summary: Support new statement set syntax in sql client and update 
docs
 Key: FLINK-25600
 URL: https://issues.apache.org/jira/browse/FLINK-25600
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu


this is a follow up of FLINK-25392, to finish adding the new statement set: 
1. the new statement set need multi line parsing support in sql client, which 
is not supported currently:
execute statement set begin
insert xxx;
insert xxx;
end;
2. we need to update the doc to introduce the new syntax



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-23031) Support to emit window result with periodic or non_periodic

2022-01-09 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23031?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17471687#comment-17471687
 ] 

Wenlong Lyu commented on FLINK-23031:
-

hi, [~aitozi], according to your description: you are trying to optimize the 
timer by avoiding trigger timer when there is no new input of a window right? 
I think we can just do the optimization internally(just don't register timer in 
onProcessingTime as you proposed firstly), instead of expose new option of 
users, because when there is no new input, the result of window will not be 
changed, the output with the same UA and UB will be ignored in Window Operator, 
so there is not output records actually when timer is triggered. 
BTW, It would be better to description the solution and affects directly in the 
description of issue, and attaching the testing result if you has done some.

> Support to emit window result with periodic or non_periodic
> ---
>
> Key: FLINK-23031
> URL: https://issues.apache.org/jira/browse/FLINK-23031
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Aitozi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25463) Set is not supported in User Defined Functions

2021-12-29 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17466356#comment-17466356
 ] 

Wenlong Lyu commented on FLINK-25463:
-

hi, [~surendra.lalwani], As I known, there is no type of Set according to the 
sql Standard. If you really need to define a column with type of Set, you 
should use FunctionHint like the example in the java doc of FunctionHint. If 
you have problem in defining custom type, you could post your code and error 
messages, so that we can understand it.

> Set is not supported in User Defined Functions 
> ---
>
> Key: FLINK-25463
> URL: https://issues.apache.org/jira/browse/FLINK-25463
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.3
>Reporter: Surendra Lalwani
>Priority: Major
> Fix For: 1.13.6
>
>
> Hi Team,
>  
> List is supported in UDFs but Set is not supported in UDFs. Please check



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25463) Set is not supported in User Defined Functions

2021-12-27 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465973#comment-17465973
 ] 

Wenlong Lyu commented on FLINK-25463:
-

hi, [~surendra.lalwani] could you post more detail about your case in the 
issue, it would help other in the community to understand better

> Set is not supported in User Defined Functions 
> ---
>
> Key: FLINK-25463
> URL: https://issues.apache.org/jira/browse/FLINK-25463
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.13.3
>Reporter: Surendra Lalwani
>Priority: Major
> Fix For: 1.13.6
>
>
> Hi Team,
>  
> List is supported in UDFs but Set is not supported in UDFs. Please check



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25459) When inserting row type fields into sink, the order needs to be maintained

2021-12-27 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465946#comment-17465946
 ] 

Wenlong Lyu commented on FLINK-25459:
-

[~qyw919867774] the matching logic is defined in SQL standard you could check 
more details in the SQL standard if you are interested in it: 
"If the  is omitted, then an  that 
identifies all columns of T in the ascending sequence of their ordinal 
positions within T is implicit."

> When inserting row type fields into sink, the order needs to be maintained
> --
>
> Key: FLINK-25459
> URL: https://issues.apache.org/jira/browse/FLINK-25459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.14.2
>Reporter: qyw
>Priority: Major
>
> When I insert a row type value into sink, why do I need to maintain the field 
> order in row?
> This is the comparison between my query schema and sink schema:
> Query schema: [ceshi: ROW<`name` STRING, `id` INT, `age` INT, `test` ROW<`c` 
> STRING>>]
> Sink schema:  [ceshi: ROW<`id` INT, `name` STRING, `age` INT, `test` ROW<`c` 
> STRING>>] 
> An error will be thrown:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Column types of query result and sink for registered table 
> 'default_catalog.default_database.kafka_target' do not match.
> Cause: Incompatible types for sink column 'ceshi' at position 0.
>  
>  
> Is this phenomenon reasonable?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25459) When inserting row type fields into sink, the order needs to be maintained

2021-12-27 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17465916#comment-17465916
 ] 

Wenlong Lyu commented on FLINK-25459:
-

[~qyw919867774] this is by designed. you can try insert into with explicit 
column like if you want different order: INSERT INTO neworders (order_no, 
product, qty) SELECT order_no, product, qty FROM orders

> When inserting row type fields into sink, the order needs to be maintained
> --
>
> Key: FLINK-25459
> URL: https://issues.apache.org/jira/browse/FLINK-25459
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.14.2
>Reporter: qyw
>Priority: Major
>
> When I insert a row type value into sink, why do I need to maintain the field 
> order in row?
> This is the comparison between my query schema and sink schema:
> Query schema: [ceshi: ROW<`name` STRING, `id` INT, `age` INT, `test` ROW<`c` 
> STRING>>]
> Sink schema:  [ceshi: ROW<`id` INT, `name` STRING, `age` INT, `test` ROW<`c` 
> STRING>>] 
> An error will be thrown:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Column types of query result and sink for registered table 
> 'default_catalog.default_database.kafka_target' do not match.
> Cause: Incompatible types for sink column 'ceshi' at position 0.
>  
>  
> Is this phenomenon reasonable?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25409) Add cache metric to LookupFunction

2021-12-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463861#comment-17463861
 ] 

Wenlong Lyu commented on FLINK-25409:
-

hi, [~MartijnVisser], you are right about the sharing. However there is not 
lookup abstraction for DataStream API/connector, we need to use FlatMapFunction 
or AsyncFunction directly. LookupSource would be translate to a TableFunction 
instead. If you means that we need to introduce such support, It is another 
story IMO. 


> Add cache metric to LookupFunction
> --
>
> Key: FLINK-25409
> URL: https://issues.apache.org/jira/browse/FLINK-25409
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yuan Zhu
>Priority: Major
>
> Since we encounter performance problem when lookup join in production env 
> frequently, adding metrics to monitor Lookup function cache is very helpful 
> to troubleshoot.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25411) JsonRowSerializationSchema unable to parse TIMESTAMP_LTZ fields

2021-12-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463849#comment-17463849
 ] 

Wenlong Lyu commented on FLINK-25411:
-

[~surendra.lalwani] I think it would be better to post an example which can 
reproduce the case to help others understanding the issue. I agree it is missed 
when add supporting on Timestamp_LTZ, the ObjectMapper doesn't support Instant 
by default. 

JsonRowSerializationSchema is deprecated now, you can try 
JsonRowDataSerializationSchema, it can work well, and RowData is the format we 
used in Table/SQL.

> JsonRowSerializationSchema unable to parse TIMESTAMP_LTZ fields
> ---
>
> Key: FLINK-25411
> URL: https://issues.apache.org/jira/browse/FLINK-25411
> Project: Flink
>  Issue Type: New Feature
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Reporter: Surendra Lalwani
>Priority: Minor
>
> While I try to fire a simple query Select current_timestamp from table_name , 
> it gives error that Could not serialize row and asks me to add shaded flink 
> dependency for jsr-310. Seems like in the Serializer , the JavaTimeModule is 
> not added



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25409) Add cache metric to LookupFunction

2021-12-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463826#comment-17463826
 ] 

Wenlong Lyu commented on FLINK-25409:
-

hi, [~MartijnVisser] this ticket may be not relevant to the new source/sink API 
and FLIP-33,  lookup source is a SQL concept.  

+1 for common LookupFunctionWithCache. It can help unify the caching config of 
different connectors and reduce duplicate work on connector development. In 
addition, we can add one more metric to measure lookup latency per record.

> Add cache metric to LookupFunction
> --
>
> Key: FLINK-25409
> URL: https://issues.apache.org/jira/browse/FLINK-25409
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Reporter: Yuan Zhu
>Priority: Major
>
> Since we encounter performance problem when lookup join in production env 
> frequently, adding metrics to monitor Lookup function cache is very helpful 
> to troubleshoot.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25217) FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17463276#comment-17463276
 ] 

Wenlong Lyu commented on FLINK-25217:
-

[~twalthr] ok, I can take subtask 1 first, if no one take it yet.


> FLIP-190: Support Version Upgrades for Table API & SQL Programs
> ---
>
> Key: FLINK-25217
> URL: https://issues.apache.org/jira/browse/FLINK-25217
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> Nowadays, the Table & SQL API is as important to Flink as the DataStream API. 
> It is one of the main abstractions for expressing pipelines that perform 
> stateful stream processing. Users expect the same backwards compatibility 
> guarantees when upgrading to a newer Flink version as with the DataStream API.
> In particular, this means:
> * once the operator topology is defined, it remains static and does not 
> change between Flink versions, unless resulting in better performance,
> * business logic (defined using expressions and functions in queries) behaves 
> identical as before the version upgrade,
> * the state of a Table & SQL API program can be restored from a savepoint of 
> a previous version,
> * adding or removing stateful operators should be made possible in the 
> DataStream API.
> The same query can remain up and running after upgrades.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25217) FLIP-190: Support Version Upgrades for Table API & SQL Programs

2021-12-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462929#comment-17462929
 ] 

Wenlong Lyu commented on FLINK-25217:
-

hi, [~twalthr] could you assign the issues on table serialization improvements 
to me? I could help on this part first.

> FLIP-190: Support Version Upgrades for Table API & SQL Programs
> ---
>
> Key: FLINK-25217
> URL: https://issues.apache.org/jira/browse/FLINK-25217
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> Nowadays, the Table & SQL API is as important to Flink as the DataStream API. 
> It is one of the main abstractions for expressing pipelines that perform 
> stateful stream processing. Users expect the same backwards compatibility 
> guarantees when upgrading to a newer Flink version as with the DataStream API.
> In particular, this means:
> * once the operator topology is defined, it remains static and does not 
> change between Flink versions, unless resulting in better performance,
> * business logic (defined using expressions and functions in queries) behaves 
> identical as before the version upgrade,
> * the state of a Table & SQL API program can be restored from a savepoint of 
> a previous version,
> * adding or removing stateful operators should be made possible in the 
> DataStream API.
> The same query can remain up and running after upgrades.
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=191336489



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25137) Cannot do window aggregation after a regular join: Window can only be defined on a time attribute column

2021-12-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462909#comment-17462909
 ] 

Wenlong Lyu commented on FLINK-25137:
-

[~liuhb86] you could close this issue, since it is not a bug and your case is 
solved

> Cannot do window aggregation after a regular join: Window can only be defined 
> on a time attribute column
> 
>
> Key: FLINK-25137
> URL: https://issues.apache.org/jira/browse/FLINK-25137
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a stream joining with a bounded lookup table. Then I want to do a 
> window aggregation on this stream.
> The code looks like this:
>  
> {code:java}
> var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" +
> " join lookup on test.v = lookup.a");
> tableEnv.createTemporaryView("joined_table", joined);
> var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" +
> " FROM TABLE(\n" +
> " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' 
> minutes))\n" +
> " GROUP BY window_start, window_end, c");
> {code}
> where "test" is a stream and "lookup" is a bounded table. Full code is here: 
> [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java]
>  
>  
> However, it failed with an exception:
> {code:java}
> Window can only be defined on a time attribute column, but is type of 
> TIMESTAMP(3)
> org.apache.flink.table.api.ValidationException: Window can only be defined on 
> a time attribute column, but is type of TIMESTAMP(3)
>     at 
> org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167)
>     at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> 

[jira] [Commented] (FLINK-25337) Check whether the target table is valid when SqlToOperationConverter.convertSqlInsert

2021-12-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462579#comment-17462579
 ] 

Wenlong Lyu commented on FLINK-25337:
-

Regarding sql validation,I think it is necessary to go through all steps of 
optimization the same as what you are doing now, validation in sql operation 
converter can not ensure that the sql is ok.

> Check whether the target table is valid when 
> SqlToOperationConverter.convertSqlInsert
> -
>
> Key: FLINK-25337
> URL: https://issues.apache.org/jira/browse/FLINK-25337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: vim-wang
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> when I execute insert sql like "insert into t1 select ...", 
> If the t1 is not defined,sql will not throw an exception after 
> SqlToOperationConverter.convertSqlInsert(), I think this is unreasonable, why 
> not use catalogManager to check whether the target table is valid?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25337) Check whether the target table is valid when SqlToOperationConverter.convertSqlInsert

2021-12-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462572#comment-17462572
 ] 

Wenlong Lyu commented on FLINK-25337:
-

hi,[~vim-wang] I think it would be better if you could post the case in 
English, it can help others in the community  understand and help you.

> Check whether the target table is valid when 
> SqlToOperationConverter.convertSqlInsert
> -
>
> Key: FLINK-25337
> URL: https://issues.apache.org/jira/browse/FLINK-25337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: vim-wang
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> when I execute insert sql like "insert into t1 select ...", 
> If the t1 is not defined,sql will not throw an exception after 
> SqlToOperationConverter.convertSqlInsert(), I think this is unreasonable, why 
> not use catalogManager to check whether the target table is valid?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-19 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17462349#comment-17462349
 ] 

Wenlong Lyu commented on FLINK-25330:
-

Hi, Maybe we could support customized DeserializationSchema, and provide some 
common implementation like DELETE_ALL, DELETE_LATEST, PADDING_EMPTY, to support 
different scenarios.

> Flink SQL doesn't retract all versions of Hbase data
> 
>
> Key: FLINK-25330
> URL: https://issues.apache.org/jira/browse/FLINK-25330
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Reporter: Bruce Wong
>Assignee: Jing Ge
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2021-12-15-20-05-18-236.png
>
>
> h2. Background
> When we use CDC to synchronize mysql data to HBase, we find that HBase 
> deletes only the last version of the specified rowkey when deleting mysql 
> data. The data of the old version still exists. You end up using the wrong 
> data. And I think its a bug of HBase connector.
> The following figure shows Hbase data changes before and after mysql data is 
> deleted.
> !image-2021-12-15-20-05-18-236.png|width=910,height=669!
>  
> h2.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25337) Check whether the target table is valid when SqlToOperationConverter.convertSqlInsert

2021-12-16 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460499#comment-17460499
 ] 

Wenlong Lyu commented on FLINK-25337:
-

hi, [~vim-wang], I think it is because we need to check not only the table is 
existed, but also the schema should be matched and query is valid, etc, it may 
be better to put all check together. 
Do you have any actual problem on the check? IMO, the failure check will soon 
come up after convertSqlInsert.  If you could post your case, others in 
community could understand your problem better.

> Check whether the target table is valid when 
> SqlToOperationConverter.convertSqlInsert
> -
>
> Key: FLINK-25337
> URL: https://issues.apache.org/jira/browse/FLINK-25337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: vim-wang
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> when I execute insert sql like "insert into t1 select ...", 
> If the t1 is not defined,sql will not throw an exception after 
> SqlToOperationConverter.convertSqlInsert(), I think this is unreasonable, why 
> not use catalogManager to check whether the target table is valid?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25330) Flink SQL doesn't retract all versions of Hbase data

2021-12-15 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17460396#comment-17460396
 ] 

Wenlong Lyu commented on FLINK-25330:
-

hi, [~Bruce Wong] I have a concern on deleting all of the version when receive 
a retract message. 
IMO, users who uses HBase in production track changes by enabling 
multi-version, so it maybe not actually needed by users to delete all of the 
version when receiving a retract message, instead, they may want to translate 
the retract message to a flag column such as is_deleted or set all columns to 
be empty. WDYT?

> Flink SQL doesn't retract all versions of Hbase data
> 
>
> Key: FLINK-25330
> URL: https://issues.apache.org/jira/browse/FLINK-25330
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / HBase
>Affects Versions: 1.14.0
>Reporter: Bruce Wong
>Priority: Critical
>  Labels: pull-request-available
> Attachments: image-2021-12-15-20-05-18-236.png
>
>
> h2. Background
> When we use CDC to synchronize mysql data to HBase, we find that HBase 
> deletes only the last version of the specified rowkey when deleting mysql 
> data. The data of the old version still exists. You end up using the wrong 
> data. And I think its a bug of HBase connector.
> The following figure shows Hbase data changes before and after mysql data is 
> deleted.
> !image-2021-12-15-20-05-18-236.png|width=910,height=669!
>  
> h2.  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25272) Flink sql syntax analysis is inconsistent with the sql grammer declared now

2021-12-13 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458209#comment-17458209
 ] 

Wenlong Lyu commented on FLINK-25272:
-

hi, [~lam167]  I think this is a doc missing issue instead of error in Parser. 
The key word is TEMPORARY  not TEMPORAL,  TEMPORARY is used to tell catalog not 
to persist the table defined by the DDL, the table is only available on current 
session.

> Flink sql syntax analysis is inconsistent with the sql grammer declared now
> ---
>
> Key: FLINK-25272
> URL: https://issues.apache.org/jira/browse/FLINK-25272
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Jianhui Dong
>Priority: Minor
>
> Syntax analysis in the 
> code(https://github.com/apache/flink/blob/master/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl)
>  is inconsistent with the grammar declared on the 
> module(https://github.com/apache/flink/edit/master/docs/content/docs/dev/table/sql/create.md).
> The document states that all CREATE statement formats are `CREATE TABLE [IF 
> NOT EXISTS]`, there is no `TEMPORAL` keywords, but in 
> flink-sql-parser/src/main/codegen/includes/parserImpls.ftl::SqlCreateExtended,
>  it will recorganize `TEMPORAL` token and set `isTemporary` with true if the 
> keyword exists.
> Besides, I think the field `isTemporary` is outdated too, every table in 
> Flink is a temporal table as the document wrote. In fact, when they are 
> stored in the catalog, they are all converted to `CatalogSchemaTable` which 
> inherits the `TemporalTable` interface.
> BTW, the view and the function have the same condition. IMO, I think we 
> should remove the `TEMPORAL` token in parserImpls.ftl and also remove the 
> `isTemporary` field which now always is false.
> This issue may be a little confused cause my English is not very well. If you 
> have any questions, you can add comments under the issue or contact me at 
> lam...@apache.org and I could also provide a pull request for this.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25265) RUNNING to FAILED with failure cause. This might indicate that the remote task manager was lost.

2021-12-12 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458134#comment-17458134
 ] 

Wenlong Lyu commented on FLINK-25265:
-

hi, [~coco_yiyayiya] have you checked the log of task manager lost, It is 
usually because of full GC problem when there are TM lost. That is why it can 
work well when you add a `limit 1`

> RUNNING to FAILED with failure cause. This might indicate that the remote 
> task manager was lost.
> 
>
> Key: FLINK-25265
> URL: https://issues.apache.org/jira/browse/FLINK-25265
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.13.3
>Reporter: coco
>Priority: Major
>  Labels: Flink-CDC, Flink-sql
> Fix For: 1.13.3
>
>
> When I use the following SQL statement:
> {code:java}
> insert into table_result (select ..from   table_A  
> left join  table_B  
> left join  table_C 
> left join  table_D  
> where  ... and creatTime>'2021-11-1 00:00:00') {code}
> Flink task will encounter the following problems after starting, the task is 
> always restarting, and the data cannot be updated to table_result.
> 2021-12-11 18:02:04,262 WARN org.apache.flink.runtime.taskmanager.Task [] - 
> Join(joinType=[LeftOuterJoin], where=[(problemTypeId = 
> incidentProblemTypeId)], select=[incidentId, levelId, problemTypeId, 
> customerId, simpleDescribe, currentHandlerId, createTime, updateTime, 
> acceptedTime, confirmedTime, completeTime, rootcause, incidentTypeName, 
> incidentProblemTypeId, incidentProblemTypeName], leftInputSpec=[NoUniqueKey], 
> rightInputSpec=[NoUniqueKey]) -> Calc(select=[incidentId, incidentTypeName, 
> createTime, updateTime, acceptedTime, confirmedTime, completeTime, 
> incidentProblemTypeName, simpleDescribe, CAST(_UTF-16LE'回访放弃':VARCHAR(7) 
> CHARACTER SET "UTF-16LE") AS status_name, currentHandlerId, levelId, 
> customerId, rootcause]) -> NotNullEnforcer(fields=[incidentId]) -> Sink: 
> Sink(table=[default_catalog.default_database.osm_result], fields=[incidentId, 
> incidentTypeName, createTime, updateTime, acceptedTime, confirmedTime, 
> completeTime, incidentProblemTypeName, simpleDescribe, status_name, 
> currentHandlerId, levelId, customerId, rootcause]) (2/4)#0 
> (78d78b78377fcafc9fb2e4c3797af71c) *{color:#FF}switched from RUNNING to 
> FAILED with failure cause: 
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
> Connection unexpectedly closed by remote task manager 
> 'cnnorth7a-CloudDataCompass-DataHub-Flink-cluster-0002/10.66.164.42:39152'. 
> This might indicate that the remote task manager was lost.{color}*
> at 
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
> at 
> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
> at 
> 

[jira] [Commented] (FLINK-25171) When the DDL statement was executed, the column names of the Derived Columns were not validated

2021-12-06 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17454432#comment-17454432
 ] 

Wenlong Lyu commented on FLINK-25171:
-

I think it is good to not allow duplicated column name in both tables and 
views, the result could be weird and hard to debug, when there are tens of 
column in a table. 

> When the DDL statement was executed, the column names of the Derived Columns 
> were not validated
> ---
>
> Key: FLINK-25171
> URL: https://issues.apache.org/jira/browse/FLINK-25171
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: shouzuo meng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.14.0
>
> Attachments: 5261638775663_.pic.jpg
>
>
> When I execute the DDL statement, I mistakenly use the duplicate field name 
> in THE SQL, but the execution result of the program does not throw any 
> exception or prompt. In MergeTableLikeUtilTest. Java# mergePhysicalColumns 
> add repeated the TableColumn (), also do not throw any exceptions, review the 
> code logic found only on the source table schema fields, It is not a 
> duplicate and derived tables and fields of the source table is verified, and 
> no field of derived tables if repeated verification, adding 
> physicalFieldNamesToTypes, there will be a repeating field coverage,The 
> following are the execution statements and the results
> DDL sql:
> CREATE TABLE test1 (
>   `log_version` string COMMENT '日志版本',
>   `log_version` INTEGER COMMENT '日志版本',
>   `pv_time` string COMMENT '日志时间'
> ) with(
>         'connector' = 'kafka',
>         'topic' = 'xxx',
>         'properties.bootstrap.servers' = 'xxx:9110',
>         'scan.startup.mode'='latest-offset',
>         'format' = 'json',
> )
> {code:java}
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> TableResult result = tEnv.executeSql(
> CREATE TABLE test1 (
>   `log_version` string COMMENT '日志版本',
>   `log_version` INTEGER COMMENT '日志版本',
>   `pv_time` string COMMENT '日志时间'
> ) with(
>         'connector' = 'kafka',
>         'topic' = 'xxx',
>         'properties.bootstrap.servers' = 'xxx:9110',
>         'scan.startup.mode'='latest-offset',
>         'format' = 'json',
> )
> ) 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25198) add document about how to debug with the name and description

2021-12-06 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25198:
---

 Summary: add document about how to debug with the name and 
description 
 Key: FLINK-25198
 URL: https://issues.apache.org/jira/browse/FLINK-25198
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Wenlong Lyu
 Fix For: 1.15.0


the doc could in the debugging section 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/debugging/debugging_event_time/
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25149) getScanRuntimeProvider method called before the pushDown method,

2021-12-03 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17452845#comment-17452845
 ] 

Wenlong Lyu commented on FLINK-25149:
-

[~waittting] no, source function will not execute in optimization, you can 
refer more details in DynamicSourceUtils#validateScanSource. 
getScanRuntimeProvider will be executed again at the end of the optimization, 
the parameters you passed by pushdown will be available at that time.

> getScanRuntimeProvider method called before the pushDown method,
> 
>
> Key: FLINK-25149
> URL: https://issues.apache.org/jira/browse/FLINK-25149
> Project: Flink
>  Issue Type: Bug
>Reporter: waittting
>Priority: Major
>
> ```
> public class CCDynamicTableSource implements ScanTableSource, 
> SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown {}
> ```
> Why is the getScanRuntimeProvider method called before the pushDown method, 
> so that the parameters I get in the pushDown are not available in the 
> getScanRuntimeProvider?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25149) getScanRuntimeProvider method called before the pushDown method,

2021-12-02 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17452788#comment-17452788
 ] 

Wenlong Lyu commented on FLINK-25149:
-

[~waittting] currently the table planner will validate the DynamicTableSource 
before doing the optimization, you should make sure that the original table 
source created by factory can work well without push down.

> getScanRuntimeProvider method called before the pushDown method,
> 
>
> Key: FLINK-25149
> URL: https://issues.apache.org/jira/browse/FLINK-25149
> Project: Flink
>  Issue Type: Bug
>Reporter: waittting
>Priority: Major
>
> ```
> public class CCDynamicTableSource implements ScanTableSource, 
> SupportsLimitPushDown, SupportsFilterPushDown, SupportsProjectionPushDown {}
> ```
> Why is the getScanRuntimeProvider method called before the pushDown method, 
> so that the parameters I get in the pushDown are not available in the 
> getScanRuntimeProvider?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25137) Cannot do window aggregation after a regular join: Window can only be defined on a time attribute column

2021-12-02 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17452708#comment-17452708
 ] 

Wenlong Lyu commented on FLINK-25137:
-

[~liuhb86] there is no much documents about it. Generally speaking, once the 
column is used in calculation such as agg, calc, join etc, the TimeAttribute 
would be materialized to Timestamp, you can get the details in 
RelTimeIndicatorConverter. 

Yes, you should never do  regular join between a stream and bounded table if 
you enable state ttl.

> Cannot do window aggregation after a regular join: Window can only be defined 
> on a time attribute column
> 
>
> Key: FLINK-25137
> URL: https://issues.apache.org/jira/browse/FLINK-25137
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a stream joining with a bounded lookup table. Then I want to do a 
> window aggregation on this stream.
> The code looks like this:
>  
> {code:java}
> var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" +
> " join lookup on test.v = lookup.a");
> tableEnv.createTemporaryView("joined_table", joined);
> var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" +
> " FROM TABLE(\n" +
> " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' 
> minutes))\n" +
> " GROUP BY window_start, window_end, c");
> {code}
> where "test" is a stream and "lookup" is a bounded table. Full code is here: 
> [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java]
>  
>  
> However, it failed with an exception:
> {code:java}
> Window can only be defined on a time attribute column, but is type of 
> TIMESTAMP(3)
> org.apache.flink.table.api.ValidationException: Window can only be defined on 
> a time attribute column, but is type of TIMESTAMP(3)
>     at 
> org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167)
>     at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at 

[jira] [Commented] (FLINK-25117) NoSuchMethodError getCatalog()

2021-12-02 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17452705#comment-17452705
 ] 

Wenlong Lyu commented on FLINK-25117:
-

[~zzt]  when using the image, you can copy only connector jars to lib, and 
remove all other flink jars you copied manually, the image already contains all 
of core libs, for sql, flink-table-blink_2.12-1.13.3.jar is enough. 
I am afraid that the jars you manually copied (such as ) is not compiled based 
on the final branch/tag of 1.13.3.

> NoSuchMethodError getCatalog()
> --
>
> Key: FLINK-25117
> URL: https://issues.apache.org/jira/browse/FLINK-25117
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.3
> Environment: offical docker image,  flink:1.13.3-scala_2.12
>Reporter: zzt
>Priority: Major
>
> {code:java}
> Flink SQL> insert into `wide_order` (`user_id`, `row_num`, `sum`)
> > select `t`.`receiver_user_id`, `t`.`rowNum`, `t`.`total`
> > from (select `t`.`receiver_user_id`, `t`.`total`, ROW_NUMBER() OVER (ORDER 
> > BY total desc) as `rowNum`
> >       from (select `order_view`.`receiver_user_id`, 
> > sum(`order_view`.`total`) as `total`
> >             from `order_view` where create_time > '2021-11-01 00:24:55.453'
> >             group by `order_view`.`receiver_user_id`) `t`) `t`
> > where `rowNum` <= 1;
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.flink.table.catalog.CatalogManager$TableLookupResult.getCatalog()Ljava/util/Optional;
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:106)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:90)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:79)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:74)
>     at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>     at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntryFrom(SqlValidatorUtil.java:1059)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntry(SqlValidatorUtil.java:1016)
>     at 
> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:119)
>     at 
> org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:86)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:116)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:56)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:47)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
>     ... 1 more
> Shutting 

[jira] [Commented] (FLINK-25137) Cannot do window aggregation after a regular join: Window can only be defined on a time attribute column

2021-12-01 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17452108#comment-17452108
 ] 

Wenlong Lyu commented on FLINK-25137:
-

hi, [~liuhb86] this is a by-designed limitation on Flink Stream SQL currently. 
Event time would be materialized to Timestamp after join, which can not be used 
in window on Streaming.

> Cannot do window aggregation after a regular join: Window can only be defined 
> on a time attribute column
> 
>
> Key: FLINK-25137
> URL: https://issues.apache.org/jira/browse/FLINK-25137
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: Hongbo
>Priority: Major
>
> I have a stream joining with a bounded lookup table. Then I want to do a 
> window aggregation on this stream.
> The code looks like this:
>  
> {code:java}
> var joined = tableEnv.sqlQuery("SELECT test.ts, test.v, lookup.c from test" +
> " join lookup on test.v = lookup.a");
> tableEnv.createTemporaryView("joined_table", joined);
> var agg = tableEnv.sqlQuery("SELECT window_start, window_end, c, sum(v)\n" +
> " FROM TABLE(\n" +
> " HOP(TABLE joined_table, DESCRIPTOR(ts), INTERVAL '10' seconds, INTERVAL '5' 
> minutes))\n" +
> " GROUP BY window_start, window_end, c");
> {code}
> where "test" is a stream and "lookup" is a bounded table. Full code is here: 
> [https://github.com/liuhb86/flink-test/blob/flink25137/flink-test/src/test/java/com/flinktest/FlinkTest.java]
>  
>  
> However, it failed with an exception:
> {code:java}
> Window can only be defined on a time attribute column, but is type of 
> TIMESTAMP(3)
> org.apache.flink.table.api.ValidationException: Window can only be defined on 
> a time attribute column, but is type of TIMESTAMP(3)
>     at 
> org.apache.flink.table.planner.plan.utils.WindowUtil$.convertToWindowingStrategy(WindowUtil.scala:183)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:193)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getProjectWindowProperties(FlinkRelMdWindowProperties.scala:89)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMdWindowProperties.getWindowProperties(FlinkRelMdWindowProperties.scala:75)
>     at 
> GeneratedMetadataHandler_WindowProperties.getWindowProperties_$(Unknown 
> Source)
>     at GeneratedMetadataHandler_WindowProperties.getWindowProperties(Unknown 
> Source)
>     at 
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.getRelWindowProperties(FlinkRelMetadataQuery.java:261)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.gatherIndicesToMaterialize(RelTimeIndicatorConverter.java:433)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convertAggInput(RelTimeIndicatorConverter.java:416)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visitAggregate(RelTimeIndicatorConverter.java:402)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.visit(RelTimeIndicatorConverter.java:167)
>     at org.apache.calcite.rel.AbstractRelNode.accept(AbstractRelNode.java:217)
>     at 
> org.apache.flink.table.planner.calcite.RelTimeIndicatorConverter.convert(RelTimeIndicatorConverter.java:133)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkRelTimeIndicatorProgram.optimize(FlinkRelTimeIndicatorProgram.scala:35)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at 

[jira] [Updated] (FLINK-25123) Improve expression description in SQL operator

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25123:

Fix Version/s: 1.15.0

> Improve expression description in SQL operator
> --
>
> Key: FLINK-25123
> URL: https://issues.apache.org/jira/browse/FLINK-25123
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25076) Simplify name of SQL operators

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25076:

Fix Version/s: 1.15.0

> Simplify name of SQL operators
> --
>
> Key: FLINK-25076
> URL: https://issues.apache.org/jira/browse/FLINK-25076
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25076) Simplify name of SQL operators

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25076:

Component/s: Table SQL / Planner

> Simplify name of SQL operators
> --
>
> Key: FLINK-25076
> URL: https://issues.apache.org/jira/browse/FLINK-25076
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25074) Simplify name of window operators in DS by moving details to description

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25074:

Fix Version/s: 1.15.0

> Simplify name of window operators in DS by moving details to description
> 
>
> Key: FLINK-25074
> URL: https://issues.apache.org/jira/browse/FLINK-25074
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25074) Simplify name of window operators in DS by moving details to description

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25074:

Component/s: API / DataStream

> Simplify name of window operators in DS by moving details to description
> 
>
> Key: FLINK-25074
> URL: https://issues.apache.org/jira/browse/FLINK-25074
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25073) Introduce Tree Mode description for job vertex

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25073:

Component/s: API / DataStream

> Introduce Tree Mode description for job vertex
> --
>
> Key: FLINK-25073
> URL: https://issues.apache.org/jira/browse/FLINK-25073
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25073) Introduce Tree Mode description for job vertex

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25073:

Fix Version/s: 1.15.0

> Introduce Tree Mode description for job vertex
> --
>
> Key: FLINK-25073
> URL: https://issues.apache.org/jira/browse/FLINK-25073
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25072) Introduce description for operator

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25072?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25072:

Fix Version/s: 1.15.0

> Introduce description for operator
> --
>
> Key: FLINK-25072
> URL: https://issues.apache.org/jira/browse/FLINK-25072
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25070) FLIP-195: Improve the name and structure of vertex and operator name for job

2021-11-30 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25070:

Fix Version/s: 1.15.0

> FLIP-195: Improve the name and structure of vertex and operator name for job
> 
>
> Key: FLINK-25070
> URL: https://issues.apache.org/jira/browse/FLINK-25070
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Web Frontend, Table SQL / 
> Runtime
>Reporter: Wenlong Lyu
>Assignee: Wenlong Lyu
>Priority: Major
> Fix For: 1.15.0
>
>
> this is an umbrella issue tracking the improvement of operator/vertex names 
> in flink: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-195%3A+Improve+the+name+and+structure+of+vertex+and+operator+name+for+job



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25123) Improve expression description in SQL operator

2021-11-30 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25123:
---

 Summary: Improve expression description in SQL operator
 Key: FLINK-25123
 URL: https://issues.apache.org/jira/browse/FLINK-25123
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25117) NoSuchMethodError getCatalog()

2021-11-30 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17451528#comment-17451528
 ] 

Wenlong Lyu commented on FLINK-25117:
-

[~zzt] I would suggest you not put any flink jars from 1.13.3 to the lib if you 
are running job on flink-1.13.2. you can use jars from 1.13.2 or use 
flink-1.13.3 image

> NoSuchMethodError getCatalog()
> --
>
> Key: FLINK-25117
> URL: https://issues.apache.org/jira/browse/FLINK-25117
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.3
> Environment: offical docker image,  flink:1.13.2-scala_2.12
>Reporter: zzt
>Priority: Major
>
> {code:java}
> Flink SQL> insert into `wide_order` (`user_id`, `row_num`, `sum`)
> > select `t`.`receiver_user_id`, `t`.`rowNum`, `t`.`total`
> > from (select `t`.`receiver_user_id`, `t`.`total`, ROW_NUMBER() OVER (ORDER 
> > BY total desc) as `rowNum`
> >       from (select `order_view`.`receiver_user_id`, 
> > sum(`order_view`.`total`) as `total`
> >             from `order_view` where create_time > '2021-11-01 00:24:55.453'
> >             group by `order_view`.`receiver_user_id`) `t`) `t`
> > where `rowNum` <= 1;
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.flink.table.catalog.CatalogManager$TableLookupResult.getCatalog()Ljava/util/Optional;
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:106)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:90)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:79)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:74)
>     at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>     at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntryFrom(SqlValidatorUtil.java:1059)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntry(SqlValidatorUtil.java:1016)
>     at 
> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:119)
>     at 
> org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:86)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:116)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:56)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:47)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
>     ... 1 more
> Shutting down the session...
> done. {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-25117) NoSuchMethodError getCatalog()

2021-11-30 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-25117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17451499#comment-17451499
 ] 

Wenlong Lyu commented on FLINK-25117:
-

hi, [~zzt] the method TableLookupResult.getCatalog()Ljava/util/Optional is 
added at 1.13.3. Please check the dependency in your custom jars, make sure 
that all of flink library(such as flink-table-api-java) are excluded in the 
custom jars.

> NoSuchMethodError getCatalog()
> --
>
> Key: FLINK-25117
> URL: https://issues.apache.org/jira/browse/FLINK-25117
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Affects Versions: 1.13.3
> Environment: offical docker image,  flink:1.13.2-scala_2.12
>Reporter: zzt
>Priority: Major
>
> {code:java}
> Flink SQL> insert into `wide_order` (`user_id`, `row_num`, `sum`)
> > select `t`.`receiver_user_id`, `t`.`rowNum`, `t`.`total`
> > from (select `t`.`receiver_user_id`, `t`.`total`, ROW_NUMBER() OVER (ORDER 
> > BY total desc) as `rowNum`
> >       from (select `order_view`.`receiver_user_id`, 
> > sum(`order_view`.`total`) as `total`
> >             from `order_view` where create_time > '2021-11-01 00:24:55.453'
> >             group by `order_view`.`receiver_user_id`) `t`) `t`
> > where `rowNum` <= 1;
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.flink.table.catalog.CatalogManager$TableLookupResult.getCatalog()Ljava/util/Optional;
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.extractTableStats(DatabaseCalciteSchema.java:106)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getStatistic(DatabaseCalciteSchema.java:90)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:79)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:74)
>     at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>     at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntryFrom(SqlValidatorUtil.java:1059)
>     at 
> org.apache.calcite.sql.validate.SqlValidatorUtil.getTableEntry(SqlValidatorUtil.java:1016)
>     at 
> org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:119)
>     at 
> org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader.getTable(FlinkCalciteCatalogReader.java:86)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter$.appendPartitionAndNullsProjects(PreValidateReWriter.scala:116)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:56)
>     at 
> org.apache.flink.table.planner.calcite.PreValidateReWriter.visit(PreValidateReWriter.scala:47)
>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:113)
>     at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>     at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>     at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$parseStatement$1(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
>     at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:176)
>     at 
> org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:385)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:326)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
>     at 
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
>     at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
>     ... 1 more
> Shutting down the session...
> done. {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

[jira] [Created] (FLINK-25118) Add vertex index as prefix in vertex name

2021-11-30 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25118:
---

 Summary: Add vertex index as prefix in vertex name
 Key: FLINK-25118
 URL: https://issues.apache.org/jira/browse/FLINK-25118
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-29 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17450880#comment-17450880
 ] 

Wenlong Lyu commented on FLINK-20370:
-

[~lincoln.86xy] Even sink implements ParallelismProvider, parallelism can still 
be undefined, because it returns an optional value.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table which 
> is in upsert mode with "city_name" primary key. The join key is "city_id". 
> In this case, the result will be wrong when updating 
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored 
> and the old city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], 
> fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, 
> CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>+- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, 
> count_customer, sum_gender, id, city_name], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>   :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>   :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS 
> sum_gender], changelogMode=[I,UA,D])
>   : +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>   :+- LocalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], 
> changelogMode=[I])
>   :   +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>   :  +- ChangelogNormalize(key=[customer_id], 
> changelogMode=[I,UB,UA,D])
>   : +- Exchange(distribution=[hash[customer_id]], 
> changelogMode=[UA,D])
>   :+- MiniBatchAssigner(interval=[3000ms], 
> mode=[ProcTime], changelogMode=[UA,D])
>   :   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_customer]], fields=[customer_id, city_id, age, 
> gender, update_time], changelogMode=[UA,D])
>   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>  +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
> +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>+- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], 
> changelogMode=[UA,D])
>   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested users to use the same key of the query as the primary key 
> on sink in the documentation: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
>  We should make this attention to be more highlight in CREATE TABLE page. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25074) Simplify name of window operators in DS by moving details to description

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25074:
---

 Summary: Simplify name of window operators in DS by moving details 
to description
 Key: FLINK-25074
 URL: https://issues.apache.org/jira/browse/FLINK-25074
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25076) Simplify name of SQL operators

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25076:
---

 Summary: Simplify name of SQL operators
 Key: FLINK-25076
 URL: https://issues.apache.org/jira/browse/FLINK-25076
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25073) Introduce Tree Mode description for job vertex

2021-11-26 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25073:

Summary: Introduce Tree Mode description for job vertex  (was: Introduce 
Tree Mode)

> Introduce Tree Mode description for job vertex
> --
>
> Key: FLINK-25073
> URL: https://issues.apache.org/jira/browse/FLINK-25073
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Wenlong Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25073) Introduce Tree Mode

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25073:
---

 Summary: Introduce Tree Mode
 Key: FLINK-25073
 URL: https://issues.apache.org/jira/browse/FLINK-25073
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25072) Introduce description for operator

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25072:
---

 Summary: Introduce description for operator
 Key: FLINK-25072
 URL: https://issues.apache.org/jira/browse/FLINK-25072
 Project: Flink
  Issue Type: Sub-task
Reporter: Wenlong Lyu






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-25070) FLIP-195: Improve the name and structure of vertex and operator name for job

2021-11-26 Thread Wenlong Lyu (Jira)
Wenlong Lyu created FLINK-25070:
---

 Summary: FLIP-195: Improve the name and structure of vertex and 
operator name for job
 Key: FLINK-25070
 URL: https://issues.apache.org/jira/browse/FLINK-25070
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream, Runtime / Web Frontend, Table SQL / 
Runtime
Reporter: Wenlong Lyu


this is an umbrella issue tracking the improvement of operator/vertex names in 
flink



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (FLINK-25070) FLIP-195: Improve the name and structure of vertex and operator name for job

2021-11-26 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-25070:

Description: this is an umbrella issue tracking the improvement of 
operator/vertex names in flink: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-195%3A+Improve+the+name+and+structure+of+vertex+and+operator+name+for+job
  (was: this is an umbrella issue tracking the improvement of operator/vertex 
names in flink)

> FLIP-195: Improve the name and structure of vertex and operator name for job
> 
>
> Key: FLINK-25070
> URL: https://issues.apache.org/jira/browse/FLINK-25070
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Web Frontend, Table SQL / 
> Runtime
>Reporter: Wenlong Lyu
>Priority: Major
>
> this is an umbrella issue tracking the improvement of operator/vertex names 
> in flink: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-195%3A+Improve+the+name+and+structure+of+vertex+and+operator+name+for+job



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-22 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447367#comment-17447367
 ] 

Wenlong Lyu commented on FLINK-20370:
-

hi, [~lincoln.86xy], what do you think we should provide when the sink is a 
DataStreamSinkProvider/TransformationSinkProvider? In such cases, we have no 
idea about the parallelism of sink, I think this is the case missed in the 
summary.

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table which 
> is in upsert mode with "city_name" primary key. The join key is "city_id". 
> In this case, the result will be wrong when updating 
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored 
> and the old city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], 
> fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, 
> CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>+- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, 
> count_customer, sum_gender, id, city_name], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>   :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>   :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS 
> sum_gender], changelogMode=[I,UA,D])
>   : +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>   :+- LocalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], 
> changelogMode=[I])
>   :   +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>   :  +- ChangelogNormalize(key=[customer_id], 
> changelogMode=[I,UB,UA,D])
>   : +- Exchange(distribution=[hash[customer_id]], 
> changelogMode=[UA,D])
>   :+- MiniBatchAssigner(interval=[3000ms], 
> mode=[ProcTime], changelogMode=[UA,D])
>   :   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_customer]], fields=[customer_id, city_id, age, 
> gender, update_time], changelogMode=[UA,D])
>   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>  +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
> +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>+- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], 
> changelogMode=[UA,D])
>   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested users to use the same key of the query as the primary key 
> on sink in the documentation: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
>  We should make this attention to be more highlight in CREATE TABLE page. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24975) Add hooks and extension points to FlinkSQL

2021-11-21 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17447194#comment-17447194
 ] 

Wenlong Lyu commented on FLINK-24975:
-

hi, [~dahaishuantuoba], it is an interesting idea. Do you have any actual use 
cases depending on this feature?

> Add hooks and extension points to FlinkSQL
> --
>
> Key: FLINK-24975
> URL: https://issues.apache.org/jira/browse/FLINK-24975
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Reporter: junbiao chen
>Priority: Major
>
> refer to sparkSQL,https://issues.apache.org/jira/browse/SPARK-18127



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24870) Cannot cast "java.util.Date" to "java.time.Instant"

2021-11-11 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17442191#comment-17442191
 ] 

Wenlong Lyu commented on FLINK-24870:
-

[~wangbaohua] default external type for TIMESTAMP_LTZ is Instant instead of 
Date, you can try the  convert createTime to an Instant at datastream

> Cannot cast "java.util.Date" to "java.time.Instant"
> ---
>
> Key: FLINK-24870
> URL: https://issues.apache.org/jira/browse/FLINK-24870
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.13.1
>Reporter: wangbaohua
>Priority: Blocker
>
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>         at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76)
>         at 
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
>         ... 11 more
> Caused by: 
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74)
>         ... 12 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program 
> cannot be compiled. This is a bug. Please file an issue.
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89)
>         at 
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>         at 
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>         ... 15 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 120, Column 
> 101: Cannot cast "java.util.Date" to "java.time.Instant"
>         at 
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
>         at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
>         at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
>         at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>         at 
> org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
>         at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
>         at 
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
>         at 
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
>         at 
> org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
>         at 
> org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
>         at 
> 

[jira] [Commented] (FLINK-24861) Flink MySQL Look cache update for empty hit

2021-11-11 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17442186#comment-17442186
 ] 

Wenlong Lyu commented on FLINK-24861:
-

hi, [~gaurav726] I have a different point: it is ok to cache the empty key by 
default. The cache is usually used to avoid too frequent access on database, 
empty result query should also be in the scope.  
I also agrees that we can exclude empty result in case to improve the join 
result when the dim table is increasing by time but not updating, so I think 
you could add an optional config to exclude empty query result in cache 
instead. what do you think?

> Flink MySQL Look cache update for empty hit
> ---
>
> Key: FLINK-24861
> URL: https://issues.apache.org/jira/browse/FLINK-24861
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / JDBC
>Affects Versions: 1.14.0
>Reporter: Gaurav Miglani
>Assignee: Gaurav Miglani
>Priority: Major
>  Labels: pull-request-available
>
> Ideally, in case of cache miss for a key, or with null value fetch for key, 
> key shouldn't be cached



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24847) Decide the overflows behaviour

2021-11-09 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441198#comment-17441198
 ] 

Wenlong Lyu commented on FLINK-24847:
-

hi, have you try the latest code to see if the inconsistent still exists? the 
solution of FLINK-24318 may have changed something, which tried to fix the 
inconsistent by unify the expression executor.

> Decide the overflows behaviour
> --
>
> Key: FLINK-24847
> URL: https://issues.apache.org/jira/browse/FLINK-24847
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Francesco Guardiani
>Priority: Major
>
> Right now we have inconsistent behavior when it comes down to overflows, 
> depending on whether the value comes from a literal or from a value generated 
> by the runtime (eg after a sum).
> In particular, I tracked down an issue when trying to execute 
> {{CAST(9.2345682E9):INTEGER}} which returns {{644633299}} instead of 
> {{2147483647}} (the result of {{(int)9234567891.12f}}, because Calcite 
> changes the type of the literal to INTEGER, skipping completely our casting 
> logic in codegen and just forcing us to generate a literal using 
> {{literal.getValue().intValue()}} (Note that Calcite uses {{BigDecimal}} for 
> every numeric, no matter the type).
> Relevant code related to the issue:
> * {{RexBuilder#makeCast}}
> * {{GenerateUtils#generateLiteral}}
> This issue brings me to the following questions:
> * Should we throw an error on overflows?
> ** If yes, should this be the default behavior or just something we configure 
> behind a flag? 
> ** If no, should we have consistent and useful results when overflows (e.g. 
> max value)?
> *** If yes, what should be those overflow values? 
> *** If no, do we keep everything as it is and document that the user needs to 
> be careful about overflows? 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-24843) DynamicTableFactory.Context.getCatalogTable().getPartitionKeys() should return indexes

2021-11-09 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17441143#comment-17441143
 ] 

Wenlong Lyu commented on FLINK-24843:
-

hi, I'm afraid that return index may not friendly at api, because people need 
to understand which schema the index refers to, the PhysicalSchema from source 
or the ResolvedSchema from flink catalog? the case can be worse when the fields 
order at flink ddl is different from what it is at the external storage. 
Regarding primary key, we also use names currently, see UniqueConstraint

> DynamicTableFactory.Context.getCatalogTable().getPartitionKeys() should 
> return indexes
> --
>
> Key: FLINK-24843
> URL: https://issues.apache.org/jira/browse/FLINK-24843
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Francesco Guardiani
>Priority: Minor
>
> Right now invoking {{context.getCatalogTable().getPartitionKeys()}} returns 
> field names. We should encourage users to use indexes,  by having a new 
> method in context or by adding a new method to CatalogTable that returns 
> {{int[]}} and can be used in conjunction with {{DataType.excludeFields}}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-07 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17440153#comment-17440153
 ] 

Wenlong Lyu commented on FLINK-20370:
-

[~twalthr] thanks for the explanation. 
However, I think when the sink is kafka with pk(I would assume that it is a 
upsert kafka with key format provided), the target Kafka partition of a record 
is decided by the key generated. event it is written at different subtask. 
In such case, I think adding a key-by can help keep the order of record with 
the same uid, only when the input is already partitioned by uid. Is this case 
is what you want to solve?  

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.15.0
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table which 
> is in upsert mode with "city_name" primary key. The join key is "city_id". 
> In this case, the result will be wrong when updating 
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored 
> and the old city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], 
> fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, 
> CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>+- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, 
> count_customer, sum_gender, id, city_name], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>   :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>   :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS 
> sum_gender], changelogMode=[I,UA,D])
>   : +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>   :+- LocalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], 
> changelogMode=[I])
>   :   +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>   :  +- ChangelogNormalize(key=[customer_id], 
> changelogMode=[I,UB,UA,D])
>   : +- Exchange(distribution=[hash[customer_id]], 
> changelogMode=[UA,D])
>   :+- MiniBatchAssigner(interval=[3000ms], 
> mode=[ProcTime], changelogMode=[UA,D])
>   :   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_customer]], fields=[customer_id, city_id, age, 
> gender, update_time], changelogMode=[UA,D])
>   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>  +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
> +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>+- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], 
> changelogMode=[UA,D])
>   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested users to use the same key of the query as the primary key 
> on sink in the documentation: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
>  We should make this attention to be more highlight in CREATE TABLE page. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-20370) Result is wrong when sink primary key is not the same with query

2021-11-05 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17439220#comment-17439220
 ] 

Wenlong Lyu commented on FLINK-20370:
-

hi, [~twalthr], could you explain more about case 2, what should we guarantee 
semantically in this case? If the input of sink actually has the samepk 
with sink, and is insert-only, I think there is no distribution disorder. If 
the input of sink doesn't have the same pk with sink, and is insert-only, I 
have no idea what guarantee can be provided by adding a key-by?

> Result is wrong when sink primary key is not the same with query
> 
>
> Key: FLINK-20370
> URL: https://issues.apache.org/jira/browse/FLINK-20370
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: Jark Wu
>Assignee: lincoln lee
>Priority: Critical
> Fix For: 1.15.0
>
>
> Both sources are upsert-kafka which synchronizes the changes from MySQL 
> tables (source_city, source_customer). The sink is another MySQL table which 
> is in upsert mode with "city_name" primary key. The join key is "city_id". 
> In this case, the result will be wrong when updating 
> {{source_city.city_name}} column in MySQL, as the UPDATE_BEFORE is ignored 
> and the old city_name is retained in the sink table. 
> {code}
> Sink(table=[default_catalog.default_database.sink_kafka_count_city], 
> fields=[city_name, count_customer, sum_gender], changelogMode=[NONE])
> +- Calc(select=[city_name, CAST(count_customer) AS count_customer, 
> CAST(sum_gender) AS sum_gender], changelogMode=[I,UA,D])
>+- Join(joinType=[InnerJoin], where=[=(city_id, id)], select=[city_id, 
> count_customer, sum_gender, id, city_name], 
> leftInputSpec=[JoinKeyContainsUniqueKey], 
> rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
>   :- Exchange(distribution=[hash[city_id]], changelogMode=[I,UA,D])
>   :  +- GlobalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(count1$0) AS count_customer, SUM_RETRACT((sum$1, count$2)) AS 
> sum_gender], changelogMode=[I,UA,D])
>   : +- Exchange(distribution=[hash[city_id]], changelogMode=[I])
>   :+- LocalGroupAggregate(groupBy=[city_id], select=[city_id, 
> COUNT_RETRACT(*) AS count1$0, SUM_RETRACT(gender) AS (sum$1, count$2)], 
> changelogMode=[I])
>   :   +- Calc(select=[city_id, gender], changelogMode=[I,UB,UA,D])
>   :  +- ChangelogNormalize(key=[customer_id], 
> changelogMode=[I,UB,UA,D])
>   : +- Exchange(distribution=[hash[customer_id]], 
> changelogMode=[UA,D])
>   :+- MiniBatchAssigner(interval=[3000ms], 
> mode=[ProcTime], changelogMode=[UA,D])
>   :   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_customer]], fields=[customer_id, city_id, age, 
> gender, update_time], changelogMode=[UA,D])
>   +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
>  +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
> +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
>+- MiniBatchAssigner(interval=[3000ms], mode=[ProcTime], 
> changelogMode=[UA,D])
>   +- TableSourceScan(table=[[default_catalog, 
> default_database, source_city]], fields=[id, city_name], changelogMode=[UA,D])
> {code}
> We have suggested users to use the same key of the query as the primary key 
> on sink in the documentation: 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication.
>  We should make this attention to be more highlight in CREATE TABLE page. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24716) Non-equality predicates on partition columns lead to incorrect plans

2021-11-01 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17437103#comment-17437103
 ] 

Wenlong Lyu commented on FLINK-24716:
-

the filter is pruned, not lost. I am also +1 for not keeping the filter,because 
it is duplicated. we can event optimize the scan to an empty values source, 
because no partition would be scanned.

> Non-equality predicates on partition columns lead to incorrect plans
> 
>
> Key: FLINK-24716
> URL: https://issues.apache.org/jira/browse/FLINK-24716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> Queries such as
> {code}
> SELECT d FROM T1 WHERE c = 100 AND d > '2012'
> {code}
> where {{d}} is a partition column leads to incorrect plans:
> {code}
> == Abstract Syntax Tree ==
> LogicalProject(d=[$2])
> +- LogicalFilter(condition=[AND(=($0, 100), >($2, _UTF-16LE'2012'))])
>+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
> == Optimized Physical Plan ==
> Calc(select=[d], where=[=(c, 100)])
> +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], 
> partitions=[], project=[c, d]]], fields=[c, d])
> == Optimized Execution Plan ==
> Calc(select=[d], where=[(c = 100)])
> +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], 
> partitions=[], project=[c, d]]], fields=[c, d])
> {code}
> It seems in many cases (with SupportsFilterPushDown and without) the {{<}} 
> predicate is swallowed and not part of the final execution plan anymore.
> Reproducible code can be found 
> [here|https://github.com/twalthr/flink/blob/e5a2cc9bcc9b38cf2b94c9ea7c7296ce94434343/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/TestClass.java]
>  with new testing infrastructure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24716) Non-equality predicates on partition columns lead to incorrect plans

2021-11-01 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17436901#comment-17436901
 ] 

Wenlong Lyu commented on FLINK-24716:
-

I think the plan is correct, the swallowed predicate has been reflected in the 
empty remaining partition: partitions=[]

> Non-equality predicates on partition columns lead to incorrect plans
> 
>
> Key: FLINK-24716
> URL: https://issues.apache.org/jira/browse/FLINK-24716
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Timo Walther
>Priority: Major
>
> Queries such as
> {code}
> SELECT d FROM T1 WHERE c = 100 AND d > '2012'
> {code}
> where {{d}} is a partition column leads to incorrect plans:
> {code}
> == Abstract Syntax Tree ==
> LogicalProject(d=[$2])
> +- LogicalFilter(condition=[AND(=($0, 100), >($2, _UTF-16LE'2012'))])
>+- LogicalTableScan(table=[[default_catalog, default_database, T1]])
> == Optimized Physical Plan ==
> Calc(select=[d], where=[=(c, 100)])
> +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], 
> partitions=[], project=[c, d]]], fields=[c, d])
> == Optimized Execution Plan ==
> Calc(select=[d], where=[(c = 100)])
> +- TableSourceScan(table=[[default_catalog, default_database, T1, filter=[], 
> partitions=[], project=[c, d]]], fields=[c, d])
> {code}
> It seems in many cases (with SupportsFilterPushDown and without) the {{<}} 
> predicate is swallowed and not part of the final execution plan anymore.
> Reproducible code can be found 
> [here|https://github.com/twalthr/flink/blob/e5a2cc9bcc9b38cf2b94c9ea7c7296ce94434343/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/TestClass.java]
>  with new testing infrastructure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24501) Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp

2021-10-28 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435175#comment-17435175
 ] 

Wenlong Lyu commented on FLINK-24501:
-

[~qingru zhang] in you case,if the watermark can be restore,the late event is 
still late event when restoring from sp/cp,because its event time is earlier 
than watermark restored. 
On the other hand, introducing per-key watermark state could increase the state 
size of window operator in long run and affects performance, because the 
progress state should never be clean up in order to recognize later event in 
restore in the future. such side effect could be worse when the window size is 
small because the space namespace will increase quickly.

> Unexpected behavior of cumulate window aggregate for late event after recover 
> from sp/cp
> 
>
> Key: FLINK-24501
> URL: https://issues.apache.org/jira/browse/FLINK-24501
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Major
>  Labels: pull-request-available
>
> *Problem description*
> After recover from savepoint or checkpoint, unexpected behavior of cumulate 
> window aggregate for late event may happened.
> *Bug analyze*
> Currently, for cumulate window aggregate, late events belongs to the cleaned 
> slice would be merged into the merged window state, and would be counted into 
> the later slice.
> For example, for a CUMULATE window, step is 1 minute, size is 1 day.
> {code:java}
> SELECT window_start, window_end, COUNT(USER_ID)
>   FROM TABLE(
> CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL 
> '1' DAY))
>   GROUP BY window_start, window_end;{code}
> When the watermark already comes to 11:01, result of window [00:00, 11:01) 
> would be emitted. Let's assume the result is INSERT (00:00, 11:01, 4)
> Then if a late record which event time is 11:00 comes, it would be merged 
> into merged state, and would be counted into the later slice, for example, 
> for window [00:00, 11:02), [00:00, 11:03)... But the emitted window result 
> INSERT (00:00, 11:01, 4) would not be retracted and updated.
> The behavior would be different if the job recover from savepoint/checkpoint.
> Let's do a savepoint after watermark comes to 11:01 and emit (00:00, 11:01, 
> 4).
> Then recover the job from savepoint. Watermarks are not checkpointed and they 
> need to be repopulated again. So after recovered, the watermark may rollback 
> to 11:00, then if a record which event time is 11:00 comes, it would not be 
> processed as late event, after watermark comes to 11:01 again, a window 
> result INSERT (00:00, 11:01, 5)  would be emitted to downstream.
> So the downstream operator would receive two INSERT record for WINDOW (00:00, 
> 11:01) which may leads to wrong result.
>  
> *Solution*
> There are two solutions for the problem:
>  # save watermark to state in slice shared operator. (Prefered)
>  # update the behavior for late event. For example, retract the emitted 
> result and send the updated result. It needs to change the behavior of slice 
> state clean mechanism because we clean the slice state after watermark 
> exceeds the slice end currently.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24501) Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp

2021-10-27 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17435137#comment-17435137
 ] 

Wenlong Lyu commented on FLINK-24501:
-

[~qingru zhang] I think watermark should be the min of all subtask, and nothing 
wrong would happen, because downstream operator such as window, also treat min 
of watermark of its upstream as its watermark, the logic is aligned.

> Unexpected behavior of cumulate window aggregate for late event after recover 
> from sp/cp
> 
>
> Key: FLINK-24501
> URL: https://issues.apache.org/jira/browse/FLINK-24501
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Major
>  Labels: pull-request-available
>
> *Problem description*
> After recover from savepoint or checkpoint, unexpected behavior of cumulate 
> window aggregate for late event may happened.
> *Bug analyze*
> Currently, for cumulate window aggregate, late events belongs to the cleaned 
> slice would be merged into the merged window state, and would be counted into 
> the later slice.
> For example, for a CUMULATE window, step is 1 minute, size is 1 day.
> {code:java}
> SELECT window_start, window_end, COUNT(USER_ID)
>   FROM TABLE(
> CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL 
> '1' DAY))
>   GROUP BY window_start, window_end;{code}
> When the watermark already comes to 11:01, result of window [00:00, 11:01) 
> would be emitted. Let's assume the result is INSERT (00:00, 11:01, 4)
> Then if a late record which event time is 11:00 comes, it would be merged 
> into merged state, and would be counted into the later slice, for example, 
> for window [00:00, 11:02), [00:00, 11:03)... But the emitted window result 
> INSERT (00:00, 11:01, 4) would not be retracted and updated.
> The behavior would be different if the job recover from savepoint/checkpoint.
> Let's do a savepoint after watermark comes to 11:01 and emit (00:00, 11:01, 
> 4).
> Then recover the job from savepoint. Watermarks are not checkpointed and they 
> need to be repopulated again. So after recovered, the watermark may rollback 
> to 11:00, then if a record which event time is 11:00 comes, it would not be 
> processed as late event, after watermark comes to 11:01 again, a window 
> result INSERT (00:00, 11:01, 5)  would be emitted to downstream.
> So the downstream operator would receive two INSERT record for WINDOW (00:00, 
> 11:01) which may leads to wrong result.
>  
> *Solution*
> There are two solutions for the problem:
>  # save watermark to state in slice shared operator. (Prefered)
>  # update the behavior for late event. For example, retract the emitted 
> result and send the updated result. It needs to change the behavior of slice 
> state clean mechanism because we clean the slice state after watermark 
> exceeds the slice end currently.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24501) Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp

2021-10-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17432190#comment-17432190
 ] 

Wenlong Lyu commented on FLINK-24501:
-

I think it may be better to make sure that watermark would not reduce after 
restoring from a checkpoint/savepoint instead of modifying the manner of 
operator to cover such abnormal case. 
For example, add an operator state in watermark assigner, to avoid it producing 
wrong watermark after restore?

> Unexpected behavior of cumulate window aggregate for late event after recover 
> from sp/cp
> 
>
> Key: FLINK-24501
> URL: https://issues.apache.org/jira/browse/FLINK-24501
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Major
>  Labels: pull-request-available
>
> *Problem description*
> After recover from savepoint or checkpoint, unexpected behavior of cumulate 
> window aggregate for late event may happened.
> *Bug analyze*
> Currently, for cumulate window aggregate, late events belongs to the cleaned 
> slice would be merged into the merged window state, and would be counted into 
> the later slice.
> For example, for a CUMULATE window, step is 1 minute, size is 1 day.
> {code:java}
> SELECT window_start, window_end, COUNT(USER_ID)
>   FROM TABLE(
> CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL 
> '1' DAY))
>   GROUP BY window_start, window_end;{code}
> When the watermark already comes to 11:01, result of window [00:00, 11:01) 
> would be emitted. Let's assume the result is INSERT (00:00, 11:01, 4)
> Then if a late record which event time is 11:00 comes, it would be merged 
> into merged state, and would be counted into the later slice, for example, 
> for window [00:00, 11:02), [00:00, 11:03)... But the emitted window result 
> INSERT (00:00, 11:01, 4) would not be retracted and updated.
> The behavior would be different if the job recover from savepoint/checkpoint.
> Let's do a savepoint after watermark comes to 11:01 and emit (00:00, 11:01, 
> 4).
> Then recover the job from savepoint. Watermarks are not checkpointed and they 
> need to be repopulated again. So after recovered, the watermark may rollback 
> to 11:00, then if a record which event time is 11:00 comes, it would not be 
> processed as late event, after watermark comes to 11:01 again, a window 
> result INSERT (00:00, 11:01, 5)  would be emitted to downstream.
> So the downstream operator would receive two INSERT record for WINDOW (00:00, 
> 11:01) which may leads to wrong result.
>  
> *Solution*
> There are two solutions for the problem:
>  # save watermark to state in slice shared operator. (Prefered)
>  # update the behavior for late event. For example, retract the emitted 
> result and send the updated result. It needs to change the behavior of slice 
> state clean mechanism because we clean the slice state after watermark 
> exceeds the slice end currently.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-24412) retract stream join on topN error

2021-09-30 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-24412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17422603#comment-17422603
 ] 

Wenlong Lyu commented on FLINK-24412:
-

hi, [~sandyfog], Lookup join can not work well on retract/upsert stream if the 
source table could change. you can try mysql cdc 
source(https://github.com/ververica/flink-cdc-connectors/issues) and regular 
table join instead.

> retract  stream  join  on topN   error
> --
>
> Key: FLINK-24412
> URL: https://issues.apache.org/jira/browse/FLINK-24412
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.12.3
>Reporter: sandy du
>Priority: Critical
>
> I can  reappear this error in follow sql:
>  create table user_info(
>  name string,
>  age int,
>  primary key(name) not enforced
>  ) whith(
>  'connector'='jdbc',
>  'url'='jdbc:mysql...',
>  ...
>  'lookup.cache.max-rows'='0',
>  'lookup.cache.ttl'='1 s'
>  );
> create table user_action(
>  name string,
>  app string,
>  dt string,
>  proctime as proctime()
>  )whith(
>  'connector'='kafka',
>  ...
>  );
> create view v_user_action as select * from(
>  select name,app,proctime,row_number() over(partition by name,app order by dt 
> desc) as rn from user_action
>  )t where rn=1;
> create view user_out as select a.name,a.app,b.age from v_user_action a left 
> join user_info
>  for system_time as of a.proctime as b on a.name=b.name;
> select * from (
>  select name,app,age ,row_number() over(partition by name,app order by age 
> desc) as rn from user_out
>  ) t where rn=1;
>   
>  *first :*
>  {color:#de350b} user_action  got data  
> \{"name":"11","app":"app","dt":"2021-09-10"}{color}
> {color:#de350b}user_info   got data  \{"name":"11","age":11}{color}
> at the moment  sql can  successful run.
> *{color:#de350b}then :{color}*
> {color:#de350b}user_action  got data  
> \{"name":"11","app":"app","dt":"2021-09-20"}{color}
> {color:#de350b}user_info   got data  \{"name":"11","age":11}  
> \{"name":"11","age":22} {color}
> now, TopN query on last sql, the TopN operator will thrown exception: 
> {{Caused by: java.lang.RuntimeException: Can not retract a non-existent 
> record. This should never happen.}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23353) UDTAGG can't execute in Batch mode

2021-07-16 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17382014#comment-17382014
 ] 

Wenlong Lyu commented on FLINK-23353:
-

hi, [~hayden zhou], Table Aggregate only supported in streaming mode by 
designed, as you can see in FLINK-13471 / FLIP-29 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739)

> UDTAGG can't execute in Batch mode
> --
>
> Key: FLINK-23353
> URL: https://issues.apache.org/jira/browse/FLINK-23353
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.1
>Reporter: hayden zhou
>Priority: Major
>
> {code:java}
> public class Top2Test {
> public static void main(String[] args) {
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inBatchMode()build();
> TableEnvironment tEnv = TableEnvironment.create(settings);
> Table sourceTable = tEnv.fromValues(
> DataTypes.ROW(
> DataTypes.FIELD("id", DataTypes.INT()),
> DataTypes.FIELD("name",DataTypes.STRING()),
> DataTypes.FIELD("price", DataTypes.INT())
> ),
> row(1, "hayden", 18),
> row(3, "hayden", 19),
> row(4, "hayden", 20),
> row(2, "jaylin", 20)
> );
> tEnv.createTemporaryView("source", sourceTable);
> Table rT = tEnv.from("source")
> .groupBy($("name"))
> .flatAggregate(call(Top2.class, $("price")).as("price", 
> "rank"))
> .select($("name"), $("price"), $("rank"));
> rT.execute().print();
> }
> public static class Top2Accumulator {
> public Integer first;
> public Integer second;
> }
> public static class Top2 extends TableAggregateFunction Integer>, Top2Accumulator> {
> @Override
> public Top2Accumulator createAccumulator() {
> Top2Accumulator acc = new Top2Accumulator();
> acc.first = Integer.MIN_VALUE;
> acc.second = Integer.MIN_VALUE;
> return acc;
> }
> public void accumulate(Top2Accumulator acc, Integer value) {
> if (value > acc.first) {
> acc.second = acc.first;
> acc.first = value;
> } else if (value > acc.second) {
> acc.second = value;
> }
> }
> public void merge(Top2Accumulator acc, Iterable it) {
> for (Top2Accumulator otherAcc : it) {
> accumulate(acc, otherAcc.first);
> accumulate(acc, otherAcc.second);
> }
> }
> public void emitValue(Top2Accumulator acc, Collector Integer>> out) {
> if (acc.first != Integer.MIN_VALUE) {
> out.collect(Tuple2.of(acc.first, 1));
> }
> if (acc.second != Integer.MIN_VALUE) {
> out.collect(Tuple2.of(acc.second, 2));
> }
> }
> }
> }
> {code}
> got errors as below:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
> generate a valid execution plan for the given query: 
> LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
>  fields=[name, price, rank])
> +- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
> _UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
>+- LogicalTableAggregate(group=[{1}], 
> tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
>   +- LogicalUnion(all=[true])
>  :- LogicalProject(id=[CAST(1):INTEGER], 
> name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(18):INTEGER])
>  :  +- LogicalValues(tuples=[[{ 0 }]])
>  :- LogicalProject(id=[CAST(3):INTEGER], 
> name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(19):INTEGER])
>  :  +- LogicalValues(tuples=[[{ 0 }]])
>  :- LogicalProject(id=[CAST(4):INTEGER], 
> name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(20):INTEGER])
>  :  +- LogicalValues(tuples=[[{ 0 }]])
>  +- LogicalProject(id=[CAST(2):INTEGER], 
> name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
> "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
> price=[CAST(20):INTEGER])
> +- LogicalValues(tuples=[[{ 0 }]])
> This exception indicates that the query uses an unsupported SQL feature.
> Please check the documentation for the set of currently supported SQL 
> 

[jira] [Commented] (FLINK-23385) Fix nullability of COALESCE

2021-07-16 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17382009#comment-17382009
 ] 

Wenlong Lyu commented on FLINK-23385:
-

hi, [~twalthr] I think the root cause of the issue is the return type of 
REGEXP_EXTRACT should be force nullable instead of  depending on input type, 
introduce by FLINK-13783

> Fix nullability of COALESCE
> ---
>
> Key: FLINK-23385
> URL: https://issues.apache.org/jira/browse/FLINK-23385
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.1
>Reporter: Maciej Bryński
>Priority: Major
>
> EDIT: Simpler case:
> {code:java}
> SELECT COALESCE(REGEXP_EXTRACT('22','[A-Z]+'),'-');
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, 
> however, a null value is being written into it. You can set job configuration 
> 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and 
> drop such records silently.
> {code}
> When using REGEXP_EXTRACT on NOT NULL column I'm getting following exception
> {code:java}
> select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from test limit 10
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, 
> however, a null value is being written into it. You can set job configuration 
> 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and 
> drop such records silently.
> {code}
> I think the reason is that nullability of result is wrongly calculated.
>  Example:
> {code:java}
> create table test (
>  test STRING NOT NULL
> ) WITH (
> 'connector' = 'datagen'
> );
> explain select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from test
> == Abstract Syntax Tree ==
> LogicalProject(EXPR$0=[REGEXP_EXTRACT($0, _UTF-16LE'[A-Z]+')])
> +- LogicalTableScan(table=[[default_catalog, default_database, test]])== 
> Optimized Physical Plan ==
> Calc(select=[REGEXP_EXTRACT(test, _UTF-16LE'[A-Z]+') AS EXPR$0])
> +- TableSourceScan(table=[[default_catalog, default_database, test]], 
> fields=[test])== Optimized Execution Plan ==
> Calc(select=[REGEXP_EXTRACT(test, _UTF-16LE'[A-Z]+') AS EXPR$0])
> +- TableSourceScan(table=[[default_catalog, default_database, test]], 
> fields=[test]){code}
> As you can see Flink is removing COALESCE from query which is wrong.
>  
> Same for view (null = false):
> {code:java}
> create view v as select COALESCE(REGEXP_EXTRACT(test, '[A-Z]+'), '-') from 
> test
> describe v;
> +++---+-++---+
> |   name |   type |  null | key | extras | watermark |
> +++---+-++---+
> | EXPR$0 | STRING | false | ||   |
> +++---+-++---+
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-23159) Correlated sql subquery on the source created via fromValues() failed to compile

2021-07-02 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-23159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17373308#comment-17373308
 ] 

Wenlong Lyu commented on FLINK-23159:
-

[~gaoyunhaii] Thanks for reporting the issue, the root cause of this issue is 
that currently Values is excluded in SubqueryDecorrelator, I would try to fix 
it.

> Correlated sql subquery on the source created via fromValues() failed to 
> compile
> 
>
> Key: FLINK-23159
> URL: https://issues.apache.org/jira/browse/FLINK-23159
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Yun Gao
>Priority: Major
>
> Correlated subquery like 
> {code:java}
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.types.DataType;
> import org.apache.flink.types.Row;
> import java.util.ArrayList;
> import java.util.List;
> public class SQLQueryTest {
>   public static void main(String[] args) {
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode()
>   .build();
> TableEnvironment tableEnvironment = TableEnvironment.create(settings);
> DataType row = DataTypes.ROW(
>   DataTypes.FIELD("flag", DataTypes.STRING()),
>   DataTypes.FIELD("id", DataTypes.INT()),
>   DataTypes.FIELD("name", DataTypes.STRING())
> );
> Table table = tableEnvironment.fromValues(row, new 
> MyListSource("table1").builder());
> tableEnvironment.createTemporaryView("table1", table);
> table = tableEnvironment.fromValues(row, new 
> MyListSource("table2").builder());
> tableEnvironment.createTemporaryView("table2", table);
> String sql = "select t1.flag from table1 t1 where t1.name in (select 
> t2.name from table2 t2 where t2.id = t1.id)";
> tableEnvironment.explainSql(sql);
>   }
>   public static class MyListSource {
> private String flag;
> public MyListSource(String flag) {
>   this.flag = flag;
> }
> public List builder() {
>   List rows = new ArrayList<>();
>   for (int i = 2; i < 3; i++) {
> Row row = new Row(3);
> row.setField(0, flag);
> row.setField(1, i);
> row.setField(2, "me");
> rows.add(row);
>   }
>   return rows;
> }
>   }
> }
> {code}
> would throws
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> unexpected correlate variable $cor0 in the plan
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>   at scala.collection.immutable.Range.foreach(Range.scala:160)
>   at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>   at 
> 

[jira] [Commented] (FLINK-22954) Don't support consuming update and delete changes when use table function that does not contain table field

2021-06-18 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365301#comment-17365301
 ] 

Wenlong Lyu commented on FLINK-22954:
-

[~godfreyhe]  thanks for correct me, you are right. so when the function is not 
deterministic, it is still not supported now?  I have tried to add the rewrite 
rule, with this pr planner can work well on join constant table function: 
https://github.com/apache/flink/pull/16192

I am thinking about that maybe we should remove the limitation of 
RexUtil.isConstant on ConstantTableFunctionScanRule, because 
ConstantTableFunctionScanRule only works on: select * from lateral table(XXX), 
in this case it is ok to support non-deterministic function. what do you think

> Don't support consuming update and delete changes when use table function 
> that does not contain table field
> ---
>
> Key: FLINK-22954
> URL: https://issues.apache.org/jira/browse/FLINK-22954
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: hehuiyuan
>Priority: Major
>
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Table 
> sink 'default_catalog.default_database.kafkaTableSink' doesn't support 
> consuming update and delete changes which is produced by node 
> Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])Exception in thread 
> "main" org.apache.flink.table.api.TableException: Table sink 
> 'default_catalog.default_database.kafkaTableSink' doesn't support consuming 
> update and delete changes which is produced by node 
> Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> 

[jira] [Commented] (FLINK-22954) Don't support consuming update and delete changes when use table function that does not contain table field

2021-06-17 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365238#comment-17365238
 ] 

Wenlong Lyu commented on FLINK-22954:
-

[~jark][~godfreyhe], I think ConstantTableFunctionScanRule should not the right 
way to  process the ConstantTableFunction, when the function is not 
deterministic, the function should be called by every record,  How about adding 
a new rule to convert LogicalJoin(XX, TableFunctionScan) to 
LogicalCorrelate(XXX, TableFunctionScan) ?

> Don't support consuming update and delete changes when use table function 
> that does not contain table field
> ---
>
> Key: FLINK-22954
> URL: https://issues.apache.org/jira/browse/FLINK-22954
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: hehuiyuan
>Priority: Major
>
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Table 
> sink 'default_catalog.default_database.kafkaTableSink' doesn't support 
> consuming update and delete changes which is produced by node 
> Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])Exception in thread 
> "main" org.apache.flink.table.api.TableException: Table sink 
> 'default_catalog.default_database.kafkaTableSink' doesn't support consuming 
> update and delete changes which is produced by node 
> Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> 

[jira] [Comment Edited] (FLINK-22954) Don't support consuming update and delete changes when use table function that does not contain table field

2021-06-17 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17365238#comment-17365238
 ] 

Wenlong Lyu edited comment on FLINK-22954 at 6/18/21, 4:37 AM:
---

[~jark][~godfreyhe], I think ConstantTableFunctionScanRule is not the right way 
to  process the ConstantTableFunction, when the function is not deterministic, 
the function should be called by every record,  How about adding a new rule to 
convert LogicalJoin(XX, TableFunctionScan) to LogicalCorrelate(XXX, 
TableFunctionScan) ?


was (Author: wenlong.lwl):
[~jark][~godfreyhe], I think ConstantTableFunctionScanRule should not the right 
way to  process the ConstantTableFunction, when the function is not 
deterministic, the function should be called by every record,  How about adding 
a new rule to convert LogicalJoin(XX, TableFunctionScan) to 
LogicalCorrelate(XXX, TableFunctionScan) ?

> Don't support consuming update and delete changes when use table function 
> that does not contain table field
> ---
>
> Key: FLINK-22954
> URL: https://issues.apache.org/jira/browse/FLINK-22954
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.0
>Reporter: hehuiyuan
>Priority: Major
>
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Table 
> sink 'default_catalog.default_database.kafkaTableSink' doesn't support 
> consuming update and delete changes which is produced by node 
> Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])Exception in thread 
> "main" org.apache.flink.table.api.TableException: Table sink 
> 'default_catalog.default_database.kafkaTableSink' doesn't support consuming 
> update and delete changes which is produced by node 
> Join(joinType=[LeftOuterJoin], where=[true], select=[name, word], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:382)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:265)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:279)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:341)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:330)
>  at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:329)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.Range.foreach(Range.scala:160) at 
> 

[jira] [Updated] (FLINK-22998) Flink SQL does not support block comment before SET command

2021-06-17 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-22998:

Component/s: (was: Table SQL / Planner)
 Table SQL / API

> Flink SQL does not support block comment before SET command
> ---
>
> Key: FLINK-22998
> URL: https://issues.apache.org/jira/browse/FLINK-22998
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.14.0
>Reporter: Zhiwen Sun
>Priority: Major
>
> Flink SQL does not support block comment before SET command.
> A tiny SQL that produces the bug:
>  
> {code:java}
> /**
>  comment
> **/
> SET sql-client.execution.result-mode=TABLEAU;
> SELECT 'hello';{code}
>  
> while following SQL works fine:
>  
> {code:java}
> SET sql-client.execution.result-mode=TABLEAU;
> /**
>  comment
> **/
> SELECT 'hello';{code}
>  
>  
> After I debug Flink source code, I found that EXTENDED_PARSER does not 
> support block comment.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22714) Simplify `StreamPhysicalWindowTableFunction` to a simple window assigner if successor node with `WindowAttachedWindowingStrategy` in planner

2021-06-07 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17358430#comment-17358430
 ] 

Wenlong Lyu commented on FLINK-22714:
-

hi, [~qingru zhang], is this issue depends on other feature not merged? 
Currently the StreamExecPhysicalWindowTableFunction is an intermediate node in 
plan optimization(as we can see, StreamExecWindowTableFunction cannot translate 
to Transformation), I don't see the reason why we need the tag of emitPerRecord 
on StreamPhysicalWindowTableFunction.

> Simplify `StreamPhysicalWindowTableFunction` to a simple window assigner if 
> successor node with `WindowAttachedWindowingStrategy`  in planner
> -
>
> Key: FLINK-22714
> URL: https://issues.apache.org/jira/browse/FLINK-22714
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.14.0
>Reporter: JING ZHANG
>Assignee: JING ZHANG
>Priority: Major
>  Labels: pull-request-available
>
> In theory, `StreamPhysicalWindowTableFunction` should works as a window 
> assigner and emits at ends of window. However, 
> `StreamPhysicalWindowTableFunction` could be simplified to a simple window 
> assigner, and emits per record if it's successor node is one of 
> `WindowRank`/`WindowJoin`/`WindowAggregate` with 
> `WindowAttachedWindowingStrategy`, because those nodes would assign input 
> records to windows have been assigned in input nodes, and trigger emit at 
> ends of window.
> Besides, we should add a limit that window is based on rowtime, because it 
> has syntax problem if window is assigned based on the proc-time of previous 
> operator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-7271) ExpressionReducer does not optimize string-to-time conversion

2021-05-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348947#comment-17348947
 ] 

Wenlong Lyu commented on FLINK-7271:


hi, [~twalthr] [~xueyu] I investigate a bit on the case, the case still exists, 
found that it is because we have different presentation on some types like 
Date/Timestamp/Row etc, comparing with calcite, and the reduced value produced 
by ExpressionReducer was casted to string and so that we can make a rex node 
the same as the un-reduced value:

cast(String as Date) -- (ExpressionReducer) -->  Date (internal format Int) -- 
auto cast in RexBuilder#makeLiteral --> cast(String as Date)

> ExpressionReducer does not optimize string-to-time conversion
> -
>
> Key: FLINK-7271
> URL: https://issues.apache.org/jira/browse/FLINK-7271
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.3.1
>Reporter: Timo Walther
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> Expressions like {{"1996-11-10".toDate}} or {{"1996-11-10 
> 12:12:12".toTimestamp}} are not recognized by the ExpressionReducer and are 
> evaluated during runtime instead of pre-flight phase. In order to optimize 
> the runtime we should allow constant expression reduction here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22437) Miss adding parallesim for filter operator in batch mode

2021-05-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348082#comment-17348082
 ] 

Wenlong Lyu commented on FLINK-22437:
-

[~zoucao] thanks for reporting the bug, the bug is already fixed in 1.13

> Miss adding parallesim for filter operator in batch mode
> 
>
> Key: FLINK-22437
> URL: https://issues.apache.org/jira/browse/FLINK-22437
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.12.2
>Reporter: zoucao
>Priority: Major
>
> when I execute batch sql as follow in flink-1.12.2, I found lots of small 
> files in hdfs. In filesystem connector, `GroupedPartitionWriter` will be 
> used, and it close the last partiton if a new record does not belong to the 
> existing partition. The phenomenon occurred if there are more than one 
> partiton's records are sent to filesystem sink at the same time. Hive source 
> can determine parallesim by the number of file and partition, and the 
> parallesim will extended by sort operator,  but in 
> `CommonPhysicalSink#createSinkTransformation`,a filter operator will be add 
> to support `SinkNotNullEnforcer`, there is no parallesim set for it, so 
> filesystem sink operator can not get the correct parallesim from inputstream.
> {code:java}
> CREATE CATALOG myHive with (
> 'type'='hive',
> 'property-version'='1',
> 'default-database' = 'flink_sql_online_test'
> );
> -- SET table.sql-dialect=hive;
> -- CREATE TABLE IF NOT EXISTS myHive.flink_sql_online_test.hive_sink (
> --`timestamp` BIGINT,
> --`time` STRING,
> --id BIGINT,
> --product STRING,
> --price DOUBLE,
> --canSell STRING,
> --selledNum BIGINT
> -- ) PARTITIONED BY (
> --dt STRING,
> --`hour` STRING,
> --   `min` STRING
> -- ) TBLPROPERTIES (
> --'partition.time-extractor.timestamp-pattern'='$dt $hr:$min:00',
> --'sink.partition-commit.trigger'='partition-time',
> --'sink.partition-commit.delay'='1 min',
> --'sink.partition-commit.policy.kind'='metastore,success-file'
> -- );
> create table fs_sink (
> `timestamp` BIGINT,
> `time` STRING,
> id BIGINT,
> product STRING,
> price DOUBLE,
> canSell STRING,
> selledNum BIGINT,
> dt STRING,
> `hour` STRING,
> `min` STRING
> ) PARTITIONED BY (dt, `hour`, `min`) with (
> 'connector'='filesystem',
> 'path'='hdfs://',
> 'format'='csv'
> );
> insert into fs_sink
> select * from myHive.flink_sql_online_test.hive_sink;
> {code}
> I think this problem can be fixed by adding a parallesim for it just like
> {code:java}
> val dataStream = new DataStream(env, inputTransformation).filter(enforcer)
>   .setParallelism(inputTransformation.getParallelism)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22454) Failed to translate Lookup Join when join on a CAST expression on dimention table column

2021-05-20 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17348075#comment-17348075
 ] 

Wenlong Lyu commented on FLINK-22454:
-

Hi, [~fsk119], I think is error is by designed. Lookup join must have look up 
keys in join condition, we ignored casting on field ref while extracting lookup 
keys(see CommonPhysicalLookupJoin#getIdenticalSourceField), in order to support 
lookup keys which have interoperable types(see 
PlannerTypeUtils#isInteroperable), and added  a check on the type of lookup 
keys to make sure that the types is really interoperable. 

In your case, the cast can not be ignored(int and decimal are not 
interoperable), so planner throws error about type not compatible on lookup key.

> Failed to translate Lookup Join when join on a CAST expression on dimention 
> table column
> 
>
> Key: FLINK-22454
> URL: https://issues.apache.org/jira/browse/FLINK-22454
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Priority: Major
>
> Please add test in {{LookupJoinTest}}
> {code:java}
>  def before(): Unit ={
> util.addDataStream[(Int, String, Long)](
>   "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
> if (legacyTableSource) {
>   TestTemporalTable.createTemporaryTable(util.tableEnv, "LookupTable")
> } else {
>   util.addTable(
> """
>   |CREATE TABLE LookupTable (
>   |  `id` DECIMAL(38, 10),
>   |  `to_qty` DECIMAL(38, 10),
>   |  `name` STRING,
>   |  `age` INT,
>   |  `id_int` as CAST(`id` AS INT)
>   |) WITH (
>   |  'connector' = 'values'
>   |)
>   |""".stripMargin)
>   }
> {code}
> {code:java}
> @Test
>   def test(): Unit = {
> val sql =
> """
> |SELECT MyTable.b, LookupTable.`to_qty`
> |FROM MyTable
> |LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF MyTable.`proctime`
> |ON MyTable.a = CAST(LookupTable.`id` as INT)
> |""".stripMargin
> util.tableEnv.sqlQuery(sql).explain()
>   }
> {code}
> The exception stack is 
> {code}
> org.apache.flink.table.api.TableException: Temporal table join requires 
> equivalent condition of the same type, but the condition is 
> a[INT]=id[DECIMAL(38, 10)]
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.validateLookupKeyType(CommonExecLookupJoin.java:303)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.translateToPlanInternal(CommonExecLookupJoin.java:222)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:247)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:88)
>   at 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:70)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:69)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:46)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:691)
>   at 
> org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>   at 
> org.apache.flink.table.planner.plan.stream.sql.join.LookupJoinTest.test(LookupJoinTest.scala:197)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Commented] (FLINK-22486) Wrong results of the IN operator

2021-05-11 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342973#comment-17342973
 ] 

Wenlong Lyu commented on FLINK-22486:
-

discussed with [~fsk119] offline, this issue is invalid, can be closed for now.

> Wrong results of the IN operator
> 
>
> Key: FLINK-22486
> URL: https://issues.apache.org/jira/browse/FLINK-22486
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Priority: Major
>
> Please add the following test in the {{CalcITCase}}.
> {code:java}
> @Test
>   def testSimpleProject(): Unit = {
> val myTableDataId = TestValuesTableFactory.registerData(Seq(row("HC809")))
> val ddl =
>   s"""
>  |CREATE TABLE SimpleTable (
>  |  content String
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$myTableDataId',
>  |  'bounded' = 'true'
>  |)
>  |""".stripMargin
> tEnv.executeSql(ddl)
> val sql =
>   """
> |SELECT content from SimpleTable where content in (
> |'CTNBSmokeSensor','H388N',
> |'H389N',
> |'GHL-IRD','JY-BF-20YN','HC809',
> |'DH-9908N-AEP','DH-9908N'
> |)
> |
> |""".stripMargin
> checkResult(
>   sql,
>   Seq(row("HC809"))
> )
>   }
> {code}
> It should return the result but nothing return



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22448) FlinkRexUtil create Sarg String array elemet supplement space

2021-05-11 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342972#comment-17342972
 ] 

Wenlong Lyu commented on FLINK-22448:
-

hi, [~Akihito Liang], as tested in Flink-22486, I think this issue didn't 
existed now, in both master and release-1.13, the result type of in would be 
correct:  varchar(max of the literal)

> FlinkRexUtil create Sarg String array elemet supplement space
> -
>
> Key: FLINK-22448
> URL: https://issues.apache.org/jira/browse/FLINK-22448
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Junning Liang
>Priority: Major
>
> As we know, the new version of Calcite introduces the {{SEARCH}} rex call to 
> express range conditions. But when i used string array to express range in 
> the StreamSQL, i found that some string in the array had problems with the 
> completion length by using space.
> the following query:
>  
> {code:java}
> create view tempView as
>   select * from sourceTable where action in ('systemnotifyv2', 
> 'session_auth', 'create_session', 'close_single_chat'){code}
> after Sarg operator created, the result is :
>  
>  
> {code:java}
> create view tempView as
>  select * from sourceTable where action in ('systemnotifyv2', 'session_auth  
> ', 'create_session', 'close_single_chat')
> {code}
> I debuged to see why dose the happans. After calling  rexBuilder.makeLiteral 
> in
> FlinkRexUtil#expandSearchOperands, the string 'session_auth' became 
> 'session_auth  '.And i also found that the type and length of the string 
> array were determined by the first string in the array.Just like my example 
> above, the type of the array was Char and the length of the array was 14.the 
> length of 'session_auth' string was 12 so that calcite would supplement  2 
> space to make it meet the length of 14.
> Now, i All I can think of is adding trim parameter to remove the space。do you 
> have a better way to fix or avoid the problem happens?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22486) Wrong results of the IN operator

2021-05-11 Thread Wenlong Lyu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17342613#comment-17342613
 ] 

Wenlong Lyu commented on FLINK-22486:
-

Hi, [~fsk119] I have tried the case you post on master and release-1.13 branch, 
but cannot reproduce the problem you met.

> Wrong results of the IN operator
> 
>
> Key: FLINK-22486
> URL: https://issues.apache.org/jira/browse/FLINK-22486
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: Shengkai Fang
>Priority: Major
>
> Please add the following test in the {{CalcITCase}}.
> {code:java}
> @Test
>   def testSimpleProject(): Unit = {
> val myTableDataId = TestValuesTableFactory.registerData(Seq(row("HC809")))
> val ddl =
>   s"""
>  |CREATE TABLE SimpleTable (
>  |  content String
>  |) WITH (
>  |  'connector' = 'values',
>  |  'data-id' = '$myTableDataId',
>  |  'bounded' = 'true'
>  |)
>  |""".stripMargin
> tEnv.executeSql(ddl)
> val sql =
>   """
> |SELECT content from SimpleTable where content in (
> |'CTNBSmokeSensor','H388N',
> |'H389N',
> |'GHL-IRD','JY-BF-20YN','HC809',
> |'DH-9908N-AEP','DH-9908N'
> |)
> |
> |""".stripMargin
> checkResult(
>   sql,
>   Seq(row("HC809"))
> )
>   }
> {code}
> It should return the result but nothing return



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-14763) flink sql cep parallelism error

2021-04-29 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-14763:

Component/s: (was: Table SQL / Planner)
 Table SQL / Runtime

> flink sql cep  parallelism error 
> -
>
> Key: FLINK-14763
> URL: https://issues.apache.org/jira/browse/FLINK-14763
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0
> Environment: flink on yarn 
> flink 1.10
> hadoop 3.0
> kafka 2.2.0
>Reporter: richt richt
>Priority: Major
>  Labels: auto-unassigned
>
> when i commit a cep sql with sql-client use parallelism large than 1 , it  
> print error as blow
> {code:java}
> //代码占位符
> java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, 
> endKeyGroup=15} does not contain key group 
> 16java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, 
> endKeyGroup=15} does not contain key group 16 at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161) at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
>  at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
>  at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
>  at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
>  at 
> org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
>  at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:216)
>  at 
> org.apache.flink.cep.operator.CepOperator.saveRegisterWatermarkTimer(CepOperator.java:285)
>  at 
> org.apache.flink.cep.operator.CepOperator.processElement(CepOperator.java:266)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:163)
>  at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:149)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:282)
>  at 
> org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:151)
>  at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:430)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:696) at 
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:521) at 
> java.lang.Thread.run(Thread.java:748)
> {code}
> it seems allocate some key to the wrong taskmanager 
>  
> the yaml is 
> {code:java}
> //代码占位符
> execution:
>   planner: blink
>   type: streaming
>   parallelism: 32
> 
> - name: Ticker
> type: source-table
> update-mode: append
> connector:
>   sink-partitioner: round-robin
>   sink-partitioner-class: 
> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
>   property-version: 1
>   type: kafka
>   version: universal
>   topic: test_part
>   startup-mode: earliest-offset
>   properties:
> - key: bootstrap.servers
>   value:  localhost:9092
> - key: group.id
>   value: testGroup
> format:
>   property-version: 1
>   type: json
>   derive-schema: true
> schema:
> - name: symbol
>   type: VARCHAR
> - name: rtime
>   type: TIMESTAMP
>   rowtime:
> timestamps:
>   type: from-field
>   from: rowtime
> watermarks:
>   type: periodic-bounded
>   delay: 2000
> - name: price
>   type: BIGINT
> - name: tax
>   type: BIGINT
> {code}
> and the query is from the demo:
> {code:java}
> SELECT *
> FROM Ticker
> MATCH_RECOGNIZE(
> PARTITION BY symbol
> ORDER BY rtime
> MEASURES
> C.price AS lastPrice
> ONE ROW PER MATCH
> AFTER MATCH SKIP PAST LAST ROW
> PATTERN (A B* C)
> DEFINE
> A AS A.price > 10,
> B AS B.price < 15,
> C AS C.price > 12
> )
> {code}
> the data :
> {code:java}
>symbol rtime price 
>   tax
>   ACME  2011-11-11T10:00
> 12 1
>   ACME   2011-11-11T10:00:02
> 19 1
>   ACME   2011-11-11T10:00:01
> 17 2
>   ACME   

[jira] [Updated] (FLINK-14848) BaseRowSerializer.toBinaryRow wrongly process null for non-compact decimal

2021-04-29 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-14848:

Component/s: (was: Table SQL / Planner)
 Table SQL / Runtime

> BaseRowSerializer.toBinaryRow wrongly process null for non-compact decimal
> ---
>
> Key: FLINK-14848
> URL: https://issues.apache.org/jira/browse/FLINK-14848
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.9.1
>Reporter: Zhenghua Gao
>Priority: Major
>  Labels: pull-request-available, stale-major
>
> Take non-compact decimal(precision > 18 null as an example, the writer not 
> only need to set null bits, but also need to assign bytes of variable-length 
> part for future update. The following test case can reproduce the problem.
>  
>  
> {code:java}
> @Test
> public void test() {
>Decimal dec = Decimal.fromBigDecimal(new BigDecimal(11), 38, 0);
>BinaryRow row1 = new BinaryRow(2);
>BinaryRowWriter writer = new BinaryRowWriter(row1);
>writer.writeDecimal(0, dec, 38);
>writer.writeDecimal(1, null, 38);
>writer.complete();
>BaseRowSerializer serializer = new BaseRowSerializer(null, RowType.of(new 
> DecimalType(38, 0), new DecimalType(38, 0)));
>GenericRow row2 = new GenericRow(2);
>row2.setField(0, dec);
>row2.setField(1, null);
>BinaryRow row3 = serializer.toBinaryRow(row2);
>assertTrue(row1.equalsWithoutHeader(row3));
> }
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15040) Open function is not called in UDF AggregateFunction

2021-04-29 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-15040:

Component/s: (was: Table SQL / Planner)
 Table SQL / Runtime

> Open function is not called in UDF AggregateFunction
> 
>
> Key: FLINK-15040
> URL: https://issues.apache.org/jira/browse/FLINK-15040
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Reporter: LI Guobao
>Priority: Major
>  Labels: stale-major
>
> I am trying to register a metric in an aggregate UDF by overriding the *open* 
> function. According to the documentation, the *open* function can be override 
> in order to retrieve the metric group to do the metric registration. But it 
> works only on ScalarFunction not on AggregateFunction. Since the *open* 
> function is not call by AggregateFunction.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15183) Use SQL-CLI to TPC-DS E2E test

2021-04-29 Thread Wenlong Lyu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenlong Lyu updated FLINK-15183:

Component/s: (was: Table SQL / Planner)
 Table SQL / API

> Use SQL-CLI to TPC-DS E2E test
> --
>
> Key: FLINK-15183
> URL: https://issues.apache.org/jira/browse/FLINK-15183
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API, Tests
>Reporter: Jingsong Lee
>Priority: Major
> Fix For: 1.14.0
>
>
> Now SQL-CLI support DDL, we can use SQL-CLI to test tpc-ds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


  1   2   3   >