[jira] [Commented] (FLINK-28569) SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced

2022-10-11 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615706#comment-17615706
 ] 

Terry Wang commented on FLINK-28569:


But for a query that has no upsert key and contains non-deterministic 
values, it seems the potential wrong result cannot be avoided?
Should we disable such a plan or emit a warning during plan generation?

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not 
> empty otherwise wrong result maybe produced
> --
>
> Key: FLINK-28569
> URL: https://issues.apache.org/jira/browse/FLINK-28569
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.5, 1.15.2
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> Currently SinkUpsertMaterializer only updates a row by comparing the complete 
> row in any case, but this may produce a wrong result if the input has an upsertKey and 
> also non-deterministic column values; see such a case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
> tEnv.createTemporaryFunction(
> "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
> String cdcDdl =
> "CREATE TABLE users (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " primary key (user_id) not enforced\n"
> + ") WITH (\n"
> + " 'connector' = 'values',\n"
> + " 'changelog-mode' = 'I,UA,UB,D'\n"
> + ")";
> String sinkTableDdl =
> "CREATE TABLE sink (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " PRIMARY KEY(email) NOT ENFORCED\n"
> + ") WITH(\n"
> + " 'connector' = 'values',\n"
> + " 'sink-insert-only' = 'false'\n"
> + ")";
> tEnv.executeSql(cdcDdl);
> tEnv.executeSql(sinkTableDdl);
> util.verifyJsonPlan(
> "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}
> for original cdc source records:
> {code}
> +I[user1, Tom, t...@gmail.com, 10.02],
> -D[user1, Tom, t...@gmail.com, 10.02],
> {code}
> The above query cannot correctly delete the former insertion row because of 
> the non-deterministic column value 'ndFunc(user_name)'.
> This can be solved by letting the SinkUpsertMaterializer be aware of the input 
> upsertKey and update by it.
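
A minimal sketch of the idea (simplified, standalone Java; not the actual SinkUpsertMaterializer code, and the class and field names below are only illustrative): when the input has an upsertKey, retract buffered rows by that key instead of by full-row equality, so a -D whose non-deterministic column differs can still find the row to remove.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class UpsertKeyMatchSketch {

    static final int USER_ID = 0;                         // index of the input upsertKey column
    static final List<Object[]> buffer = new ArrayList<>();

    static void addInsert(Object[] row) {
        buffer.add(row);
    }

    static void retract(Object[] row, boolean matchByUpsertKey) {
        buffer.removeIf(r -> matchByUpsertKey
                ? Objects.equals(r[USER_ID], row[USER_ID]) // proposed: compare only the upsertKey
                : Arrays.equals(r, row));                  // current: compare the complete row
    }

    public static void main(String[] args) {
        // +I with ndFunc(user_name) evaluated to one value ...
        addInsert(new Object[] {"user1", "Tom#1", "t...@gmail.com", "10.02"});
        // ... and -D where ndFunc(user_name) evaluated to a different value.
        Object[] delete = {"user1", "Tom#2", "t...@gmail.com", "10.02"};

        retract(delete, false);
        System.out.println("full-row matching leaves " + buffer.size() + " row(s)");  // 1 -> wrong
        retract(delete, true);
        System.out.println("upsertKey matching leaves " + buffer.size() + " row(s)"); // 0 -> correct
    }
}
{code}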



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28569) SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced

2022-10-11 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615681#comment-17615681
 ] 

Terry Wang commented on FLINK-28569:


+1 to backport this fix into 1.14 and 1.15

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not 
> empty otherwise wrong result maybe produced
> --
>
> Key: FLINK-28569
> URL: https://issues.apache.org/jira/browse/FLINK-28569
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.14.5, 1.15.2
>Reporter: lincoln lee
>Assignee: lincoln lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> Currently SinkUpsertMaterializer only updates a row by comparing the complete 
> row in any case, but this may produce a wrong result if the input has an upsertKey and 
> also non-deterministic column values; see such a case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
> tEnv.createTemporaryFunction(
> "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
> String cdcDdl =
> "CREATE TABLE users (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " primary key (user_id) not enforced\n"
> + ") WITH (\n"
> + " 'connector' = 'values',\n"
> + " 'changelog-mode' = 'I,UA,UB,D'\n"
> + ")";
> String sinkTableDdl =
> "CREATE TABLE sink (\n"
> + " user_id STRING,\n"
> + " user_name STRING,\n"
> + " email STRING,\n"
> + " balance DECIMAL(18,2),\n"
> + " PRIMARY KEY(email) NOT ENFORCED\n"
> + ") WITH(\n"
> + " 'connector' = 'values',\n"
> + " 'sink-insert-only' = 'false'\n"
> + ")";
> tEnv.executeSql(cdcDdl);
> tEnv.executeSql(sinkTableDdl);
> util.verifyJsonPlan(
> "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}
> for original cdc source records:
> {code}
> +I[user1, Tom, t...@gmail.com, 10.02],
> -D[user1, Tom, t...@gmail.com, 10.02],
> {code}
> The above query cannot correctly delete the former insertion row because of 
> the non-deterministic column value 'ndFunc(user_name)'.
> This can be solved by letting the SinkUpsertMaterializer be aware of the input 
> upsertKey and update by it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26334) When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window

2022-03-04 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501258#comment-17501258
 ] 

Terry Wang commented on FLINK-26334:


Good insight in the problem description and the cause analysis. (y)

> When timestamp - offset + windowSize < 0, elements cannot be assigned to the 
> correct window
> ---
>
> Key: FLINK-26334
> URL: https://issues.apache.org/jira/browse/FLINK-26334
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream
>Affects Versions: 1.15.0, 1.14.3
> Environment: flink version 1.14.3
>Reporter: realdengziqi
>Assignee: realdengziqi
>Priority: Critical
> Attachments: image-2022-03-04-11-28-26-616.png, 
> image-2022-03-04-11-37-10-035.png
>
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> h2. issue
>         Hello!
>         When we were studying the Flink source code, we found that there is 
> a problem with its algorithm for calculating the window start time. When 
> _timestamp - offset + windowSize < 0_ , the element is incorrectly 
> assigned to a window whose start time is greater than its own timestamp.
>         The problem is in 
> _org.apache.flink.streaming.api.windowing.windows.TimeWindow_
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> } {code}
> _!image-2022-03-04-11-28-26-616.png|width=710,height=251!_
>         We believe that this violates the constraints between time and 
> window. *That is, an element should fall within a window whose start time is 
> less than its own timestamp and whose end time is greater than its own 
> timestamp.* However, the current situation is when {_}timestamp - offset + 
> windowSize < 0{_}, *the element falls into a future time window.*
>        *You can reproduce the bug with the code at the end of the post.*
> h2. Solution       
>         In fact, the original algorithm would be fine in Python; the key to 
> this problem is how the programming language handles the remainder operation 
> (in Java, % can return a negative remainder).
>         We finally think that it should be modified to the following 
> algorithm.
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp
>             - (timestamp - offset) % windowSize
>             - (windowSize & (timestamp - offset) >> 63);
> } {code}
>         The term _windowSize & (timestamp - offset) >> 63_ subtracts windowSize 
> from the overall result when {_}timestamp - offset < 0{_}, and otherwise does 
> nothing. This way we can handle both 
> positive and negative timestamps.
>         Finally, the element can be assigned to the correct window.
> !image-2022-03-04-11-37-10-035.png|width=712,height=284!
>         This code can pass current unit tests.
> h2. getWindowStartWithOffset methods in other packages
>         I think there are several other implementations of 
> {_}getWindowStartWithOffset{_}. We searched for this method in the project 
> and found that the negative-timestamp case is already handled in _flink.table_.
>         Below is their source code.
>         
> _{{org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping}}_
> {code:java}
> private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     long remainder = (timestamp - offset) % windowSize;
>     // handle both positive and negative cases
>     if (remainder < 0) {
>         return timestamp - (remainder + windowSize);
>     } else {
>         return timestamp - remainder;
>     }
> } {code}
> h2. Can we make a pull request?
>         If the community deems it necessary to revise it, hopefully this task 
> can be handed over to us. Our members are all students who have just 
> graduated from school, and it is a great encouragement for us to contribute 
> code to flink.
>         Thank you so much!
>         From Deng Ziqi & Lin Wanni & Guo Yuanfang
>  
> 
> 
> h2. reproduce
> {code:java}
> /* output
> WindowStart: -15000 ExactSize: 1 (a,-17000)
> WindowStart: -10000 ExactSize: 1 (b,-12000)
> WindowStart: -5000  ExactSize: 2 (c,-7000)
> WindowStart: -5000  ExactSize: 2 (d,-2000)
> WindowStart: 0      ExactSize: 1 (e,3000)
> WindowStart: 5000   ExactSize: 1 (f,8000)
> WindowStart: 10000  ExactSize: 1 (g,13000)
> WindowStart: 15000  ExactSize: 1 (h,18000)
>  */
> public class Example {
> public static void main(String[] args) throws Exception {
> final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");
> TimeZone.setDefault(timeZone);
> StreamExecutionEnvironment env = 
> StreamExecutionEnviro
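
For reference, a small standalone sketch (plain Java, independent of the truncated reproduce code above) comparing the current formula with the fix proposed in this issue for a negative timestamp:

{code:java}
public class WindowStartSketch {

    // Current TimeWindow#getWindowStartWithOffset: wrong when timestamp - offset + windowSize < 0.
    static long current(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Proposal from this issue: additionally subtract windowSize when (timestamp - offset) < 0.
    static long proposed(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    public static void main(String[] args) {
        long ts = -17000L, offset = 0L, size = 5000L;
        System.out.println(current(ts, offset, size));  // -15000: window start > timestamp, wrong
        System.out.println(proposed(ts, offset, size)); // -20000: -20000 <= -17000 < -15000, correct
    }
}
{code}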

[jira] [Updated] (FLINK-23289) BinarySection should add null check in constructor method

2021-07-06 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-23289:
---
Summary: BinarySection should add null check in constructor method  (was: 
BinarySection should null check in constructor method)

> BinarySection should add null check in constructor method
> -
>
> Key: FLINK-23289
> URL: https://issues.apache.org/jira/browse/FLINK-23289
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Terry Wang
>Priority: Major
>
> {code:java}
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.inFirstSegment(BinarySegmentUtils.java:411)
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:132)
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:118)
>     at 
> org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:360)
>     at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:59)
>     at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:37)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:152)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:142)
> {code}
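
A minimal sketch of the kind of guard this improvement asks for (standalone and simplified; the real BinarySection holds MemorySegment[] segments plus offset/size, so the shape below is only an assumption for illustration):

{code:java}
public class BinarySectionSketch {

    private final Object[] segments;
    private final int offset;
    private final int sizeInBytes;

    public BinarySectionSketch(Object[] segments, int offset, int sizeInBytes) {
        // Fail fast with a clear message instead of a late NullPointerException
        // deep inside BinarySegmentUtils, as in the stack trace above.
        if (segments == null || segments.length == 0) {
            throw new IllegalArgumentException("segments must not be null or empty");
        }
        this.segments = segments;
        this.offset = offset;
        this.sizeInBytes = sizeInBytes;
    }

    public static void main(String[] args) {
        new BinarySectionSketch(null, 0, 0); // throws IllegalArgumentException eagerly
    }
}
{code}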



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23289) BinarySection should null check in constructor method

2021-07-06 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-23289:
---
Component/s: Table SQL / Runtime

> BinarySection should null check in constructor method
> -
>
> Key: FLINK-23289
> URL: https://issues.apache.org/jira/browse/FLINK-23289
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Runtime
>Reporter: Terry Wang
>Priority: Major
>
> {code:java}
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.inFirstSegment(BinarySegmentUtils.java:411)
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:132)
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:118)
>     at 
> org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:360)
>     at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:59)
>     at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:37)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:152)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:142)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23289) BinarySection should null check in contusctor method

2021-07-06 Thread Terry Wang (Jira)
Terry Wang created FLINK-23289:
--

 Summary: BinarySection should null check in contusctor method
 Key: FLINK-23289
 URL: https://issues.apache.org/jira/browse/FLINK-23289
 Project: Flink
  Issue Type: Improvement
Reporter: Terry Wang



{code:java}
Caused by: java.lang.NullPointerException
    at 
org.apache.flink.table.data.binary.BinarySegmentUtils.inFirstSegment(BinarySegmentUtils.java:411)
    at 
org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:132)
    at 
org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:118)
    at 
org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:360)
    at 
org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:59)
    at 
org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:37)
    at 
org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128)
    at 
org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86)
    at 
org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
    at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
    at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
    at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
    at 
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:152)
    at 
org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:142)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-23289) BinarySection should null check in constructor method

2021-07-06 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-23289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-23289:
---
Summary: BinarySection should null check in constructor method  (was: 
BinarySection should null check in contusctor method)

> BinarySection should null check in constructor method
> -
>
> Key: FLINK-23289
> URL: https://issues.apache.org/jira/browse/FLINK-23289
> Project: Flink
>  Issue Type: Improvement
>Reporter: Terry Wang
>Priority: Major
>
> {code:java}
> Caused by: java.lang.NullPointerException
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.inFirstSegment(BinarySegmentUtils.java:411)
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:132)
>     at 
> org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:118)
>     at 
> org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:360)
>     at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:59)
>     at 
> org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:37)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86)
>     at 
> org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>     at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:152)
>     at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:142)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-18516) Improve error message for rank function in streaming mode

2021-06-03 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang closed FLINK-18516.
--
Resolution: Works for Me

> Improve error message for rank function in streaming mode
> -
>
> Key: FLINK-18516
> URL: https://issues.apache.org/jira/browse/FLINK-18516
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Rui Li
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The following query currently fails with NPE:
> {code}
> create table foo (x int,y string,p as proctime()) with ...;
> select x,y,row_number() over (partition by x order by p) from foo;
> {code}
> which can be difficult for users to figure out the reason of the failure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-18516) Improve error message for rank function in streaming mode

2021-06-03 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357087#comment-17357087
 ] 

Terry Wang edited comment on FLINK-18516 at 6/4/21, 6:13 AM:
-

Hi [~lirui],
What's your version of Flink reporting this NPE?
I cannot reproduce your error on the Flink master code. Maybe the bug has been 
fixed.


was (Author: terry1897):
Hi [~lirui],
What's your Flink version reporting this NPE?
I cannot reproduce your error on the Flink master code. Maybe the bug has been 
fixed.

> Improve error message for rank function in streaming mode
> -
>
> Key: FLINK-18516
> URL: https://issues.apache.org/jira/browse/FLINK-18516
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Rui Li
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The following query currently fails with NPE:
> {code}
> create table foo (x int,y string,p as proctime()) with ...;
> select x,y,row_number() over (partition by x order by p) from foo;
> {code}
> which can be difficult for users to figure out the reason of the failure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18516) Improve error message for rank function in streaming mode

2021-06-03 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357087#comment-17357087
 ] 

Terry Wang commented on FLINK-18516:


Hi [~lirui],
What's your Flink version reporting this NPE?
I cannot reproduce your error on the Flink master code. Maybe the bug has been 
fixed.

> Improve error message for rank function in streaming mode
> -
>
> Key: FLINK-18516
> URL: https://issues.apache.org/jira/browse/FLINK-18516
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Reporter: Rui Li
>Priority: Minor
>  Labels: auto-deprioritized-major
>
> The following query currently fails with NPE:
> {code}
> create table foo (x int,y string,p as proctime()) with ...;
> select x,y,row_number() over (partition by x order by p) from foo;
> {code}
> which can be difficult for users to figure out the reason of the failure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks

2021-05-28 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang closed FLINK-15775.
--
Fix Version/s: 1.11.0
 Release Note: Has been fixed since Flink 1.11
   Resolution: Invalid

> SourceFunctions are instantiated twice when pulled on from 2 Sinks
> --
>
> Key: FLINK-15775
> URL: https://issues.apache.org/jira/browse/FLINK-15775
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Benoît Paris
>Priority: Minor
>  Labels: auto-deprioritized-major
> Fix For: 1.11.0
>
> Attachments: flink-test-duplicated-sources.zip
>
>
> When pulled on by two sinks, the SourceFunctions of a TableSource will get 
> instantiated twice; (and subsequently opened by the parallelism number, which 
> is expected behavior):
> The following will instantiate the FooTableSource's SourceFunction once (OK 
> behavior, but not the processing we want):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>  
> This will instantiate the FooTableSource's SourceFunction twice (Not OK, as 
> we're missing half the inputs in each SysoSink):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1"); 
> {code}
>  
> This might not be a problem for Kafka's SourceFunctions, as we can always 
> reread from a log; but it is a data loss problem when the source data can't 
> be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make 
> the runtime read from the same opened SourceFunctions?
> Attached is Java code that logs the faulty opening of the SourceFunctions, 
> pom.xml, and logical execution plans for the duplicated case, and the 
> workaround.
>  
> 
> Workaround: make a conversion to an appendStream. Somehow this makes the 
> planner think it has to put a materialization barrier after the Source and 
> read from that:
>  
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>  tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new 
> TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);{code}
>  
>  
> Best Regards,
> Ben



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks

2021-05-25 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351040#comment-17351040
 ] 

Terry Wang edited comment on FLINK-15775 at 5/25/21, 12:50 PM:
---

Hi [~BenoitParis],
FYI, TableEnvironment has offered a way to execute multiple INSERT statements in one 
pipeline since 1.11; you may refer to the doc 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/insert/
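
For example, a minimal sketch of that API (table and sink names are taken from your snippets and are only illustrative; tEnv is a TableEnvironment):

{code:java}
// Both INSERTs are added to one StatementSet and planned into a single pipeline,
// so the source table is translated (and read) only once instead of once per sink.
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO syso_sink_0 SELECT * FROM foo_table WHERE field_1 = 0");
stmtSet.addInsertSql("INSERT INTO syso_sink_1 SELECT * FROM foo_table WHERE field_1 = 1");
stmtSet.execute();
{code}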


was (Author: terry1897):
Hi, [~BenoitParis]~

> SourceFunctions are instantiated twice when pulled on from 2 Sinks
> --
>
> Key: FLINK-15775
> URL: https://issues.apache.org/jira/browse/FLINK-15775
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Benoît Paris
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: flink-test-duplicated-sources.zip
>
>
> When pulled on by two sinks, the SourceFunctions of a TableSource will get 
> instantiated twice; (and subsequently opened by the parallelism number, which 
> is expected behavior):
> The following will instantiate the FooTableSource's SourceFunction once (OK 
> behavior, but not the processing we want):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>  
> This will instantiate the FooTableSource's SourceFunction twice (Not OK, as 
> we're missing half the inputs in each SysoSink):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1"); 
> {code}
>  
> This might not be a problem for Kafka's SourceFunctions, as we can always 
> reread from a log; but it is a data loss problem when the source data can't 
> be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make 
> the runtime read from the same opened SourceFunctions?
> Attached is Java code that logs the faulty opening of the SourceFunctions, 
> pom.xml, and logical execution plans for the duplicated case, and the 
> workaround.
>  
> 
> Workaround: make a conversion to an appendStream. Somehow this makes the 
> planner think it has to put a materialization barrier after the Source and 
> read from that:
>  
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>  tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new 
> TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);{code}
>  
>  
> Best Regards,
> Ben



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks

2021-05-25 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351040#comment-17351040
 ] 

Terry Wang commented on FLINK-15775:


Hi, [~BenoitParis]~

> SourceFunctions are instantiated twice when pulled on from 2 Sinks
> --
>
> Key: FLINK-15775
> URL: https://issues.apache.org/jira/browse/FLINK-15775
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.9.1, 1.10.0
>Reporter: Benoît Paris
>Priority: Minor
>  Labels: auto-deprioritized-major
> Attachments: flink-test-duplicated-sources.zip
>
>
> When pulled on by two sinks, the SourceFunctions of a TableSource will get 
> instantiated twice; (and subsequently opened by the parallelism number, which 
> is expected behavior):
> The following will instantiate the FooTableSource's SourceFunction once (OK 
> behavior, but not the processing we want):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> out0.insertInto("syso_sink_0");
> {code}
>  
> This will instantiate the FooTableSource's SourceFunction twice (Not OK, as 
> we're missing half the inputs in each SysoSink):
>  
> {code:java}
> tEnv.registerTableSource("foo_table", new FooTableSource());
> Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0");
> Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1");
> tEnv.registerTableSink("syso_sink_0", new SysoSink());
> tEnv.registerTableSink("syso_sink_1", new SysoSink());
> out0.insertInto("syso_sink_0");
> out1.insertInto("syso_sink_1"); 
> {code}
>  
> This might not be a problem for Kafka's SourceFunctions, as we can always 
> reread from a log; but it is a data loss problem when the source data can't 
> be reproduced.
> Actually, this might be me not understanding the API. Is there a way to make 
> the runtime read from the same opened SourceFunctions?
> Attached is Java code that logs the faulty opening of the SourceFunctions, 
> pom.xml, and logical execution plans for the duplicated case, and the 
> workaround.
>  
> 
> Workaround: make a conversion to an appendStream. Somehow this makes the 
> planner think it has to put a materialization barrier after the Source and 
> read from that:
>  
> {code:java}
> tEnv.registerTableSource("foo_table_source", new FooTableSource());
> Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source");
> Table appendingSourceTable = tEnv.fromDataStream(
>  tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new 
> TypeInformation[]{Types.LONG()}))
> );
> tEnv.registerTable("foo_table", appendingSourceTable);{code}
>  
>  
> Best Regards,
> Ben



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22654) SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks

2021-05-13 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344282#comment-17344282
 ] 

Terry Wang commented on FLINK-22654:


I reproduced this problem in my local environment, and it's a bug in 
`SqlCreateTable#unparse()`.
I will open a PR to fix it soon.

> SqlCreateTable  toString()/unparse() lose CONSTRAINTS  and watermarks
> -
>
> Key: FLINK-22654
> URL: https://issues.apache.org/jira/browse/FLINK-22654
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.13.0
>Reporter: youxianq
>Priority: Minor
>
> Creating a SqlCreateTable using a LIKE clause and then calling toString() or unparse() 
> will lose the watermark 
> if there are no columns. 
> {code:java}
> public static SqlParser getSqlParser(String sql) {
> SourceStringReader sqlReader = new SourceStringReader(sql);
> return SqlParser.create(sqlReader,
> SqlParser.configBuilder()
> .setParserFactory(FlinkSqlParserImpl.FACTORY)
> .setLex(Lex.JAVA)
> .setIdentifierMaxLength(256)
> .setConformance(FlinkSqlConformance.DEFAULT)
> .build());
> }
> public static void main(String[] args) throws Exception {
> SqlParser sqlParser = getSqlParser("" +
> "create TEMPORARY table t_order_course (\n" +
> "   WATERMARK FOR last_update_time AS last_update_time - INTERVAL 
> '5' SECOND\n" +
> ") with (\n" +
> "  'scan.startup.mode' = 'specific-offsets',\n" +
> "  'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129'\n" +
> ") like cdc.`qq_data(sh-backend-tst:3306)`.t_order_course (\n" +
> "   OVERWRITING  WATERMARKS\n" +
> "   OVERWRITING OPTIONS\n" +
> "   EXCLUDING CONSTRAINTS\n" +
> " \n" +
> ")");
> SqlNode sqlNode = sqlParser.parseStmt();
> System.out.println(sqlNode.toString());
> }
> {code}
> output:
> CREATE TEMPORARY TABLE `t_order_course` WITH ( 'scan.startup.mode' = 
> 'specific-offsets', 'scan.startup.specific-offsets' = 
> 'partition:0,offset:1169129' ) LIKE 
> `cdc`.`qq_data(sh-backend-tst:3306)`.`t_order_course` ( OVERWRITING 
> WATERMARKS OVERWRITING OPTIONS EXCLUDING CONSTRAINTS )



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21972) Check whether TemporalTableSourceSpec can be serialized or not

2021-03-25 Thread Terry Wang (Jira)
Terry Wang created FLINK-21972:
--

 Summary: Check whether TemporalTableSourceSpec  can be serialized 
or not 
 Key: FLINK-21972
 URL: https://issues.apache.org/jira/browse/FLINK-21972
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang
 Fix For: 1.13.0


Check whether TemporalTableSourceSpec  can be serialized or not 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21868) Support StreamExecLookupJoin json serialization/deserialization

2021-03-19 Thread Terry Wang (Jira)
Terry Wang created FLINK-21868:
--

 Summary: Support StreamExecLookupJoin json 
serialization/deserialization
 Key: FLINK-21868
 URL: https://issues.apache.org/jira/browse/FLINK-21868
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang
 Fix For: 1.13.0


Support StreamExecLookupJoin json serialization/deserialization



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21864) Support StreamExecTemporalJoin json serialization/deserialization

2021-03-18 Thread Terry Wang (Jira)
Terry Wang created FLINK-21864:
--

 Summary: Support StreamExecTemporalJoin json 
serialization/deserialization
 Key: FLINK-21864
 URL: https://issues.apache.org/jira/browse/FLINK-21864
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang
 Fix For: 1.13.0


Support StreamExecTemporalJoin json serialization/deserialization



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21837) Support StreamExecIntervalJoin json ser/de

2021-03-18 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-21837:
---
Description: Support StreamExecIntervalJoin json ser/des  (was: Support 
StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des)

> Support StreamExecIntervalJoin json ser/de
> --
>
> Key: FLINK-21837
> URL: https://issues.apache.org/jira/browse/FLINK-21837
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Assignee: Terry Wang
>Priority: Major
> Fix For: 1.13.0
>
>
> Support StreamExecIntervalJoin json ser/des



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21837) Support StreamExecIntervalJoin json ser/de

2021-03-18 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-21837:
---
Summary: Support StreamExecIntervalJoin json ser/de  (was: Support 
StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de)

> Support StreamExecIntervalJoin json ser/de
> --
>
> Key: FLINK-21837
> URL: https://issues.apache.org/jira/browse/FLINK-21837
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Assignee: Terry Wang
>Priority: Major
> Fix For: 1.13.0
>
>
> Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin 
> json ser/des



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21837) Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de

2021-03-16 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-21837:
---
Summary: Support 
StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de  
(was: Support 
StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des)

> Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin 
> json ser/de
> --
>
> Key: FLINK-21837
> URL: https://issues.apache.org/jira/browse/FLINK-21837
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Fix For: 1.13.0
>
>
> Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin 
> json ser/des



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21837) Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des

2021-03-16 Thread Terry Wang (Jira)
Terry Wang created FLINK-21837:
--

 Summary: Support 
StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des
 Key: FLINK-21837
 URL: https://issues.apache.org/jira/browse/FLINK-21837
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang
 Fix For: 1.13.0


Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json 
ser/des



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21811) Support StreamExecJoin json serialization/deserialization

2021-03-16 Thread Terry Wang (Jira)
Terry Wang created FLINK-21811:
--

 Summary: Support StreamExecJoin json serialization/deserialization
 Key: FLINK-21811
 URL: https://issues.apache.org/jira/browse/FLINK-21811
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang
 Fix For: 1.13.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-21802) LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType

2021-03-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302255#comment-17302255
 ] 

Terry Wang edited comment on FLINK-21802 at 3/16/21, 6:40 AM:
--

It seems I mistakenly put this issue under FLINK-20435; can anyone help me 
move this issue to be a subtask of FLINK-21091?


was (Author: terry1897):
It seems I mistakenly put this issue under FLINK-20435; can anyone help me 
move this issue to be a subtask of 
https://issues.apache.org/jira/browse/FLINK-21091. 

> LogicalTypeJsonDeserializer/Serializer custom 
> RowType/MapType/ArrayType/MultisetType
> 
>
> Key: FLINK-21802
> URL: https://issues.apache.org/jira/browse/FLINK-21802
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> We should customize the RowType/MapType/ArrayType/MultisetType 
> deserialize/serialize methods to allow some special LogicalType properties such as 
> TimestampType's kind field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21802) LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType

2021-03-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302255#comment-17302255
 ] 

Terry Wang commented on FLINK-21802:


It seems I mistakenly put this issue under FLINK-20435; can anyone help me 
move this issue to be a subtask of 
https://issues.apache.org/jira/browse/FLINK-21091. 

> LogicalTypeJsonDeserializer/Serializer custom 
> RowType/MapType/ArrayType/MultisetType
> 
>
> Key: FLINK-21802
> URL: https://issues.apache.org/jira/browse/FLINK-21802
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.13.0
>
>
> We should customize the RowType/MapType/ArrayType/MultisetType 
> deserialize/serialize methods to allow some special LogicalType properties such as 
> TimestampType's kind field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21802) LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType

2021-03-15 Thread Terry Wang (Jira)
Terry Wang created FLINK-21802:
--

 Summary: LogicalTypeJsonDeserializer/Serializer custom 
RowType/MapType/ArrayType/MultisetType
 Key: FLINK-21802
 URL: https://issues.apache.org/jira/browse/FLINK-21802
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang
 Fix For: 1.13.0


We should customize the RowType/MapType/ArrayType/MultisetType deserialize/serialize 
methods to allow some special LogicalType properties such as TimestampType's kind field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-21744) Support StreamExecDeduplicate json serialization/deserialization

2021-03-12 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-21744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300167#comment-17300167
 ] 

Terry Wang commented on FLINK-21744:


Hi [~godfreyhe], I'd like to complete this exciting task. Feel free to assign 
it to me. 

> Support StreamExecDeduplicate json serialization/deserialization
> 
>
> Key: FLINK-21744
> URL: https://issues.apache.org/jira/browse/FLINK-21744
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-21744) Support StreamExecDeduplicate json serialization/deserialization

2021-03-12 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-21744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-21744:
---
Fix Version/s: 1.13.0

> Support StreamExecDeduplicate json serialization/deserialization
> 
>
> Key: FLINK-21744
> URL: https://issues.apache.org/jira/browse/FLINK-21744
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Fix For: 1.13.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21744) Support StreamExecDeduplicate json serialization/deserialization

2021-03-12 Thread Terry Wang (Jira)
Terry Wang created FLINK-21744:
--

 Summary: Support StreamExecDeduplicate json 
serialization/deserialization
 Key: FLINK-21744
 URL: https://issues.apache.org/jira/browse/FLINK-21744
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Reporter: Terry Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15848) Support both fixed allocator and dynamic allocator in flink

2020-05-25 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115867#comment-17115867
 ] 

Terry Wang commented on FLINK-15848:


+1 for this feature. It's very useful for benchmarking and some other specific use 
cases.

> Support both fixed allocator and dynamic allocator in flink
> ---
>
> Key: FLINK-15848
> URL: https://issues.apache.org/jira/browse/FLINK-15848
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Affects Versions: 1.10.0
>Reporter: liupengcheng
>Priority: Major
>
> Currently, we removed the static allocator and only support dynamic allocation in 
> Flink 1.10; however, this allocator still has some drawbacks:
>  # It cannot allocate resources within a range, which means the resource usage is 
> not under control. This has a very bad effect in a shared resource cluster (e.g. 
> Yarn), where one big query or job may occupy all the resources and cause other 
> jobs to block.
>  # It does not support static resource allocation. That means we can hardly do 
> benchmark testing across engines (e.g. Spark). Also, in a resource-shared 
> cluster (e.g. Yarn) that supports over-allocation, it's hard to align the 
> resource usage.
> As discussed in FLINK-12362, we should support both a fixed allocator and a 
> dynamic allocator (dynamically allocating within a range) in Flink.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102290#comment-17102290
 ] 

Terry Wang commented on FLINK-17553:


I opened a PR https://github.com/apache/flink/pull/12028 to help understand 
and solve this bug.

> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
> Attachments: temp.png
>
>
> Exception stack is as follows:
>  !temp.png! 
> We can reproduce this problem by adding the following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
> val ddl1 =
>   """
> |CREATE TABLE src (
> |  log_ts STRING,
> |  ts TIMESTAMP(3),
> |  a INT,
> |  b DOUBLE,
> |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
> |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
> |) WITH (
> |  'connector' = 'COLLECTION',
> |  'is-bounded' = 'false'
> |)
>   """.stripMargin
> val ddl2 =
>   """
> |CREATE TABLE dst (
> |  ts TIMESTAMP(3),
> |  a BIGINT,
> |  b DOUBLE
> |) WITH (
> |  'connector.type' = 'filesystem',
> |  'connector.path' = '/tmp/1',
> |  'format.type' = 'csv'
> |)
>   """.stripMargin
> val query =
>   """
> |INSERT INTO dst
> |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), 
> SUM(b)
> |FROM src
> | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
> |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>   """.stripMargin
> tEnv.sqlUpdate(ddl1)
> tEnv.sqlUpdate(ddl2)
> tEnv.sqlUpdate(query)
> println(tEnv.explain(true))
>   }
> {code}
> I spent a lot of work digging into this bug, and found the problem may be 
> caused by AggregateProjectPullUpConstantsRule, which doesn't generate the 
> project items correctly.
> After I remove AggregateProjectPullUpConstantsRule from 
> FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passes as expected.
> The problem is that WindowPropertiesRule cannot match the RelNode tree after the 
> transformation of AggregateProjectPullUpConstantsRule; we can also add 
> ProjectMergeRule.INSTANCE to FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after 
> AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Fix Version/s: 1.11.0

> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Fix For: 1.11.0
>
> Attachments: temp.png
>
>
> Exception stack is as follows:
>  !temp.png! 
> We can reproduce this problem by adding the following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
> val ddl1 =
>   """
> |CREATE TABLE src (
> |  log_ts STRING,
> |  ts TIMESTAMP(3),
> |  a INT,
> |  b DOUBLE,
> |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
> |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
> |) WITH (
> |  'connector' = 'COLLECTION',
> |  'is-bounded' = 'false'
> |)
>   """.stripMargin
> val ddl2 =
>   """
> |CREATE TABLE dst (
> |  ts TIMESTAMP(3),
> |  a BIGINT,
> |  b DOUBLE
> |) WITH (
> |  'connector.type' = 'filesystem',
> |  'connector.path' = '/tmp/1',
> |  'format.type' = 'csv'
> |)
>   """.stripMargin
> val query =
>   """
> |INSERT INTO dst
> |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), 
> SUM(b)
> |FROM src
> | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
> |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>   """.stripMargin
> tEnv.sqlUpdate(ddl1)
> tEnv.sqlUpdate(ddl2)
> tEnv.sqlUpdate(query)
> println(tEnv.explain(true))
>   }
> {code}
> I spent a lot of work digging into this bug, and found the problem may be 
> caused by AggregateProjectPullUpConstantsRule, which doesn't generate the 
> project items correctly.
> After I remove AggregateProjectPullUpConstantsRule from 
> FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passes as expected.
> The problem is that WindowPropertiesRule cannot match the RelNode tree after the 
> transformation of AggregateProjectPullUpConstantsRule; we can also add 
> ProjectMergeRule.INSTANCE to FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after 
> AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Description: 
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}







I spent lots of work digging into this bug, and found the problem may be caused 
by AggregateProjectPullUpConstantsRule which doesn't generate proper project 
items correctly.
After I remove AggregateProjectPullUpConstantsRule from 
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.

The problem is WindowPropertiesRule can not match RelNodeTree after the 
transformation of AggregateProjectPullUpConstantsRule, we also can add 
ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after 
AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.













  was:
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}







I spent lots of work digging the bug, and found the problem may be caused by 
AggregateProjectPullUpConstantsRule which doesn't generate proper project items 
correctly.
After I remove AggregateProjectPullUpConstantsRule from 
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.














> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as follows:
>  !temp.png! 
> We can reproduce this problem by adding the following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
> val ddl1 =
>   """
> |CREATE TABLE src (
> |  log_ts STRING,
> |  ts TIMESTAMP(3),
> |  a INT,
> |  b DOUBLE,
> |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
> |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
> 

[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Description: 
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}







I spent a lot of effort digging into this bug, and found that the problem may be caused
by AggregateProjectPullUpConstantsRule, which doesn't generate the project items correctly.
After I removed AggregateProjectPullUpConstantsRule from
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expected.

The problem is that WindowPropertiesRule cannot match the RelNode tree after the
transformation done by AggregateProjectPullUpConstantsRule. Alternatively, we can add
ProjectMergeRule.INSTANCE to FlinkStreamRuleSets#DEFAULT_REWRITE_RULES right after
AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.













  was:
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}







I spent lots of work digging into this bug, and found the problem may be caused 
by AggregateProjectPullUpConstantsRule which doesn't generate proper project 
items correctly.
After I remove AggregateProjectPullUpConstantsRule from 
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.

The problem is WindowPropertiesRule can not match RelNodeTree after the 
transformation of AggregateProjectPullUpConstantsRule, we also can add 
ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after 
AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.














> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as following:
>  !temp.png! 
> We can reproduce this problem by add following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Un

[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Description: 
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}







I spent a lot of effort digging into this bug, and found that the problem may be caused
by AggregateProjectPullUpConstantsRule, which doesn't generate the project items correctly.
After I removed AggregateProjectPullUpConstantsRule from
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expected.













  was:
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}

I spent lots of work digging the bug, and found the problem may be caused by 
AggregateProjectPullUpConstantsRule which doesn't generate proper project items 
correctly.
After I remove AggregateProjectPullUpConstantsRule from 
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect.





> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as following:
>  !temp.png! 
> We can reproduce this problem by add following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
> val ddl1 =
>   """
> |CREATE TABLE src (
> |  log_ts STRING,
> |  ts TIMESTAMP(3),
> |  a INT,
> |  b DOUBLE,
> |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
> |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
> |) WITH (
> |  'connector' = 'COLLECTION',
> |  'is-bounded' = 'false'
> |)
>   """.stripMargin
> val ddl2 =
>   """
> |CREATE TABLE dst (
> |  ts TIMESTAMP(3),
> |  a BIGINT,
> |  b DOUBLE
> |) WITH (
> |  'connector.type' = 'fi

[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Description: 
The exception stack is as follows:
 !temp.png! 

We can reproduce this problem by adding the following test to 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}

I spent a lot of effort digging into this bug, and found that the problem may be caused
by AggregateProjectPullUpConstantsRule, which doesn't generate the project items correctly.
After I removed AggregateProjectPullUpConstantsRule from
FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expected.




  was:
Exception stack is as following:
 !temp.png! 

We can reproduce this problem by add following test in 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}





> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as following:
>  !temp.png! 
> We can reproduce this problem by add following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
> val ddl1 =
>   """
> |CREATE TABLE src (
> |  log_ts STRING,
> |  ts TIMESTAMP(3),
> |  a INT,
> |  b DOUBLE,
> |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
> |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
> |) WITH (
> |  'connector' = 'COLLECTION',
> |  'is-bounded' = 'false'
> |)
>   """.stripMargin
> val ddl2 =
>   """
> |CREATE TABLE dst (
> |  ts TIMESTAMP(3),
> |  a BIGINT,
> |  b DOUBLE
> |) WITH (
> |  'connector.type' = 'filesystem',
> |  'connector.path' = '/tmp/1',
> |  'format.type' = 'csv'
> |)
>   """.stripMargin
> val query =
>   """
> |INSERT INTO dst
> |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), 
> SUM(b)
> |FROM src
> | GROUP BY 'a', 

[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Description: 
The exception stack is as follows:
 !temp.png! 

We can reproduce this problem by adding the following test to 
org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase

{code:scala}
// Some comments here
  @Test
  def testWindowAggregateOnConstantValue(): Unit = {
val ddl1 =
  """
|CREATE TABLE src (
|  log_ts STRING,
|  ts TIMESTAMP(3),
|  a INT,
|  b DOUBLE,
|  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
|  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
|) WITH (
|  'connector' = 'COLLECTION',
|  'is-bounded' = 'false'
|)
  """.stripMargin
val ddl2 =
  """
|CREATE TABLE dst (
|  ts TIMESTAMP(3),
|  a BIGINT,
|  b DOUBLE
|) WITH (
|  'connector.type' = 'filesystem',
|  'connector.path' = '/tmp/1',
|  'format.type' = 'csv'
|)
  """.stripMargin
val query =
  """
|INSERT INTO dst
|SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
|FROM src
| GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
|-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
  """.stripMargin
tEnv.sqlUpdate(ddl1)
tEnv.sqlUpdate(ddl2)
tEnv.sqlUpdate(query)
println(tEnv.explain(true))
  }

{code}




  was:
Exception stack is as following:




> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as following:
>  !temp.png! 
> We can reproduce this problem by add following test in 
> org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
>   @Test
>   def testWindowAggregateOnConstantValue(): Unit = {
> val ddl1 =
>   """
> |CREATE TABLE src (
> |  log_ts STRING,
> |  ts TIMESTAMP(3),
> |  a INT,
> |  b DOUBLE,
> |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
> |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
> |) WITH (
> |  'connector' = 'COLLECTION',
> |  'is-bounded' = 'false'
> |)
>   """.stripMargin
> val ddl2 =
>   """
> |CREATE TABLE dst (
> |  ts TIMESTAMP(3),
> |  a BIGINT,
> |  b DOUBLE
> |) WITH (
> |  'connector.type' = 'filesystem',
> |  'connector.path' = '/tmp/1',
> |  'format.type' = 'csv'
> |)
>   """.stripMargin
> val query =
>   """
> |INSERT INTO dst
> |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), 
> SUM(b)
> |FROM src
> | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
> |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>   """.stripMargin
> tEnv.sqlUpdate(ddl1)
> tEnv.sqlUpdate(ddl2)
> tEnv.sqlUpdate(query)
> println(tEnv.explain(true))
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Description: 
The exception stack is as follows:



> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as following:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Attachment: temp.png

> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
> Attachments: temp.png
>
>
> Exception stack is as following:



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17553:
---
Summary: Constant exists in group window key leads to  error:  Unsupported 
call: TUMBLE_END(TIMESTAMP(3) NOT NULL)  (was: Group by constant and window 
causes error:  Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL))

> Constant exists in group window key leads to  error:  Unsupported call: 
> TUMBLE_END(TIMESTAMP(3) NOT NULL)
> -
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17553) Group by constant and window causes error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)

2020-05-07 Thread Terry Wang (Jira)
Terry Wang created FLINK-17553:
--

 Summary: Group by constant and window causes error:  Unsupported 
call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
 Key: FLINK-17553
 URL: https://issues.apache.org/jira/browse/FLINK-17553
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Terry Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-17313) Validation error when insert decimal/varchar with precision into sink using TypeInformation of row

2020-04-24 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17313:
---
Description: 
Test code like the following (in the Blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
        "  a varchar(10)," +
        "  b decimal(20,2)" +
        ") with (" +
        "  'type' = 'random'," +
        "  'count' = '10'" +
        ")");
tEnv.sqlUpdate("create table printSink (" +
        "  a varchar(10)," +
        "  b decimal(22,2)" +
        ") with (" +
        "  'type' = 'print'" +
        ")");
tEnv.sqlUpdate("insert into printSink select * from randomSource");
tEnv.execute("");
{code}

The Print TableSink implements UpsertStreamTableSink, and its getRecordType is as
follows:

{code:java}
public TypeInformation<Row> getRecordType() {
    return getTableSchema().toRowType();
}
{code}


The varchar column validation exception is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 
'a' does not match with the physical type STRING of the 'a' field of the 
TableSink consumed type.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
at 
org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The other type validation exceptions are similar. I dug into this and think it's caused by
TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems that the method
doesn't distinguish the physical/logical type validation logic of a source from that of
a sink: in a source the logical type should be able to cover the physical type, whereas
in a sink it is the other way around, i.e. the physical type should be able to cover the
logical type. Besides, the decimal ty

[jira] [Updated] (FLINK-17313) Validation error when insert decimal/varchar with precision into sink using TypeInformation of row

2020-04-24 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17313:
---
Summary: Validation error when insert decimal/varchar with precision into 
sink using TypeInformation of row  (was: Validation error when insert 
decimal/timestamp/varchar with precision into sink using TypeInformation of row)

> Validation error when insert decimal/varchar with precision into sink using 
> TypeInformation of row
> --
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.Abstrac

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089530#comment-17089530
 ] 

Terry Wang commented on FLINK-17313:


The supportsAvoidingCast method behavior looks right to me; the relaxed check will
improve the user experience of old-style connectors and doesn't affect correctness.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.Abs

[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371
 ] 

Terry Wang edited comment on FLINK-17313 at 4/22/20, 7:35 AM:
--

We cannot forbid using the old-style TableSink in SQL, and the old-style TableSink
can consume the DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) types. So it makes
sense to pass the check when the logical type is varchar(10)/decimal(22,2).


was (Author: terry1897):
We can not forbid using old-style TableSink in sql, and the old-style TableSink 
can consume DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) type. So it make 
sense to pass the check logic that logical type is varchar(10)/decimal(22,2)

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collec

[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371
 ] 

Terry Wang edited comment on FLINK-17313 at 4/22/20, 7:24 AM:
--

We cannot forbid using the old-style TableSink in SQL, and the old-style TableSink
can consume the DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) types. So it makes
sense to pass the check when the logical type is varchar(10)/decimal(22,2).


was (Author: terry1897):
We can not forbid using old connector in ddl, and the old type style TableSink 
can consume DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) type. So it make 
sense to pass the check logic that logical type is varchar(10)/decimal(22,2)

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.co

[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089370#comment-17089370
 ] 

Terry Wang edited comment on FLINK-17313 at 4/22/20, 7:22 AM:
--

I think you may have misunderstood my fix. Source and sink should use two
different validation logics. If the logical type defined in the DDL can be consumed
by the physical type returned by the table sink, why must we match those two schemas
exactly? The check logic is a relaxed check:
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232.
I don't know if I expressed this clearly; let me know if you have more questions.
[~dwysakowicz]


was (Author: terry1897):
I think you may misunderstand my fix. Source and sink should be in two 
different validation logic. If the logical type defined in ddl can be consumed 
by physical type of table sink returned, wy we must match those two schema 
exactly, and the check logic is a relaxed check :  
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232.
 I don't know if I express clearly, let me know if u have more questions . 
[~dwysakowicz]

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plan

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371
 ] 

Terry Wang commented on FLINK-17313:


We cannot forbid using the old connector in DDL, and the old-style TableSink
can consume the DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) types. So it makes
sense to pass the check when the logical type is varchar(10)/decimal(22,2).

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.Traversa

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089370#comment-17089370
 ] 

Terry Wang commented on FLINK-17313:


I think you may have misunderstood my fix. Source and sink should use two
different validation logics. If the logical type defined in the DDL can be consumed
by the physical type returned by the table sink, why must we match those two schemas
exactly? The check logic is a relaxed check:
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232.
I don't know if I expressed this clearly; let me know if you have more questions.
[~dwysakowicz]

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Itera

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089362#comment-17089362
 ] 

Terry Wang commented on FLINK-17313:


Hi [~dwysakowicz], the types that originate from the old type system for
varchar(10)/decimal(22, 2)/timestamp(3) are String/legacy(Decimal)/timestamp(3)
and should be able to accept the corresponding logical types in the DDL, which is also
what the [PR|https://github.com/apache/flink/pull/11848] aims to solve.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterabl

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-22 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089357#comment-17089357
 ] 

Terry Wang commented on FLINK-17313:


[~lzljs3620320] There isn't much need to introduce a new interface in TableSink
to solve this ticket. Just as [~wenlong.lwl] said, it's a validation bug that
prevents connectors using the old TableSink from working normally.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.ma

[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089326#comment-17089326
 ] 

Terry Wang edited comment on FLINK-17313 at 4/22/20, 6:19 AM:
--

[~jark] I agree with you that the new sink interface of FLIP-95 can work normally, 
but there are still a lot of connectors that use the old interface. It's harmless to 
support such compatibility, and it is useful for users who cannot migrate their 
connectors in time, right?


was (Author: terry1897):
[~jark]  I agree with you that new sink interface of FLIP-95 can solve problem, 
but there still a lot of connector that use the old interface. It's harmless to 
support such compatibility and useful for users who can not migrate their 
connector in time, right?

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089326#comment-17089326
 ] 

Terry Wang commented on FLINK-17313:


[~jark] I agree with you that the new sink interface of FLIP-95 can solve the problem, 
but there are still a lot of connectors that use the old interface. It's harmless to 
support such compatibility, and it is useful for users who cannot migrate their 
connectors in time, right?

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089258#comment-17089258
 ] 

Terry Wang commented on FLINK-17313:


I opened 
[https://github.com/apache/flink/pull/11848|https://github.com/apache/flink/pull/11848]
 to help explain and resolve this validation exception.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.Ab

[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17313:
---
Description: 
Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
"   a varchar(10)," 
+
"   b 
decimal(20,2)" +
"   ) with (" +
"   'type' = 
'random'," +
"   'count' = '10'" 
+
"   )");
tEnv.sqlUpdate("create table printSink (" +
"   a varchar(10)," 
+
"   b 
decimal(22,2)," +
"   c 
timestamp(3)," +
"   ) with (" +
"   'type' = 'print'" +
"   )");
tEnv.sqlUpdate("insert into printSink select *, 
current_timestamp from randomSource");
tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as 
follows:


{code:java}
public TypeInformation getRecordType() {
	return getTableSchema().toRowType();
}
{code}


Varchar column validation exception is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 
'a' does not match with the physical type STRING of the 'a' field of the 
TableSink consumed type.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
at 
org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The other type validation exceptions are similar. I dug into this and think it's caused 
by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems that the method 
doesn't consider that sources and sinks need different physical/logical type validation 
logic: for a source, the logical type should be able to cover the physical type, whereas 
for a sink, the physical type consumed by the sink should be able to cover the logical 
type.
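
To make that asymmetry concrete, here is a minimal self-contained sketch of the idea. 
This is not Flink's actual TypeMappingUtils code; the covers helper, the Purpose enum, 
and the string-based type names are invented purely for illustration:

{code:java}
public class TypeCompatibilitySketch {

    enum Purpose { SOURCE, SINK }

    // Hypothetical helper: true if 'wider' can hold every value of 'narrower',
    // e.g. STRING covers VARCHAR(10).
    static boolean covers(String wider, String narrower) {
        if (wider.equals("STRING") && narrower.startsWith("VARCHAR")) {
            return true;
        }
        return wider.equals(narrower);
    }

    // The same "compatible" question must be asked in opposite directions
    // depending on whether a source or a sink is being validated.
    static boolean isCompatible(String logicalType, String physicalType, Purpose purpose) {
        if (purpose == Purpose.SOURCE) {
            // reading: the declared logical type must cover what the source produces
            return covers(logicalType, physicalType);
        } else {
            // writing: the physical type consumed by the sink must cover the logical type
            return covers(physicalType, logicalType);
        }
    }

    public static void main(String[] args) {
        // VARCHAR(10) written into a sink whose consumed physical type is STRING should pass
        System.out.println(isCompatible("VARCHAR(10)", "STRING", Purpose.SINK));   // true
        // while the symmetric source-style check would reject the same pair
        System.out.println(isCompatible("VARCHAR(10)", "STRING", Purpose.SOURCE)); // false
    }
}
{code}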

[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17313:
---
Description: 
Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
"   a varchar(10)," 
+
"   b 
decimal(20,2)" +
"   ) with (" +
"   'type' = 
'random'," +
"   'count' = '10'" 
+
"   )");
tEnv.sqlUpdate("create table printSink (" +
"   a varchar(10)," 
+
"   b 
decimal(22,2)," +
"   c 
timestamp(3)," +
"   ) with (" +
"   'type' = 'print'" +
"   )");
tEnv.sqlUpdate("insert into printSink select *, 
current_timestamp from randomSource");
tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as 
follows:


{code:java}
public TypeInformation getRecordType() {
return getTableSchema().toRowType();
}
{code}


Varchar column validation exception is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 
'a' does not match with the physical type STRING of the 'a' field of the 
TableSink consumed type.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
at 
org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The other type validation exceptions are similar. I dug into this and think it's caused 
by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems that the method 
doesn't consider that sources and sinks need different physical/logical type validation 
logic.






  was:
Test code like follwing(in blink planner):
{code:java}

[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089251#comment-17089251
 ] 

Terry Wang commented on FLINK-17313:


cc [~ykt836] [~jark] [~dwysakowicz] Please have a look at this issue.

> Validation error when insert decimal/timestamp/varchar with precision into 
> sink using TypeInformation of row
> 
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Terry Wang
>Priority: Major
>
> Test code like follwing(in blink planner):
> {code:java}
>   tEnv.sqlUpdate("create table randomSource (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(20,2)" +
>   "   ) with (" +
>   "   'type' = 
> 'random'," +
>   "   'count' = '10'" 
> +
>   "   )");
>   tEnv.sqlUpdate("create table printSink (" +
>   "   a varchar(10)," 
> +
>   "   b 
> decimal(22,2)," +
>   "   c 
> timestamp(3)," +
>   "   ) with (" +
>   "   'type' = 'print'" +
>   "   )");
>   tEnv.sqlUpdate("insert into printSink select *, 
> current_timestamp from randomSource");
>   tEnv.execute("");
> {code}
> Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as 
> following:
> {code:java}
> public TypeInformation getRecordType() {
>   return getTableSchema().toRowType();
>   }
> {code}
> Varchar column validation exception is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
> field 'a' does not match with the physical type STRING of the 'a' field of 
> the TableSink consumed type.
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>   at 
> org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>   at 
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>   at 
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>   at scala.Option.map(Option.scala:146)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.sc

[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-17313:
---
Description: 
Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
"   a varchar(10)," 
+
"   b 
decimal(20,2)" +
"   ) with (" +
"   'type' = 
'random'," +
"   'count' = '10'" 
+
"   )");
tEnv.sqlUpdate("create table printSink (" +
"   a varchar(10)," 
+
"   b 
decimal(22,2)," +
"   c 
timestamp(3)," +
"   ) with (" +
"   'type' = 'print'" +
"   )");
tEnv.sqlUpdate("insert into printSink select *, 
current_timestamp from randomSource");
tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as 
follows:


{code:java}
public TypeInformation getRecordType() {
return getTableSchema().toRowType();
}
{code}


Varchar column validation exception is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 
'a' does not match with the physical type STRING of the 'a' field of the 
TableSink consumed type.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
at 
org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The other type validation exceptions are similar. I dug into this and think it's caused 
by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. It seems that the method 
doesn't consider the different effects on sources and sinks. I will open a PR soon to 
solve this problem.






  was:
Test code like follwing(in blink planner):
{co

[jira] [Created] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row

2020-04-21 Thread Terry Wang (Jira)
Terry Wang created FLINK-17313:
--

 Summary: Validation error when insert decimal/timestamp/varchar 
with precision into sink using TypeInformation of row
 Key: FLINK-17313
 URL: https://issues.apache.org/jira/browse/FLINK-17313
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Terry Wang


Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
"   a varchar(10)," 
+
"   b 
decimal(20,2)" +
"   ) with (" +
"   'type' = 
'random'," +
"   'count' = '10'" 
+
"   )");
tEnv.sqlUpdate("create table printSink (" +
"   a varchar(10)," 
+
"   b 
decimal(22,2)," +
"   c 
timestamp(3)," +
"   ) with (" +
"   'type' = 'print'" +
"   )");
tEnv.sqlUpdate("insert into printSink select *, 
current_timestamp from randomSource");
tEnv.execute("");
{code}

The print TableSink implements UpsertStreamTableSink, and its getRecordType is as 
follows:


{code:java}
public TypeInformation getRecordType() {
return getTableSchema().toRowType();
}
{code}


The varchar type exception is:


org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table 
field 'a' does not match with the physical type STRING of the 'a' field of the 
TableSink consumed type.

at 
org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
at 
org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
at 
org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
at 
org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
at 
org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
at scala.Option.map(Option.scala:146)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)
The other type validations are similar. I dug into it and found it's caused by 
TypeMappingUtils#checkPh

[jira] [Commented] (FLINK-17263) Remove RepeatFamilyOperandTypeChecker in blink planner and replace it with calcite's CompositeOperandTypeChecker

2020-04-20 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087585#comment-17087585
 ] 

Terry Wang commented on FLINK-17263:


I opened a [PR|https://github.com/apache/flink/pull/11819]; feel free to assign 
this issue to me. cc [~ykt836]

> Remove RepeatFamilyOperandTypeChecker in blink planner and replace it  with 
> calcite's CompositeOperandTypeChecker
> -
>
> Key: FLINK-17263
> URL: https://issues.apache.org/jira/browse/FLINK-17263
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Terry Wang
>Priority: Major
>  Labels: pull-request-available
>
> Remove RepeatFamilyOperandTypeChecker in blink planner and replace it  with 
> calcite's CompositeOperandTypeChecker.
> It seems that what CompositeOperandTypeChecker can do is a super set of 
> RepeatFamilyOperandTypeChecker. To keep code easy to read, it's better to do 
> such refactor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17263) Remove RepeatFamilyOperandTypeChecker in blink planner and replace it with calcite's CompositeOperandTypeChecker

2020-04-20 Thread Terry Wang (Jira)
Terry Wang created FLINK-17263:
--

 Summary: Remove RepeatFamilyOperandTypeChecker in blink planner 
and replace it  with calcite's CompositeOperandTypeChecker
 Key: FLINK-17263
 URL: https://issues.apache.org/jira/browse/FLINK-17263
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: Terry Wang


Remove RepeatFamilyOperandTypeChecker in the blink planner and replace it with 
Calcite's CompositeOperandTypeChecker.
It seems that what CompositeOperandTypeChecker can do is a superset of what 
RepeatFamilyOperandTypeChecker does. To keep the code easy to read, it's better to 
do such a refactoring.
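
As a rough illustration of the composition style Calcite offers (this is not the code 
from the PR, and a composite of fixed arities is not necessarily equivalent to the 
unbounded repetition RepeatFamilyOperandTypeChecker allows), a checker accepting either 
two or three string operands could be assembled like this:

{code:java}
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlOperandTypeChecker;
import org.apache.calcite.sql.type.SqlTypeFamily;

public class ConcatOperandCheckerSketch {

    // OperandTypes.or(...) composes the alternatives into a composite checker;
    // the enumerated signatures here (2 or 3 strings) are only an example.
    public static final SqlOperandTypeChecker TWO_OR_THREE_STRINGS =
            OperandTypes.or(
                    OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING),
                    OperandTypes.family(
                            SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING));
}
{code}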



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-17152) FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition

2020-04-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083878#comment-17083878
 ] 

Terry Wang edited comment on FLINK-17152 at 4/15/20, 7:48 AM:
--

cc [~liyu] I think this bug should be included in the Flink 1.10.1 release.


was (Author: terry1897):
cc [~liyu2] I think this bug should be included in flink 1.10.1 release.

> FunctionDefinitionUtil generate wrong resultType and  acc type of 
> AggregateFunctionDefinition
> -
>
> Key: FLINK-17152
> URL: https://issues.apache.org/jira/browse/FLINK-17152
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0
>Reporter: Terry Wang
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FunctionDefinitionUtil generate wrong resultType and  acc type of 
> AggregateFunctionDefinition. This bug will  lead to unexpect error such as: 
> Field types of query result and registered TableSink do not  match.
> Query schema: [v:RAW(IAccumulator, ?)]
> Sink schema: [v: STRING]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17152) FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition

2020-04-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083878#comment-17083878
 ] 

Terry Wang commented on FLINK-17152:


cc [~liyu2] I think this bug should be included in the Flink 1.10.1 release.

> FunctionDefinitionUtil generate wrong resultType and  acc type of 
> AggregateFunctionDefinition
> -
>
> Key: FLINK-17152
> URL: https://issues.apache.org/jira/browse/FLINK-17152
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.10.0
>Reporter: Terry Wang
>Priority: Critical
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> FunctionDefinitionUtil generate wrong resultType and  acc type of 
> AggregateFunctionDefinition. This bug will  lead to unexpect error such as: 
> Field types of query result and registered TableSink do not  match.
> Query schema: [v:RAW(IAccumulator, ?)]
> Sink schema: [v: STRING]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17152) FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition

2020-04-15 Thread Terry Wang (Jira)
Terry Wang created FLINK-17152:
--

 Summary: FunctionDefinitionUtil generate wrong resultType and  acc 
type of AggregateFunctionDefinition
 Key: FLINK-17152
 URL: https://issues.apache.org/jira/browse/FLINK-17152
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.10.0
Reporter: Terry Wang


FunctionDefinitionUtil generates a wrong resultType and acc type for 
AggregateFunctionDefinition. This bug will lead to unexpected errors such as:
Field types of query result and registered TableSink do not match.
Query schema: [v:RAW(IAccumulator, ?)]
Sink schema: [v: STRING]
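
A hedged sketch of the direction a fix can take (not necessarily the actual patch): when 
building the AggregateFunctionDefinition, prefer the TypeInformation that the function 
itself declares via getResultType()/getAccumulatorType() instead of extracting the type 
of the function object, which is what produces RAW(IAccumulator, ?) above. The reflective 
fallback is deliberately left out of this sketch:

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

public class AggTypeResolutionSketch {

    // Result type: ask the function first; null means the framework must
    // reflectively extract the generic parameter instead (elided here).
    public static <T, ACC> TypeInformation<T> resolveResultType(AggregateFunction<T, ACC> func) {
        TypeInformation<T> declared = func.getResultType();
        if (declared != null) {
            return declared;
        }
        throw new UnsupportedOperationException("reflective extraction elided in this sketch");
    }

    // Accumulator type: same pattern as the result type.
    public static <T, ACC> TypeInformation<ACC> resolveAccumulatorType(AggregateFunction<T, ACC> func) {
        TypeInformation<ACC> declared = func.getAccumulatorType();
        if (declared != null) {
            return declared;
        }
        throw new UnsupportedOperationException("reflective extraction elided in this sketch");
    }
}
{code}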





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread

2020-04-02 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073653#comment-17073653
 ] 

Terry Wang commented on FLINK-16924:


hi [~jark] It's OK to do so.
It's fine with me to add a note to TableEnvironment describing that it is not thread-safe.


> TableEnvironment#sqlUpdate throw NPE when called in async thread
> 
>
> Key: FLINK-16924
> URL: https://issues.apache.org/jira/browse/FLINK-16924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Terry Wang
>Priority: Major
> Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png
>
>
> Now in the latest code of flink master branch, we encountered unexpected npe 
> exception as the picture like attachments. 
> It seems that I can reproduce this problem  by creating tableEnv and doing 
> some operations in main thread and then calling sqlQuery or sqlUpdate in 
> another async thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread

2020-04-02 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073443#comment-17073443
 ] 

Terry Wang commented on FLINK-16924:


hi [~twalthr]
My use case is to explain the execution plan in an async thread, to avoid potentially 
long compilation time or other unexpected blocking in the main thread.
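
For anyone hitting the same NPE before it is addressed, one workaround sketch (assuming 
the goal is only to keep the main thread free) is to confine every TableEnvironment 
interaction to a single dedicated thread instead of sharing one environment across 
threads. The class and method names below are illustrative, and the actual Flink calls 
are only indicated in comments:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SingleThreadPlannerSketch {

    // One dedicated thread owns the TableEnvironment; the main thread only
    // submits work and waits, so planner state is never touched concurrently.
    private final ExecutorService plannerThread = Executors.newSingleThreadExecutor();

    public Future<String> explainAsync(String ddl, String query) {
        return plannerThread.submit(() -> {
            // Create and use the TableEnvironment entirely inside this thread, e.g.:
            // TableEnvironment tEnv = TableEnvironment.create(settings);
            // tEnv.sqlUpdate(ddl);
            // return tEnv.explain(false);
            return "plan for: " + query; // placeholder so the sketch compiles standalone
        });
    }
}
{code}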

> TableEnvironment#sqlUpdate throw NPE when called in async thread
> 
>
> Key: FLINK-16924
> URL: https://issues.apache.org/jira/browse/FLINK-16924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Terry Wang
>Priority: Major
> Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png
>
>
> Now in the latest code of flink master branch, we encountered unexpected npe 
> exception as the picture like attachments. 
> It seems that I can reproduce this problem  by creating tableEnv and doing 
> some operations in main thread and then calling sqlQuery or sqlUpdate in 
> another async thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread

2020-04-01 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072801#comment-17072801
 ] 

Terry Wang commented on FLINK-16924:


cc [~danny0405]

> TableEnvironment#sqlUpdate throw NPE when called in async thread
> 
>
> Key: FLINK-16924
> URL: https://issues.apache.org/jira/browse/FLINK-16924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Terry Wang
>Priority: Major
> Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png
>
>
> Now in the latest code of flink master branch, we encountered unexpected npe 
> exception as the picture like attachments. 
> It seems that I can reproduce this problem  by creating tableEnv and doing 
> some operations in main thread and then calling sqlQuery or sqlUpdate in 
> another async thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread

2020-04-01 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16924:
---
Description: 
With the latest code on the Flink master branch, we encountered an unexpected NPE, as 
shown in the attached picture.
It seems that I can reproduce this problem by creating a tableEnv, doing some operations 
in the main thread, and then calling sqlQuery or sqlUpdate in another async thread.

> TableEnvironment#sqlUpdate throw NPE when called in async thread
> 
>
> Key: FLINK-16924
> URL: https://issues.apache.org/jira/browse/FLINK-16924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Terry Wang
>Priority: Major
> Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png
>
>
> Now in the latest code of flink master branch, we encountered unexpected npe 
> exception as the picture like attachments. 
> It seems that I can reproduce this problem  by creating tableEnv and doing 
> some operations in main thread and then calling sqlQuery or sqlUpdate in 
> another async thread.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread

2020-04-01 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16924:
---
Attachment: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png

> TableEnvironment#sqlUpdate throw NPE when called in async thread
> 
>
> Key: FLINK-16924
> URL: https://issues.apache.org/jira/browse/FLINK-16924
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0
>Reporter: Terry Wang
>Priority: Major
> Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread

2020-04-01 Thread Terry Wang (Jira)
Terry Wang created FLINK-16924:
--

 Summary: TableEnvironment#sqlUpdate throw NPE when called in async 
thread
 Key: FLINK-16924
 URL: https://issues.apache.org/jira/browse/FLINK-16924
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.11.0
Reporter: Terry Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition

2020-03-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16506:
---
Description: 
We can reproduce this problem in FlinkSqlParserImplTest by adding one more column 
definition:
`  x varchar comment 'Flink 社区', \n`

```
@Test
public void testCreateTableWithComment() {
conformance0 = FlinkSqlConformance.HIVE;
check("CREATE TABLE tbl1 (\n" +
"  a bigint comment 'test column comment 
AAA.',\n" +
"  h varchar, \n" +
"  x varchar comment 'Flink 社区', \n" +
"  g as 2 * (a + 1), \n" +
"  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
\n" +
"  b varchar,\n" +
"  proc as PROCTIME(), \n" +
"  PRIMARY KEY (a, b)\n" +
")\n" +
"comment 'test table comment ABC.'\n" +
"PARTITIONED BY (a, h)\n" +
"  with (\n" +
"'connector' = 'kafka', \n" +
"'kafka.topic' = 'log.test'\n" +
")\n",
"CREATE TABLE `TBL1` (\n" +
"  `A`  BIGINT  COMMENT 'test column comment 
AAA.',\n" +
"  `H`  VARCHAR,\n" +
"  `X` VARCHAR COMMENT 'Flink 社区', \n" +
"  `G` AS (2 * (`A` + 1)),\n" +
"  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
HH:mm:ss'),\n" +
"  `B`  VARCHAR,\n" +
"  `PROC` AS `PROCTIME`(),\n" +
"  PRIMARY KEY (`A`, `B`)\n" +
")\n" +
"COMMENT 'test table comment ABC.'\n" +
"PARTITIONED BY (`A`, `H`)\n" +
"WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'kafka.topic' = 'log.test'\n" +
")");
}
```

the actual unparse of the x column will be ` X`  VARCHAR  COMMENT u&'Flink 
\793e\533a' instead of our expectation.
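
For reference, the escaped digits in that output are just the Unicode code points of the 
two characters (社 is U+793E, 区 is U+533A), which a quick standalone check confirms; this 
snippet is unrelated to the parser itself:

{code:java}
public class UnicodeCommentCheck {
    public static void main(String[] args) {
        // prints U+0046 U+006C U+0069 U+006E U+006B U+0020 U+793E U+533A
        "Flink 社区".codePoints()
                .forEach(cp -> System.out.printf("U+%04X%n", cp));
    }
}
{code}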

  was:
We can reproduce this problem in FlinkSqlParserImplTest, add one more column 
definition
`  x varchar comment 'Flink 社区', \n`

```
@Test
public void testCreateTableWithComment() {
conformance0 = FlinkSqlConformance.HIVE;
check("CREATE TABLE tbl1 (\n" +
"  a bigint comment 'test column comment 
AAA.',\n" +
"  h varchar, \n" +
"  x varchar comment 'Flink 社区', \n" +
"  g as 2 * (a + 1), \n" +
"  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
\n" +
"  b varchar,\n" +
"  proc as PROCTIME(), \n" +
"  PRIMARY KEY (a, b)\n" +
")\n" +
"comment 'test table comment ABC.'\n" +
"PARTITIONED BY (a, h)\n" +
"  with (\n" +
"'connector' = 'kafka', \n" +
"'kafka.topic' = 'log.test'\n" +
")\n",
"CREATE TABLE `TBL1` (\n" +
"  `A`  BIGINT  COMMENT 'test column comment 
AAA.',\n" +
"  `H`  VARCHAR,\n" +
"  `X` VARCHAR COMMENT 'Flink 社区', \n" +
"  `G` AS (2 * (`A` + 1)),\n" +
"  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
HH:mm:ss'),\n" +
"  `B`  VARCHAR,\n" +
"  `PROC` AS `PROCTIME`(),\n" +
"  PRIMARY KEY (`A`, `B`)\n" +
")\n" +
"COMMENT 'test table comment ABC.'\n" +
"PARTITIONED BY (`A`, `H`)\n" +
"WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'kafka.topic' = 'log.test'\n" +
")");
}
```

the actual unparse of x column will be   ` X`  VARCHAR  COMMENT u&'Flink 
\793e\533a' instead of out expection.


> SqlCreateTable can not get the original text when there exists non-ascii char 
> in the col

[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition

2020-03-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16506:
---
Affects Version/s: 1.10.0

> SqlCreateTable can not get the original text when there exists non-ascii char 
> in the column definition
> --
>
> Key: FLINK-16506
> URL: https://issues.apache.org/jira/browse/FLINK-16506
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.10.0
>Reporter: Terry Wang
>Priority: Major
>
> We can reproduce this problem in FlinkSqlParserImplTest, add one more column 
> definition
> `  x varchar comment 'Flink 社区', \n`
> ```
>   @Test
>   public void testCreateTableWithComment() {
>   conformance0 = FlinkSqlConformance.HIVE;
>   check("CREATE TABLE tbl1 (\n" +
>   "  a bigint comment 'test column comment 
> AAA.',\n" +
>   "  h varchar, \n" +
>   "  x varchar comment 'Flink 社区', \n" +
>   "  g as 2 * (a + 1), \n" +
>   "  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
> \n" +
>   "  b varchar,\n" +
>   "  proc as PROCTIME(), \n" +
>   "  PRIMARY KEY (a, b)\n" +
>   ")\n" +
>   "comment 'test table comment ABC.'\n" +
>   "PARTITIONED BY (a, h)\n" +
>   "  with (\n" +
>   "'connector' = 'kafka', \n" +
>   "'kafka.topic' = 'log.test'\n" +
>   ")\n",
>   "CREATE TABLE `TBL1` (\n" +
>   "  `A`  BIGINT  COMMENT 'test column comment 
> AAA.',\n" +
>   "  `H`  VARCHAR,\n" +
>   "  `X` VARCHAR COMMENT 'Flink 社区', \n" +
>   "  `G` AS (2 * (`A` + 1)),\n" +
>   "  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
> HH:mm:ss'),\n" +
>   "  `B`  VARCHAR,\n" +
>   "  `PROC` AS `PROCTIME`(),\n" +
>   "  PRIMARY KEY (`A`, `B`)\n" +
>   ")\n" +
>   "COMMENT 'test table comment ABC.'\n" +
>   "PARTITIONED BY (`A`, `H`)\n" +
>   "WITH (\n" +
>   "  'connector' = 'kafka',\n" +
>   "  'kafka.topic' = 'log.test'\n" +
>   ")");
>   }
> ```
> the actual unparse of x column will be   ` X`  VARCHAR  COMMENT u&'Flink 
> \793e\533a' instead of our expection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition

2020-03-09 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17055527#comment-17055527
 ] 

Terry Wang commented on FLINK-16506:


cc [~danny0405], please have a look.

> SqlCreateTable can not get the original text when there exists non-ascii char 
> in the column definition
> --
>
> Key: FLINK-16506
> URL: https://issues.apache.org/jira/browse/FLINK-16506
> Project: Flink
>  Issue Type: Bug
>Reporter: Terry Wang
>Priority: Major
>
> We can reproduce this problem in FlinkSqlParserImplTest, add one more column 
> definition
> `  x varchar comment 'Flink 社区', \n`
> ```
>   @Test
>   public void testCreateTableWithComment() {
>   conformance0 = FlinkSqlConformance.HIVE;
>   check("CREATE TABLE tbl1 (\n" +
>   "  a bigint comment 'test column comment 
> AAA.',\n" +
>   "  h varchar, \n" +
>   "  x varchar comment 'Flink 社区', \n" +
>   "  g as 2 * (a + 1), \n" +
>   "  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
> \n" +
>   "  b varchar,\n" +
>   "  proc as PROCTIME(), \n" +
>   "  PRIMARY KEY (a, b)\n" +
>   ")\n" +
>   "comment 'test table comment ABC.'\n" +
>   "PARTITIONED BY (a, h)\n" +
>   "  with (\n" +
>   "'connector' = 'kafka', \n" +
>   "'kafka.topic' = 'log.test'\n" +
>   ")\n",
>   "CREATE TABLE `TBL1` (\n" +
>   "  `A`  BIGINT  COMMENT 'test column comment 
> AAA.',\n" +
>   "  `H`  VARCHAR,\n" +
>   "  `X` VARCHAR COMMENT 'Flink 社区', \n" +
>   "  `G` AS (2 * (`A` + 1)),\n" +
>   "  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
> HH:mm:ss'),\n" +
>   "  `B`  VARCHAR,\n" +
>   "  `PROC` AS `PROCTIME`(),\n" +
>   "  PRIMARY KEY (`A`, `B`)\n" +
>   ")\n" +
>   "COMMENT 'test table comment ABC.'\n" +
>   "PARTITIONED BY (`A`, `H`)\n" +
>   "WITH (\n" +
>   "  'connector' = 'kafka',\n" +
>   "  'kafka.topic' = 'log.test'\n" +
>   ")");
>   }
> ```
> the actual unparse of x column will be   ` X`  VARCHAR  COMMENT u&'Flink 
> \793e\533a' instead of out expection.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii code in the column definition

2020-03-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16506:
---
Description: 
We can reproduce this problem in FlinkSqlParserImplTest by adding one more column 
definition:
`  x varchar comment 'Flink 社区', \n`

```
@Test
public void testCreateTableWithComment() {
conformance0 = FlinkSqlConformance.HIVE;
check("CREATE TABLE tbl1 (\n" +
"  a bigint comment 'test column comment 
AAA.',\n" +
"  h varchar, \n" +
"  x varchar comment 'Flink 社区', \n" +
"  g as 2 * (a + 1), \n" +
"  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
\n" +
"  b varchar,\n" +
"  proc as PROCTIME(), \n" +
"  PRIMARY KEY (a, b)\n" +
")\n" +
"comment 'test table comment ABC.'\n" +
"PARTITIONED BY (a, h)\n" +
"  with (\n" +
"'connector' = 'kafka', \n" +
"'kafka.topic' = 'log.test'\n" +
")\n",
"CREATE TABLE `TBL1` (\n" +
"  `A`  BIGINT  COMMENT 'test column comment 
AAA.',\n" +
"  `H`  VARCHAR,\n" +
"  `X` VARCHAR COMMENT 'Flink 社区', \n" +
"  `G` AS (2 * (`A` + 1)),\n" +
"  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
HH:mm:ss'),\n" +
"  `B`  VARCHAR,\n" +
"  `PROC` AS `PROCTIME`(),\n" +
"  PRIMARY KEY (`A`, `B`)\n" +
")\n" +
"COMMENT 'test table comment ABC.'\n" +
"PARTITIONED BY (`A`, `H`)\n" +
"WITH (\n" +
"  'connector' = 'kafka',\n" +
"  'kafka.topic' = 'log.test'\n" +
")");
}
```

the actual unparse of the x column will be ` X`  VARCHAR  COMMENT u&'Flink 
\793e\533a' instead of our expectation.

> SqlCreateTable can not get the original text when there exists non-ascii code 
> in the column definition
> --
>
> Key: FLINK-16506
> URL: https://issues.apache.org/jira/browse/FLINK-16506
> Project: Flink
>  Issue Type: Bug
>Reporter: Terry Wang
>Priority: Major
>
> We can reproduce this problem in FlinkSqlParserImplTest, add one more column 
> definition
> `  x varchar comment 'Flink 社区', \n`
> ```
>   @Test
>   public void testCreateTableWithComment() {
>   conformance0 = FlinkSqlConformance.HIVE;
>   check("CREATE TABLE tbl1 (\n" +
>   "  a bigint comment 'test column comment 
> AAA.',\n" +
>   "  h varchar, \n" +
>   "  x varchar comment 'Flink 社区', \n" +
>   "  g as 2 * (a + 1), \n" +
>   "  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
> \n" +
>   "  b varchar,\n" +
>   "  proc as PROCTIME(), \n" +
>   "  PRIMARY KEY (a, b)\n" +
>   ")\n" +
>   "comment 'test table comment ABC.'\n" +
>   "PARTITIONED BY (a, h)\n" +
>   "  with (\n" +
>   "'connector' = 'kafka', \n" +
>   "'kafka.topic' = 'log.test'\n" +
>   ")\n",
>   "CREATE TABLE `TBL1` (\n" +
>   "  `A`  BIGINT  COMMENT 'test column comment 
> AAA.',\n" +
>   "  `H`  VARCHAR,\n" +
>   "  `X` VARCHAR COMMENT 'Flink 社区', \n" +
>   "  `G` AS (2 * (`A` + 1)),\n" +
>   "  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
> HH:mm:ss'),\n" +
>   "  `B`  VARCHAR,\n" +
>   "  `PROC` AS `PROCTIME`(),\n" +
>   "  PRIMARY KEY (`A`, `B`)\n" +
>   ")\n" +
>   "COMMENT 'test table comment ABC.'\n" +
>   "PARTITIONED BY

[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition

2020-03-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16506:
---
Summary: SqlCreateTable can not get the original text when there exists 
non-ascii char in the column definition  (was: SqlCreateTable can not get the 
original text when there exists non-ascii code in the column definition)

> SqlCreateTable can not get the original text when there exists non-ascii char 
> in the column definition
> --
>
> Key: FLINK-16506
> URL: https://issues.apache.org/jira/browse/FLINK-16506
> Project: Flink
>  Issue Type: Bug
>Reporter: Terry Wang
>Priority: Major
>
> We can reproduce this problem in FlinkSqlParserImplTest, add one more column 
> definition
> `  x varchar comment 'Flink 社区', \n`
> ```
>   @Test
>   public void testCreateTableWithComment() {
>   conformance0 = FlinkSqlConformance.HIVE;
>   check("CREATE TABLE tbl1 (\n" +
>   "  a bigint comment 'test column comment 
> AAA.',\n" +
>   "  h varchar, \n" +
>   "  x varchar comment 'Flink 社区', \n" +
>   "  g as 2 * (a + 1), \n" +
>   "  ts as toTimestamp(b, '-MM-dd HH:mm:ss'), 
> \n" +
>   "  b varchar,\n" +
>   "  proc as PROCTIME(), \n" +
>   "  PRIMARY KEY (a, b)\n" +
>   ")\n" +
>   "comment 'test table comment ABC.'\n" +
>   "PARTITIONED BY (a, h)\n" +
>   "  with (\n" +
>   "'connector' = 'kafka', \n" +
>   "'kafka.topic' = 'log.test'\n" +
>   ")\n",
>   "CREATE TABLE `TBL1` (\n" +
>   "  `A`  BIGINT  COMMENT 'test column comment 
> AAA.',\n" +
>   "  `H`  VARCHAR,\n" +
>   "  `X` VARCHAR COMMENT 'Flink 社区', \n" +
>   "  `G` AS (2 * (`A` + 1)),\n" +
>   "  `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd 
> HH:mm:ss'),\n" +
>   "  `B`  VARCHAR,\n" +
>   "  `PROC` AS `PROCTIME`(),\n" +
>   "  PRIMARY KEY (`A`, `B`)\n" +
>   ")\n" +
>   "COMMENT 'test table comment ABC.'\n" +
>   "PARTITIONED BY (`A`, `H`)\n" +
>   "WITH (\n" +
>   "  'connector' = 'kafka',\n" +
>   "  'kafka.topic' = 'log.test'\n" +
>   ")");
>   }
> ```
> the actual unparse of the x column will be ` X`  VARCHAR  COMMENT u&'Flink 
> \793e\533a' instead of our expectation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii code in the column definition

2020-03-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16506:
---
Summary: SqlCreateTable can not get the original text when there exists 
non-ascii code in the column definition  (was: Sql)

> SqlCreateTable can not get the original text when there exists non-ascii code 
> in the column definition
> --
>
> Key: FLINK-16506
> URL: https://issues.apache.org/jira/browse/FLINK-16506
> Project: Flink
>  Issue Type: Bug
>Reporter: Terry Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16506) Sql

2020-03-09 Thread Terry Wang (Jira)
Terry Wang created FLINK-16506:
--

 Summary: Sql
 Key: FLINK-16506
 URL: https://issues.apache.org/jira/browse/FLINK-16506
 Project: Flink
  Issue Type: Bug
Reporter: Terry Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16414) create udaf/udtf function using sql causing ValidationException: SQL validation failed. null

2020-03-03 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050923#comment-17050923
 ] 

Terry Wang commented on FLINK-16414:


cc [~bli] to confirm~

> create udaf/udtf function using sql causing ValidationException: SQL 
> validation failed. null
> 
>
> Key: FLINK-16414
> URL: https://issues.apache.org/jira/browse/FLINK-16414
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.10.0
>Reporter: Terry Wang
>Priority: Critical
>
> When using TableEnvironment#sqlUpdate to create a udaf or udtf function 
> which doesn't override the getResultType() method, everything works fine. But when 
> using this function in a later insert SQL statement, an exception like the 
> following is thrown:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. null
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> The reason is that in FunctionDefinitionUtil#createFunctionDefinition we 
> shouldn't directly call t.getResultType(), a.getAccumulatorType() or 
> a.getResultType(), but should use 
> UserDefinedFunctionHelper#getReturnTypeOfTableFunction, 
> UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction and 
> UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead.
> ```
>   if (udf instanceof ScalarFunction) {
>   return new ScalarFunctionDefinition(
>   name,
>   (ScalarFunction) udf
>   );
>   } else if (udf instanceof TableFunction) {
>   TableFunction t = (TableFunction) udf;
>   return new TableFunctionDefinition(
>   name,
>   t,
>   t.getResultType()
>   );
>   } else if (udf instanceof AggregateFunction) {
>   AggregateFunction a = (AggregateFunction) udf;
>   return new AggregateFunctionDefinition(
>   name,
>   a,
>   a.getAccumulatorType(),
>   a.getResultType()
>   );
>   } else if (udf instanceof TableAggregateFunction) {
>   TableAggregateFunction a = (TableAggregateFunction) udf;
>   return new TableAggregateFunctionDefinition(
>   name,
>   a,
>   a.getAccumulatorType(),
>   a.getResultType()
>   );
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16414) create udaf/udtf function using sql causing ValidationException: SQL validation failed. null

2020-03-03 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-16414:
---
Description: 
When using TableEnvironment#sqlUpdate to create a udaf or udtf function which 
doesn't override the getResultType() method, everything works fine. But when using this 
function in a later insert SQL statement, an exception like the following is thrown:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. null
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

The reason is that in FunctionDefinitionUtil#createFunctionDefinition we shouldn't 
directly call t.getResultType(), a.getAccumulatorType() or a.getResultType(), but should 
use UserDefinedFunctionHelper#getReturnTypeOfTableFunction, 
UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction and 
UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead.
```

if (udf instanceof ScalarFunction) {
return new ScalarFunctionDefinition(
name,
(ScalarFunction) udf
);
} else if (udf instanceof TableFunction) {
TableFunction t = (TableFunction) udf;
return new TableFunctionDefinition(
name,
t,
t.getResultType()
);
} else if (udf instanceof AggregateFunction) {
AggregateFunction a = (AggregateFunction) udf;

return new AggregateFunctionDefinition(
name,
a,
a.getAccumulatorType(),
a.getResultType()
);
} else if (udf instanceof TableAggregateFunction) {
TableAggregateFunction a = (TableAggregateFunction) udf;

return new TableAggregateFunctionDefinition(
name,
a,
a.getAccumulatorType(),
a.getResultType()
);
```
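
For illustration, a rough sketch of the suggested change as a standalone helper. The package locations and the exact UserDefinedFunctionHelper signatures are assumptions based on the method names listed above; this is not the actual patch:

```
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;

public final class FunctionDefinitionSketch {

    // Derive the type information via UserDefinedFunctionHelper so that
    // functions which do not override getResultType()/getAccumulatorType()
    // still get a valid (non-null) type instead of failing later in validation.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static FunctionDefinition createDefinition(String name, UserDefinedFunction udf) {
        if (udf instanceof TableFunction) {
            TableFunction t = (TableFunction) udf;
            return new TableFunctionDefinition(
                    name,
                    t,
                    UserDefinedFunctionHelper.getReturnTypeOfTableFunction(t));
        } else if (udf instanceof AggregateFunction) {
            AggregateFunction a = (AggregateFunction) udf;
            return new AggregateFunctionDefinition(
                    name,
                    a,
                    UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(a),
                    UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(a));
        }
        throw new UnsupportedOperationException(
                "Only TableFunction and AggregateFunction are covered by this sketch.");
    }
}
```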




  was:
When using TableEnvironment.sqlupdate() to create a udaf or udtf function, if 
the function doesn't override the getResultType() method, it's normal. But when 
using this function in the insert sql,  some exception like following will be 
throwed:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. null
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

The reason is in FunctionDefinitionUtil#createFunctionDefinition, we shouldn't 
direct call t.getResultType or a.getAccumulatorType() or a.getResultType() but 
using UserDefinedFunctionHelper#getReturnTypeOfTableFunction
 UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction 
UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead.
```

if (udf instanceof ScalarFunction) {
return new ScalarFunctionDefinition(
name,
(ScalarFunction) udf
);
} else if (udf instanceof TableFunction

[jira] [Created] (FLINK-16414) create udaf/udtf function using sql causing ValidationException: SQL validation failed. null

2020-03-03 Thread Terry Wang (Jira)
Terry Wang created FLINK-16414:
--

 Summary: create udaf/udtf function using sql causing 
ValidationException: SQL validation failed. null
 Key: FLINK-16414
 URL: https://issues.apache.org/jira/browse/FLINK-16414
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: Terry Wang


When using TableEnvironment.sqlUpdate() to create a udaf or udtf function, if 
the function doesn't override the getResultType() method, everything works fine. But when 
using this function in the insert SQL statement, an exception like the following is 
thrown:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL 
validation failed. null
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)

The reason is that in FunctionDefinitionUtil#createFunctionDefinition we shouldn't 
directly call t.getResultType(), a.getAccumulatorType() or a.getResultType(), but should 
use UserDefinedFunctionHelper#getReturnTypeOfTableFunction, 
UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction and 
UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead.
```

if (udf instanceof ScalarFunction) {
return new ScalarFunctionDefinition(
name,
(ScalarFunction) udf
);
} else if (udf instanceof TableFunction) {
TableFunction t = (TableFunction) udf;
return new TableFunctionDefinition(
name,
t,
t.getResultType()
);
} else if (udf instanceof AggregateFunction) {
AggregateFunction a = (AggregateFunction) udf;

return new AggregateFunctionDefinition(
name,
a,
a.getAccumulatorType(),
a.getResultType()
);
} else if (udf instanceof TableAggregateFunction) {
TableAggregateFunction a = (TableAggregateFunction) udf;

return new TableAggregateFunctionDefinition(
name,
a,
a.getAccumulatorType(),
a.getResultType()
);
```






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem

2020-02-03 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028767#comment-17028767
 ] 

Terry Wang commented on FLINK-15544:


Hi [~chesnay], it works well after I bundle a newer http-core version in my jar. 
It's not critical to change the http-client version, since no one else has ever 
reported it.
It's OK for me to close this issue.

> Upgrade http-core version to avoid potential DeadLock problem
> -
>
> Key: FLINK-15544
> URL: https://issues.apache.org/jira/browse/FLINK-15544
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Terry Wang
>Priority: Major
>
> Due to the HTTPCORE-446 bug in http-core 4.4.6 (the version we currently use), 
> which may cause a deadlock, we should upgrade the version.
> Background:
> We have a custom connector whose parent project is flink-parent, which causes 
> the http-client and http-core versions of all connector SDK dependencies to 
> become consistent with the versions in the dependencyManagement section. 
> It cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our production 
> environment. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory

2020-01-13 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014903#comment-17014903
 ] 

Terry Wang commented on FLINK-15552:


Hi [~jark], I'm sure about it. As for the SQL CLI e2e tests, maybe there is a kafka 
jar under the /lib directory.

> SQL Client can not correctly create kafka table using --library to indicate a 
> kafka connector directory
> ---
>
> Key: FLINK-15552
> URL: https://issues.apache.org/jira/browse/FLINK-15552
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Table SQL / Runtime
>Reporter: Terry Wang
>Priority: Major
>
> How to Reproduce:
> First, I start a SQL client and use `-l` to point to a kafka connector 
> directory.
> `
>  bin/sql-client.sh embedded -l /xx/connectors/kafka/
> `
> Then, I create a Kafka table like the following:
> `
> Flink SQL> CREATE TABLE MyUserTable (
> >   content String
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = 'universal',
> >   'connector.topic' = 'test',
> >   'connector.properties.zookeeper.connect' = 'localhost:2181',
> >   'connector.properties.bootstrap.servers' = 'localhost:9092',
> >   'connector.properties.group.id' = 'testGroup',
> >   'connector.startup-mode' = 'earliest-offset',
> >   'format.type' = 'csv'
> >  );
> [INFO] Table has been created.
> `
> Then I select from the just-created table and an exception is thrown:
> `
> Flink SQL> select * from MyUserTable;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
> suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> The following properties are requested:
> connector.properties.bootstrap.servers=localhost:9092
> connector.properties.group.id=testGroup
> connector.properties.zookeeper.connect=localhost:2181
> connector.startup-mode=earliest-offset
> connector.topic=test
> connector.type=kafka
> connector.version=universal
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=content
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> `
> Potential Reasons:
> Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a 
> CatalogTable to a TableSource, but when calling `TableFactoryService.find` we 
> don't pass the current classLoader to this method, so the default loader will be 
> the BootStrapClassLoader, which cannot find our factory.
> I verified in my box, it's truly caused by this behavior.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory

2020-01-10 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15552:
---
Description: 
How to Reproduce:
First, I start a SQL client and use `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka table like the following:
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from the just-created table and an exception is thrown:

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a 
CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't 
pass the current classLoader to this method, so the default loader will be 
the BootStrapClassLoader, which cannot find our factory.

I verified in my box; it's truly caused by this behavior.
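
For illustration, a minimal sketch of the suggested direction (assumed API usage, not the actual patch): pass the user classloader, e.g. the one that contains the jars added via `--library`, explicitly when looking up the factory.

{code:java}
import java.util.Map;

import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.TableSource;

public final class TableSourceLookupSketch {

    // Look up the TableSourceFactory with an explicitly supplied ClassLoader
    // instead of relying on the default lookup, so that factories shipped in
    // user jars can be discovered.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static TableSource<?> createTableSource(
            Map<String, String> properties, ClassLoader userClassLoader) {
        TableSourceFactory factory =
                TableFactoryService.find(TableSourceFactory.class, properties, userClassLoader);
        return factory.createTableSource(properties);
    }
}
{code}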

  was:
How to Reproduce:
first, I start a sql client and using `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka Table like following 
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from just created table and an exception been thrown: 

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use  `TableFactoryUtil#findAndCreateTableSource`  to convert a 
CatalogTable to TableSource,  but when call `TableFactoryService.find` we don't 
pass current classLoader to this method, the default loader will be 
BootStrapClassLoader, which can not find our factory.


> SQL Client can not correctly create kafka table using --library to indicate a 
> kafka connector directory
> ---
>
> Key: FLINK-15552
> URL: https://issues.apache.org/jira/browse/FLINK-15552
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Table SQL / Runtime
>Reporter: Terry Wang
>Priority: Major
>
> How to Reproduce:
> first, I start a sql client and using `-l` to point to a kafka connect

[jira] [Updated] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory

2020-01-10 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15552:
---
Description: 
How to Reproduce:
First, I start a SQL client and use `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka table like the following:
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from the just-created table and an exception is thrown:

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a 
CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't 
pass the current classLoader to this method, so the default loader will be 
the BootStrapClassLoader, which cannot find our factory.

I verified in my box, it's truly caused by this behavior.

  was:
How to Reproduce:
first, I start a sql client and using `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka Table like following 
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from just created table and an exception been thrown: 

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use  `TableFactoryUtil#findAndCreateTableSource`  to convert a 
CatalogTable to TableSource,  but when call `TableFactoryService.find` we don't 
pass current classLoader to this method, the default loader will be 
BootStrapClassLoader, which can not find our factory.

I verified in my box, it's truely caused by this behavior.


> SQL Client can not correctly create kafka table using --library to indicate a 
> kafka connector directory
> ---
>
> Key: FLINK-15552
> URL: https://issues.apache.org/jira/browse/FLINK-15552
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Table SQL / Runtime
>Reporter: Terry Wang
>Priority: Major
>
> How to Reproduce:
> first, I st

[jira] [Updated] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory

2020-01-10 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15552:
---
Description: 
How to Reproduce:
First, I start a SQL client and use `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka table like the following:
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from the just-created table and an exception is thrown:

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a 
CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't 
pass the current classLoader to this method, so the default loader will be 
the BootStrapClassLoader, which cannot find our factory.

  was:
How to Reproduce:
first, I start a sql client and using `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka Table like following 
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from just created table and an exception been thrown: 

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use  `TableFactoryUtil#findAndCreateTableSource`  to convert a 
CatalogTable to TableSource,  but when call `TableFactoryService.find` we don't 
pass current classLoader to the this method, the defualt loader will be 
BootStrapClassLoader, which can not find our factory.


> SQL Client can not correctly create kafka table using --library to indicate a 
> kafka connector directory
> ---
>
> Key: FLINK-15552
> URL: https://issues.apache.org/jira/browse/FLINK-15552
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client, Table SQL / Runtime
>Reporter: Terry Wang
>Priority: Major
>
> How to Reproduce:
> first, I start a sql client and using `-l` to point to a kafka connector 
> directory.
> `
>  bin/sql-client.sh embedded -l /x

[jira] [Created] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory

2020-01-10 Thread Terry Wang (Jira)
Terry Wang created FLINK-15552:
--

 Summary: SQL Client can not correctly create kafka table using 
--library to indicate a kafka connector directory
 Key: FLINK-15552
 URL: https://issues.apache.org/jira/browse/FLINK-15552
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client, Table SQL / Runtime
Reporter: Terry Wang


How to Reproduce:
First, I start a SQL client and use `-l` to point to a kafka connector 
directory.

`
 bin/sql-client.sh embedded -l /xx/connectors/kafka/

`

Then, I create a Kafka table like the following:
`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
>  );
[INFO] Table has been created.
`

Then I select from the just-created table and an exception is thrown:

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a 
suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`
Potential Reasons:
Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a 
CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't 
pass the current classLoader to this method, so the default loader will be 
the BootStrapClassLoader, which cannot find our factory.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem

2020-01-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15544:
---
Description: 
Due to the HTTPCORE-446 bug in http-core 4.4.6 (the version we currently use), 
which may cause a deadlock, we should upgrade the version.

Background:
We have a custom connector whose parent project is flink-parent, which causes 
the http-client and http-core versions of all connector SDK dependencies to 
become consistent with the versions in the dependencyManagement section. It 
cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our production 
environment. 

  was:
Due to the bug of http-core:4..46 (we current use) 
https://issues.apache.org/jira/browse/HTTPCORE-446, it may cause the bug of 
deadLock, we should upgrade the version.

Background:
We have a custom connector whose parent project is flink-parent, which cause 
the dependency of all connector sdk's http-client version and  http core 
version become consistent with the version in DependencyManagement section. It 
costs us a lot to debug the problem due to HTTPCORE-446 in out production 
environment. 


> Upgrade http-core version to avoid potential DeadLock problem
> -
>
> Key: FLINK-15544
> URL: https://issues.apache.org/jira/browse/FLINK-15544
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Terry Wang
>Priority: Major
>
> Due to the bug of http-core:4..46 (we current use) HTTPCORE-446, it may cause 
> the bug of deadLock, we should upgrade the version.
> Background:
> We have a custom connector whose parent project is flink-parent, which cause 
> the dependency of all connector sdk's http-client version and  http core 
> version become consistent with the version in DependencyManagement section. 
> It costs us a lot to debug the problem due to HTTPCORE-446 in our production 
> environment. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem

2020-01-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15544:
---
Description: 
Due to the bug of http-core:4..46 (we current use) 
https://issues.apache.org/jira/browse/HTTPCORE-446, it may cause the bug of 
deadLock, we should upgrade the version.

Background:
We have a custom connector whose parent project is flink-parent, which cause 
the dependency of all connector sdk's http-client version and  http core 
version become consistent with the version in DependencyManagement section. It 
costs us a lot to debug the problem due to HTTPCORE-446 in out production 
environment. 

  was:Due to the bug of http-core:4..46 (we current use) 
https://issues.apache.org/jira/browse/HTTPCORE-446, it may cause the bug of 
deadLock, we should upgrade the version.


> Upgrade http-core version to avoid potential DeadLock problem
> -
>
> Key: FLINK-15544
> URL: https://issues.apache.org/jira/browse/FLINK-15544
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Terry Wang
>Priority: Major
>
> Due to the bug of http-core:4..46 (we current use) 
> https://issues.apache.org/jira/browse/HTTPCORE-446, it may cause the bug of 
> deadLock, we should upgrade the version.
> Background:
> We have a custom connector whose parent project is flink-parent, which cause 
> the dependency of all connector sdk's http-client version and  http core 
> version become consistent with the version in DependencyManagement section. 
> It costs us a lot to debug the problem due to HTTPCORE-446 in out production 
> environment. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem

2020-01-09 Thread Terry Wang (Jira)
Terry Wang created FLINK-15544:
--

 Summary: Upgrade http-core version to avoid potential DeadLock 
problem
 Key: FLINK-15544
 URL: https://issues.apache.org/jira/browse/FLINK-15544
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Terry Wang


Due to the bug of http-core 4.4.6 (the version we currently use), 
https://issues.apache.org/jira/browse/HTTPCORE-446, which may cause a 
deadlock, we should upgrade the version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15429) read hive table null value of timestamp type will throw an npe

2019-12-27 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17004153#comment-17004153
 ] 

Terry Wang commented on FLINK-15429:


cc [~lirui]

> read hive table null value of timestamp type will throw an npe
> --
>
> Key: FLINK-15429
> URL: https://issues.apache.org/jira/browse/FLINK-15429
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.10.0
>Reporter: Terry Wang
>Priority: Major
> Fix For: 1.10.0
>
>
> When there is a null value of timestamp type in a hive table, an exception like 
> the following will occur:
> Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord
>   at 
> org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:122)
>   at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>   at SinkConversion$1.processElement(Unknown Source)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
>   at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>   at 
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.table.catalog.hive.client.HiveShimV100.ensureSupportedFlinkTimestamp(HiveShimV100.java:386)
>   at 
> org.apache.flink.table.catalog.hive.client.HiveShimV100.toHiveTimestamp(HiveShimV100.java:357)
>   at 
> org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$b054b59b$1(HiveInspectors.java:216)
>   at 
> org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$7f882244$1(HiveInspectors.java:172)
>   at 
> org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.getConvertedRow(HiveOutputFormatFactory.java:190)
>   at 
> org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:206)
>   at 
> org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:178)
>   at 
> org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:52)
>   at 
> org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:120)
>   ... 19 more
> We should add a null check in HiveShimV100#ensureSupportedFlinkTimestamp and 
> return a proper value.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15429) read hive table null value of timestamp type will throw an npe

2019-12-27 Thread Terry Wang (Jira)
Terry Wang created FLINK-15429:
--

 Summary: read hive table null value of timestamp type will throw 
an npe
 Key: FLINK-15429
 URL: https://issues.apache.org/jira/browse/FLINK-15429
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.10.0
Reporter: Terry Wang
 Fix For: 1.10.0


When there is a null value of timestamp type in a hive table, an exception like 
the following will occur:


Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord
at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:122)
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SinkConversion$1.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.NullPointerException
at 
org.apache.flink.table.catalog.hive.client.HiveShimV100.ensureSupportedFlinkTimestamp(HiveShimV100.java:386)
at 
org.apache.flink.table.catalog.hive.client.HiveShimV100.toHiveTimestamp(HiveShimV100.java:357)
at 
org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$b054b59b$1(HiveInspectors.java:216)
at 
org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$7f882244$1(HiveInspectors.java:172)
at 
org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.getConvertedRow(HiveOutputFormatFactory.java:190)
at 
org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:206)
at 
org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:178)
at 
org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:52)
at 
org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:120)
... 19 more



We should add a null check in HiveShimV100#ensureSupportedFlinkTimestamp and 
return a proper value.
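
For illustration, a standalone sketch of the null handling only (the real conversion in the hive shim is more involved; the class and method below are hypothetical):

{code:java}
import java.sql.Timestamp;
import java.time.LocalDateTime;

public final class TimestampConversionSketch {

    // A null Flink timestamp should map to a null Hive timestamp instead of
    // being passed into ensureSupportedFlinkTimestamp, where it triggers an NPE.
    public static Timestamp toHiveTimestamp(Object flinkTimestamp) {
        if (flinkTimestamp == null) {
            return null;
        }
        if (flinkTimestamp instanceof Timestamp) {
            return (Timestamp) flinkTimestamp;
        }
        if (flinkTimestamp instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) flinkTimestamp);
        }
        throw new IllegalArgumentException(
                "Unsupported Flink timestamp type: " + flinkTimestamp.getClass().getName());
    }
}
{code}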



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15398) Correct catalog doc example mistake

2019-12-25 Thread Terry Wang (Jira)
Terry Wang created FLINK-15398:
--

 Summary: Correct catalog doc example mistake
 Key: FLINK-15398
 URL: https://issues.apache.org/jira/browse/FLINK-15398
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Terry Wang


https://ci.apache.org/projects/flink/flink-docs-master/dev/table/catalogs.html#how-to-create-and-register-flink-tables-to-catalog
Currently we don't support `show tables` through the TableEnvironment.sqlQuery() method, 
so we should correct the example.
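
For illustration, a minimal sketch of what the corrected example could use instead (not necessarily the exact wording of the doc fix):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ListTablesExample {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        // `show tables` is not supported through sqlQuery() here; use the
        // dedicated Table API method to list the tables of the current
        // catalog and database instead.
        for (String table : tableEnv.listTables()) {
            System.out.println(table);
        }
    }
}
{code}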



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-15256) unable to drop table in HiveCatalogITCase

2019-12-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16996974#comment-16996974
 ] 

Terry Wang edited comment on FLINK-15256 at 12/16/19 5:17 AM:
--

I added `tableEnv.sqlUpdate("drop table myhive.`default`.tests2");` in 
HiveCatalogITCase#testCsvTableViaSQL and ran it both through IntelliJ IDEA and 
`mvn clean install`; everything works well. Can you update your code to the latest and 
test again?
cc [~phoenixjiangnan]


was (Author: terry1897):
I add `tableEnv.sqlUpdate("drop table myhive.`default`.tests2");`  in 
HiveCatalogITCase#testCsvTableViaSQL and run it both through intelliJ IDEA and 
`mvn clean install`, all works well. Can you update your code to the latest and 
test again?

> unable to drop table in HiveCatalogITCase
> -
>
> Key: FLINK-15256
> URL: https://issues.apache.org/jira/browse/FLINK-15256
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Terry Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {code:java}
> @Test
>   public void testCsvTableViaSQL() throws Exception {
>   EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>   TableEnvironment tableEnv = TableEnvironment.create(settings);
>   tableEnv.registerCatalog("myhive", hiveCatalog);
>   tableEnv.useCatalog("myhive");
>   String path = 
> this.getClass().getResource("/csv/test.csv").getPath();
>   tableEnv.sqlUpdate("create table test2 (name String, age Int) 
> with (\n" +
>   "   'connector.type' = 'filesystem',\n" +
>   "   'connector.path' = 'file://" + path + "',\n" +
>   "   'format.type' = 'csv'\n" +
>   ")");
>   Table t = tableEnv.sqlQuery("SELECT * FROM 
> myhive.`default`.test2");
>   List result = TableUtils.collectToList(t);
>   // assert query result
>   assertEquals(
>   new HashSet<>(Arrays.asList(
>   Row.of("1", 1),
>   Row.of("2", 2),
>   Row.of("3", 3))),
>   new HashSet<>(result)
>   );
>   tableEnv.sqlUpdate("drop table myhive.`default`.tests2");
>   }
> {code}
> The last drop table statement reports error as:
> {code:java}
> org.apache.flink.table.api.ValidationException: Could not execute DropTable 
> in path `myhive`.`default`.`tests2`
>   at 
> org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:568)
>   at 
> org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:543)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:519)
>   at 
> org.apache.flink.table.catalog.hive.HiveCatalogITCase.testCsvTableViaSQL(HiveCatalogITCase.java:123)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at 
> org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runTestMethod(FlinkStandaloneHiveRunner.java:169)
>   at 
> org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:154)
>   at 
> org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:92)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefore

[jira] [Commented] (FLINK-15256) unable to drop table in HiveCatalogITCase

2019-12-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16996974#comment-16996974
 ] 

Terry Wang commented on FLINK-15256:


I added `tableEnv.sqlUpdate("drop table myhive.`default`.tests2");` in 
HiveCatalogITCase#testCsvTableViaSQL and ran it both through IntelliJ IDEA and 
`mvn clean install`; everything works well. Can you update your code to the latest and 
test again?

> unable to drop table in HiveCatalogITCase
> -
>
> Key: FLINK-15256
> URL: https://issues.apache.org/jira/browse/FLINK-15256
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Reporter: Bowen Li
>Assignee: Terry Wang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.10.0, 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {code:java}
> @Test
>   public void testCsvTableViaSQL() throws Exception {
>   EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
>   TableEnvironment tableEnv = TableEnvironment.create(settings);
>   tableEnv.registerCatalog("myhive", hiveCatalog);
>   tableEnv.useCatalog("myhive");
>   String path = 
> this.getClass().getResource("/csv/test.csv").getPath();
>   tableEnv.sqlUpdate("create table test2 (name String, age Int) 
> with (\n" +
>   "   'connector.type' = 'filesystem',\n" +
>   "   'connector.path' = 'file://" + path + "',\n" +
>   "   'format.type' = 'csv'\n" +
>   ")");
>   Table t = tableEnv.sqlQuery("SELECT * FROM 
> myhive.`default`.test2");
>   List result = TableUtils.collectToList(t);
>   // assert query result
>   assertEquals(
>   new HashSet<>(Arrays.asList(
>   Row.of("1", 1),
>   Row.of("2", 2),
>   Row.of("3", 3))),
>   new HashSet<>(result)
>   );
>   tableEnv.sqlUpdate("drop table myhive.`default`.tests2");
>   }
> {code}
> The last drop table statement reports error as:
> {code:java}
> org.apache.flink.table.api.ValidationException: Could not execute DropTable in path `myhive`.`default`.`tests2`
>   at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:568)
>   at org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:543)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:519)
>   at org.apache.flink.table.catalog.hive.HiveCatalogITCase.testCsvTableViaSQL(HiveCatalogITCase.java:123)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runTestMethod(FlinkStandaloneHiveRunner.java:169)
>   at org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:154)
>   at org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:92)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(Extern

[jira] [Commented] (FLINK-15259) HiveInspector.toInspectors() should convert Flink constant to Hive constant

2019-12-15 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16996942#comment-16996942
 ] 

Terry Wang commented on FLINK-15259:


Why is it not a blocker? It will cause Hive functions to not work normally. cc 
[~lzljs3620320] too

> HiveInspector.toInspectors() should convert Flink constant to Hive constant 
> 
>
> Key: FLINK-15259
> URL: https://issues.apache.org/jira/browse/FLINK-15259
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Hive
>Affects Versions: 1.9.0, 1.10.0
>Reporter: Bowen Li
>Assignee: Rui Li
>Priority: Major
> Fix For: 1.9.2, 1.10.0, 1.11.0
>
>
> repro test: 
> {code:java}
> public class HiveModuleITCase {
>     @Test
>     public void test() {
>         TableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
>         tEnv.unloadModule("core");
>         tEnv.loadModule("hive", new HiveModule("2.3.4"));
>         tEnv.sqlQuery("select concat('an', 'bn')");
>     }
> }
> {code}
> It seems that currently HiveInspector.toInspectors() does not convert the Flink 
> constant to a Hive constant before calling 
> hiveShim.getObjectInspectorForConstant.
> I don't think it's a blocker.
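
For illustration only, here is a rough sketch of the kind of conversion step the 
description refers to. The helper and the type mappings below (String to Text, 
Integer to IntWritable, BigDecimal to HiveDecimal) are assumptions made for this 
example and are not the actual fix applied in Flink:

{code:java}
import java.math.BigDecimal;

import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Hypothetical helper (not Flink code): convert a Flink literal to the value
// representation Hive expects before a constant ObjectInspector is created for it.
final class ConstantConversionSketch {

    static Object toHiveConstant(Object flinkValue) {
        if (flinkValue instanceof String) {
            // Hive represents string constants as Text writables
            return new Text((String) flinkValue);
        }
        if (flinkValue instanceof Integer) {
            return new IntWritable((Integer) flinkValue);
        }
        if (flinkValue instanceof BigDecimal) {
            return HiveDecimal.create((BigDecimal) flinkValue);
        }
        // other literal types would need their own mapping
        return flinkValue;
    }
}
{code}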



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15192) consider split 'SQL' page into multiple sub pages for better clarity

2019-12-13 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995463#comment-16995463
 ] 

Terry Wang commented on FLINK-15192:


I'd like to improve it. After https://issues.apache.org/jira/browse/FLINK-15190 
is merged, I'll try to split this page into child pages to see the effect.

> consider split 'SQL' page into multiple sub pages for better clarity
> 
>
> Key: FLINK-15192
> URL: https://issues.apache.org/jira/browse/FLINK-15192
> Project: Flink
>  Issue Type: Task
>  Components: Documentation
>Reporter: Bowen Li
>Assignee: Terry Wang
>Priority: Major
> Fix For: 1.10.0
>
>
> With FLINK-15190, we are going to add a bunch of DDL, which makes the page too 
> long and not really readable.
> I suggest splitting the "SQL" page into sub-pages such as "SQL DDL", "SQL DML", 
> "SQL DQL", and others if needed.
> Assigned to Terry temporarily. 
> cc [~jark] [~lzljs3620320] 
> As an example, the SQL doc directory of Hive looks like the one below, which is 
> a lot better than Flink's current one:
> {code:java}
> CHILD PAGES
> Pages
> LanguageManual
> LanguageManual Cli
> LanguageManual DDL
> LanguageManual DML
> LanguageManual Select
> LanguageManual Joins
> LanguageManual LateralView
> LanguageManual Union
> LanguageManual SubQueries
> LanguageManual Sampling
> LanguageManual Explain
> LanguageManual VirtualColumns
> Configuration Properties
> LanguageManual ImportExport
> LanguageManual Authorization
> LanguageManual Types
> Literals
> LanguageManual VariableSubstitution
> LanguageManual ORC
> LanguageManual WindowingAndAnalytics
> LanguageManual Indexing
> LanguageManual JoinOptimization
> LanguageManual LZO
> LanguageManual Commands
> Parquet
> Enhanced Aggregation, Cube, Grouping and Rollup
> FileFormats
> Hive HPL/SQL
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15242) Add doc to introduce ddls or dmls supported by sql cli

2019-12-13 Thread Terry Wang (Jira)
Terry Wang created FLINK-15242:
--

 Summary: Add doc to introduce ddls or dmls supported by sql cli
 Key: FLINK-15242
 URL: https://issues.apache.org/jira/browse/FLINK-15242
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.10.0
Reporter: Terry Wang
 Fix For: 1.10.0


Currently, the SQL Client documentation 
(https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html) 
does not have a section that introduces the supported DDLs/DMLs as a whole story. 
We should complete it before the 1.10 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13437) Add Hive SQL E2E test

2019-12-10 Thread Terry Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16993142#comment-16993142
 ] 

Terry Wang commented on FLINK-13437:


Hi [~liyu], I don't have permission to change the status. Could you help change it?

> Add Hive SQL E2E test
> -
>
> Key: FLINK-13437
> URL: https://issues.apache.org/jira/browse/FLINK-13437
> Project: Flink
>  Issue Type: Test
>  Components: Connectors / Hive, Tests
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: Terry Wang
>Priority: Major
> Fix For: 1.10.0
>
>
> We should add an E2E test for the Hive integration: List all tables and read 
> some metadata, read from an existing table, register a new table in Hive, use 
> a registered function, write to an existing table, write to a new table.
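
A minimal sketch of what such an end-to-end flow could look like with the Table API 
follows; the catalog settings, table names, and SQL statements are illustrative 
assumptions, not the actual E2E test added for this ticket:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveE2ESketch {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // register and use a Hive catalog (conf dir and Hive version are placeholders)
        tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/opt/hive-conf", "2.3.4"));
        tEnv.useCatalog("myhive");

        // list all tables and read some metadata
        System.out.println(String.join(", ", tEnv.listTables()));

        // read from an existing table
        Table existing = tEnv.sqlQuery("SELECT * FROM src_table");
        existing.printSchema();

        // register a new table in Hive; depending on the Flink/Hive versions this DDL
        // may need Hive-specific properties or the Hive dialect to be readable from Hive
        tEnv.sqlUpdate("CREATE TABLE dest_table (name STRING, cnt INT)");

        // write to the new table; lower() stands in for a registered function
        tEnv.sqlUpdate("INSERT INTO dest_table SELECT lower(name), cnt FROM src_table");

        // trigger execution of the buffered INSERT statements (1.9/1.10-era API)
        tEnv.execute("hive-e2e-sketch");
    }
}
{code}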



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15148) Add doc for create/drop/alter database ddl

2019-12-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15148:
---
Component/s: Documentation

> Add doc for create/drop/alter database ddl
> --
>
> Key: FLINK-15148
> URL: https://issues.apache.org/jira/browse/FLINK-15148
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Terry Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-15147) Add doc for alter table set properties and rename table ddl

2019-12-09 Thread Terry Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Wang updated FLINK-15147:
---
Summary: Add doc for alter table set properties and rename table ddl  (was: 
Add doc for alter table set properties and rename table)

> Add doc for alter table set properties and rename table ddl
> ---
>
> Key: FLINK-15147
> URL: https://issues.apache.org/jira/browse/FLINK-15147
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation
>Reporter: Terry Wang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15148) Add doc for create/drop/alter database ddl

2019-12-09 Thread Terry Wang (Jira)
Terry Wang created FLINK-15148:
--

 Summary: Add doc for create/drop/alter database ddl
 Key: FLINK-15148
 URL: https://issues.apache.org/jira/browse/FLINK-15148
 Project: Flink
  Issue Type: Sub-task
Reporter: Terry Wang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

