[jira] [Commented] (FLINK-28569) SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced
[ https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615706#comment-17615706 ]

Terry Wang commented on FLINK-28569:

But for queries that have no upsert key and contain non-deterministic values, it seems the potential wrong result cannot be avoided? Should we disable such plans or give a warning during plan generation?

> SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced
> --
>
> Key: FLINK-28569
> URL: https://issues.apache.org/jira/browse/FLINK-28569
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.5, 1.15.2
> Reporter: lincoln lee
> Assignee: lincoln lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0, 1.17.0
>
>
> Currently SinkUpsertMaterializer only updates rows by comparing the complete row in any case, but this may cause wrong results if the input has an upsertKey and also non-deterministic column values. See this case:
> {code:java}
> @Test
> public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
>     tEnv.createTemporaryFunction(
>             "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
>     String cdcDdl =
>             "CREATE TABLE users (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " primary key (user_id) not enforced\n"
>                     + ") WITH (\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'changelog-mode' = 'I,UA,UB,D'\n"
>                     + ")";
>     String sinkTableDdl =
>             "CREATE TABLE sink (\n"
>                     + " user_id STRING,\n"
>                     + " user_name STRING,\n"
>                     + " email STRING,\n"
>                     + " balance DECIMAL(18,2),\n"
>                     + " PRIMARY KEY(email) NOT ENFORCED\n"
>                     + ") WITH(\n"
>                     + " 'connector' = 'values',\n"
>                     + " 'sink-insert-only' = 'false'\n"
>                     + ")";
>     tEnv.executeSql(cdcDdl);
>     tEnv.executeSql(sinkTableDdl);
>     util.verifyJsonPlan(
>             "insert into sink select user_id, ndFunc(user_name), email, balance from users");
> }
> {code}
> For the original cdc source records:
> {code}
> +I[user1, Tom, t...@gmail.com, 10.02],
> -D[user1, Tom, t...@gmail.com, 10.02],
> {code}
> the above query cannot correctly delete the previously inserted row because of the non-deterministic column value 'ndFunc(user_name)'.
> This can be solved by letting the SinkUpsertMaterializer be aware of the input upsertKey and update by it.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
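The failure mode described in FLINK-28569 can be illustrated outside Flink with a small, self-contained simulation. This is only a sketch under stated assumptions, not the SinkUpsertMaterializer implementation: the class `UpsertKeySketch`, its two methods, the placeholder email address, and the values `Tom-1` / `Tom-2` (standing in for two evaluations of `ndFunc(user_name)`) are all hypothetical. It assumes `user_id` (column 0) is the upsert key carried over from the source primary key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for materialized sink state; not Flink code.
final class UpsertKeySketch {

    /** Removes the first stored row that equals the incoming row in every column. */
    static boolean deleteByFullRow(List<String[]> state, String[] row) {
        for (int i = 0; i < state.size(); i++) {
            if (Arrays.equals(state.get(i), row)) {
                state.remove(i);
                return true;
            }
        }
        return false;
    }

    /** Removes the first stored row whose upsert-key column matches the incoming row. */
    static boolean deleteByUpsertKey(List<String[]> state, String[] row, int keyIndex) {
        for (int i = 0; i < state.size(); i++) {
            if (state.get(i)[keyIndex].equals(row[keyIndex])) {
                state.remove(i);
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // +I row as it reached the sink: the UDF produced "Tom-1" (assumed value).
        String[] inserted = {"user1", "Tom-1", "tom@example.com", "10.02"};
        // -D row for the same user: the UDF is re-evaluated and yields "Tom-2" (assumed value).
        String[] retraction = {"user1", "Tom-2", "tom@example.com", "10.02"};

        List<String[]> state = new ArrayList<>();
        state.add(inserted);

        System.out.println("full-row delete matched: " + deleteByFullRow(state, retraction));
        System.out.println("upsert-key delete matched: " + deleteByUpsertKey(state, retraction, 0));
        System.out.println("rows left in state: " + state.size());
    }
}
```

Comparing complete rows leaves the stale row in state because the non-deterministic column differs between the insert and the retraction, while matching on the key column alone removes it. When no upsert key exists, only the full-row comparison is available, which is the case raised in the comment above.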
[jira] [Commented] (FLINK-28569) SinkUpsertMaterializer should be aware of the input upsertKey if it is not empty otherwise wrong result maybe produced
[ https://issues.apache.org/jira/browse/FLINK-28569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615681#comment-17615681 ] Terry Wang commented on FLINK-28569: +1 to backport this fix into 1.14 and 1.15
[jira] [Commented] (FLINK-26334) When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window
[ https://issues.apache.org/jira/browse/FLINK-26334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501258#comment-17501258 ]

Terry Wang commented on FLINK-26334:

Good insight in the problem description and cause analysis. (y)

> When timestamp - offset + windowSize < 0, elements cannot be assigned to the correct window
> ---
>
> Key: FLINK-26334
> URL: https://issues.apache.org/jira/browse/FLINK-26334
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.15.0, 1.14.3
> Environment: flink version 1.14.3
> Reporter: realdengziqi
> Assignee: realdengziqi
> Priority: Critical
> Attachments: image-2022-03-04-11-28-26-616.png, image-2022-03-04-11-37-10-035.png
>
> Original Estimate: 3h
> Remaining Estimate: 3h
>
> h2. issue
> Hello!
> While studying the Flink source code, we found a problem with its algorithm for calculating the window start time. When _timestamp - offset + windowSize < 0_, the element is incorrectly allocated to a window whose start time is later than the element's own timestamp.
> The problem is in _org.apache.flink.streaming.api.windowing.windows.TimeWindow_
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp - (timestamp - offset + windowSize) % windowSize;
> } {code}
> _!image-2022-03-04-11-28-26-616.png|width=710,height=251!_
> We believe that this violates the constraints between time and window. *That is, an element should fall within a window whose start time is less than its own timestamp and whose end time is greater than its own timestamp.* However, when {_}timestamp - offset + windowSize < 0{_}, *the element falls into a future time window.*
> *You can reproduce the bug with the code at the end of the post.*
> h2. Solution
> In fact, the original algorithm works correctly in Python; the root of the problem is how the programming language handles the remainder operation.
> We finally think that it should be modified to the following algorithm.
> {code:java}
> public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     return timestamp
>             - (timestamp - offset) % windowSize
>             - (windowSize & (timestamp - offset) >> 63);
> } {code}
> _windowSize & (timestamp - offset) >> 63_ The function of this formula is to subtract windowSize from the overall result when {_}timestamp - offset<0{_}, and otherwise do nothing. This way we can handle both positive and negative timestamps.
> Finally, the element can be assigned to the correct window.
> !image-2022-03-04-11-37-10-035.png|width=712,height=284!
> This code passes the current unit tests.
> h2. getWindowStartWithOffset methods in other packages
> We expect that {_}getWindowStartWithOffset{_} is used in many places. We searched for this method in the project and found that negative timestamps are already handled in _flink.table._
> Below is their source code.
> _{{org.apache.flink.table.runtime.operators.window.grouping.WindowsGrouping}}_
> {code:java}
> private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
>     long remainder = (timestamp - offset) % windowSize;
>     // handle both positive and negative cases
>     if (remainder < 0) {
>         return timestamp - (remainder + windowSize);
>     } else {
>         return timestamp - remainder;
>     }
> } {code}
> h2. Can we make a pull request?
> If the community deems a revision necessary, we hope this task can be handed over to us. Our members are all students who have just graduated, and contributing code to Flink would be a great encouragement for us.
> Thank you so much!
> From Deng Ziqi & Lin Wanni & Guo Yuanfang
>
> h2. reproduce
> {code:java}
> /* output
> WindowStart: -15000ExactSize:1(a,-17000)
> WindowStart: -1ExactSize:1(b,-12000)
> WindowStart: -5000 ExactSize:2(c,-7000)
> WindowStart: -5000 ExactSize:2(d,-2000)
> WindowStart: 0 ExactSize:1(e,3000)
> WindowStart: 5000 ExactSize:1(f,8000)
> WindowStart: 1 ExactSize:1(g,13000)
> WindowStart: 15000 ExactSize:1(h,18000)
> */
> public class Example {
>     public static void main(String[] args) throws Exception {
>         final TimeZone timeZone = TimeZone.getTimeZone("GTM+0");
>         TimeZone.setDefault(timeZone);
>         StreamExecutionEnvironment env =
> StreamExecutionEnviro
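The formulas quoted in FLINK-26334 can be compared side by side in plain Java, without any Flink dependency. The following is a minimal sketch, not the Flink source: the class `WindowStartSketch` and its method names are hypothetical, the first three methods transcribe the snippets quoted in the issue, and the `Math.floorMod` variant is an additional assumption of this sketch that does not appear in the issue.

```java
// Hypothetical comparison harness; only the formulas themselves come from the issue text.
final class WindowStartSketch {

    /** Formula the issue quotes from TimeWindow. */
    static long original(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Bitwise formula proposed in the issue. */
    static long bitwise(long timestamp, long offset, long windowSize) {
        return timestamp
                - (timestamp - offset) % windowSize
                - (windowSize & (timestamp - offset) >> 63);
    }

    /** Branching formula the issue quotes from WindowsGrouping. */
    static long branching(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend, so shift negative remainders up.
        if (remainder < 0) {
            return timestamp - (remainder + windowSize);
        } else {
            return timestamp - remainder;
        }
    }

    /** Variant using Math.floorMod, whose result is never negative for a positive divisor. */
    static long floorMod(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    public static void main(String[] args) {
        long[] timestamps = {-17000L, -5000L, 3000L};
        for (long ts : timestamps) {
            System.out.println(
                    ts + " -> original=" + original(ts, 0L, 5000L)
                            + " bitwise=" + bitwise(ts, 0L, 5000L)
                            + " branching=" + branching(ts, 0L, 5000L)
                            + " floorMod=" + floorMod(ts, 0L, 5000L));
        }
    }
}
```

With a window size of 5000 and offset 0, the original formula places timestamp -17000 in the window starting at -15000, matching the first line of the reported output, while the other three return -20000. In this sketch the bitwise variant returns -10000 for timestamp -5000 (an exact negative multiple of the window size), whereas the branching and `floorMod` variants return -5000, so that boundary case may deserve its own unit test.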
[jira] [Updated] (FLINK-23289) BinarySection should add null check in constructor method
[ https://issues.apache.org/jira/browse/FLINK-23289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-23289: --- Summary: BinarySection should add null check in constructor method (was: BinarySection should null check in constructor method) > BinarySection should add null check in constructor method > - > > Key: FLINK-23289 > URL: https://issues.apache.org/jira/browse/FLINK-23289 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime >Reporter: Terry Wang >Priority: Major > > {code:java} > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.data.binary.BinarySegmentUtils.inFirstSegment(BinarySegmentUtils.java:411) > at > org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:132) > at > org.apache.flink.table.data.binary.BinarySegmentUtils.copyToBytes(BinarySegmentUtils.java:118) > at > org.apache.flink.table.data.binary.BinaryStringData.copy(BinaryStringData.java:360) > at > org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:59) > at > org.apache.flink.table.runtime.typeutils.StringDataSerializer.copy(StringDataSerializer.java:37) > at > org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:128) > at > org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:86) > at > org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:47) > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) > at > org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:152) > at 
> org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner$CalcCollectionCollector.collect(AsyncLookupJoinWithCalcRunner.java:142) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-23289) BinarySection should null check in constructor method
[ https://issues.apache.org/jira/browse/FLINK-23289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-23289: --- Component/s: Table SQL / Runtime
[jira] [Created] (FLINK-23289) BinarySection should null check in contusctor method
Terry Wang created FLINK-23289: -- Summary: BinarySection should null check in contusctor method Key: FLINK-23289 URL: https://issues.apache.org/jira/browse/FLINK-23289 Project: Flink Issue Type: Improvement Reporter: Terry Wang
[jira] [Updated] (FLINK-23289) BinarySection should null check in constructor method
[ https://issues.apache.org/jira/browse/FLINK-23289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-23289: --- Summary: BinarySection should null check in constructor method (was: BinarySection should null check in contusctor method)
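The improvement requested in FLINK-23289 is a fail-fast null check, so that a missing segment array is reported at construction time instead of surfacing later as the NullPointerException deep inside `copyToBytes` shown in the stack trace. The sketch below only illustrates that idea: `SectionSketch` is a hypothetical stand-in that uses plain `byte[][]` instead of Flink memory segments, and the fields and constructor of the real `BinarySection` may differ.

```java
import java.util.Objects;

// Hypothetical stand-in for a section of binary data; not the Flink BinarySection class.
final class SectionSketch {
    private final byte[][] segments;
    private final int offset;
    private final int sizeInBytes;

    SectionSketch(byte[][] segments, int offset, int sizeInBytes) {
        // Fail here, with a descriptive message, rather than later during a copy.
        this.segments = Objects.requireNonNull(segments, "segments must not be null");
        this.offset = offset;
        this.sizeInBytes = sizeInBytes;
    }

    /** Copies the section out of the first segment (the sketch assumes it fits there). */
    byte[] copyToBytes() {
        byte[] out = new byte[sizeInBytes];
        System.arraycopy(segments[0], offset, out, 0, sizeInBytes);
        return out;
    }

    public static void main(String[] args) {
        try {
            new SectionSketch(null, 0, 0);
        } catch (NullPointerException e) {
            System.out.println("rejected at construction: " + e.getMessage());
        }
    }
}
```

With the check in the constructor, the exception points at the code that created the invalid object, which is usually closer to the root cause than the serializer frames in the reported trace.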
[jira] [Closed] (FLINK-18516) Improve error message for rank function in streaming mode
[ https://issues.apache.org/jira/browse/FLINK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang closed FLINK-18516. -- Resolution: Works for Me > Improve error message for rank function in streaming mode > - > > Key: FLINK-18516 > URL: https://issues.apache.org/jira/browse/FLINK-18516 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Reporter: Rui Li >Priority: Minor > Labels: auto-deprioritized-major > > The following query currently fails with NPE: > {code} > create table foo (x int,y string,p as proctime()) with ...; > select x,y,row_number() over (partition by x order by p) from foo; > {code} > which can be difficult for users to figure out the reason of the failure. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-18516) Improve error message for rank function in streaming mode
[ https://issues.apache.org/jira/browse/FLINK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357087#comment-17357087 ] Terry Wang edited comment on FLINK-18516 at 6/4/21, 6:13 AM: - hi [~lirui] What's your version of flink reporting this npe? I can not reproduce your error in flink master code. Maybe the bug has been fixed. was (Author: terry1897): hi [~lirui] What's your flink version reporting this npe? I can not reproduce your error in flink master code. Maybe the bug has been fixed.
[jira] [Commented] (FLINK-18516) Improve error message for rank function in streaming mode
[ https://issues.apache.org/jira/browse/FLINK-18516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17357087#comment-17357087 ] Terry Wang commented on FLINK-18516: hi [~lirui] What's your flink version reporting this npe? I can not reproduce your error in flink master code. Maybe the bug has been fixed.
[jira] [Closed] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks
[ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang closed FLINK-15775. -- Fix Version/s: 1.11.0 Release Note: Has fixed after flink 1.11 Resolution: Invalid > SourceFunctions are instantiated twice when pulled on from 2 Sinks > -- > > Key: FLINK-15775 > URL: https://issues.apache.org/jira/browse/FLINK-15775 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.9.1, 1.10.0 >Reporter: Benoît Paris >Priority: Minor > Labels: auto-deprioritized-major > Fix For: 1.11.0 > > Attachments: flink-test-duplicated-sources.zip > > > When pulled on by two sinks, the SourceFunctions of a TableSource will get > instantiated twice; (and subsequently opened by the parallelism number, which > is expected behavior): > The following will instantiate the FooTableSource's SourceFunction once (OK > behavior, but not the processing we want): > > {code:java} > tEnv.registerTableSource("foo_table", new FooTableSource()); > Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0"); > tEnv.registerTableSink("syso_sink_0", new SysoSink()); > out0.insertInto("syso_sink_0"); > {code} > > This will instantiate the FooTableSource's SourceFunction twice (Not OK, as > we're missing half the inputs in each SysoSink): > > {code:java} > tEnv.registerTableSource("foo_table", new FooTableSource()); > Table out0 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 0"); > Table out1 = tEnv.sqlQuery("SELECT * FROM foo_table WHERE field_1 = 1"); > tEnv.registerTableSink("syso_sink_0", new SysoSink()); > tEnv.registerTableSink("syso_sink_1", new SysoSink()); > out0.insertInto("syso_sink_0"); > out1.insertInto("syso_sink_1"); > {code} > > This might not be a problem for Kafka's SourceFunctions, as we can always > reread from a log; but it is a data loss problem when the source data can't > be reproduced. > Actually, this might be me not understanding the API. 
Is there a way to make > the runtime read from the same opened SourceFunctions? > Attached is Java code that logs the faulty opening of the SourceFunctions, > pom.xml, and logical execution plans for the duplicated case, and the > workaround. > > > Workaround: make a conversion to an appendStream. Somehow this makes the > planner think it has to put a materialization barrier after the Source and > read from that: > > {code:java} > tEnv.registerTableSource("foo_table_source", new FooTableSource()); > Table sourceTable = tEnv.sqlQuery("SELECT * FROM foo_table_source"); > Table appendingSourceTable = tEnv.fromDataStream( > tEnv.toAppendStream(sourceTable, Types.ROW(new String[]{"field_1"}, new > TypeInformation[]{Types.LONG()})) > ); > tEnv.registerTable("foo_table", appendingSourceTable);{code} > > > Best Regards, > Ben -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks
[ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351040#comment-17351040 ] Terry Wang edited comment on FLINK-15775 at 5/25/21, 12:50 PM: --- Hi, [~BenoitParis]~ FYI, TableEnvironment has offered a way to execute multiple insert into one pipeline since 1.11, you may refer to the doc https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/insert/ was (Author: terry1897): Hi, [~BenoitParis]~
[jira] [Commented] (FLINK-15775) SourceFunctions are instantiated twice when pulled on from 2 Sinks
[ https://issues.apache.org/jira/browse/FLINK-15775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17351040#comment-17351040 ] Terry Wang commented on FLINK-15775: Hi, [~BenoitParis]~
[jira] [Commented] (FLINK-22654) SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks
[ https://issues.apache.org/jira/browse/FLINK-22654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344282#comment-17344282 ]

Terry Wang commented on FLINK-22654:

I reproduced this problem in my local environment, and it's a bug in `SqlCreateTable#unparse()`. I will open a PR to fix it soon.

> SqlCreateTable toString()/unparse() lose CONSTRAINTS and watermarks
>
> Key: FLINK-22654
> URL: https://issues.apache.org/jira/browse/FLINK-22654
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.13.0
> Reporter: youxianq
> Priority: Minor
>
> Create a SqlCreateTable using a LIKE clause; toString() or unparse() will then lose the watermark if there are no columns.
> {code:java}
> public static SqlParser getSqlParser(String sql) {
>     SourceStringReader sqlReader = new SourceStringReader(sql);
>     return SqlParser.create(sqlReader,
>             SqlParser.configBuilder()
>                     .setParserFactory(FlinkSqlParserImpl.FACTORY)
>                     .setLex(Lex.JAVA)
>                     .setIdentifierMaxLength(256)
>                     .setConformance(FlinkSqlConformance.DEFAULT)
>                     .build());
> }
>
> public static void main(String[] args) throws Exception {
>     SqlParser sqlParser = getSqlParser("" +
>             "create TEMPORARY table t_order_course (\n" +
>             " WATERMARK FOR last_update_time AS last_update_time - INTERVAL '5' SECOND\n" +
>             ") with (\n" +
>             " 'scan.startup.mode' = 'specific-offsets',\n" +
>             " 'scan.startup.specific-offsets' = 'partition:0,offset:1169129'\n" +
>             ") like cdc.`qq_data(sh-backend-tst:3306)`.t_order_course (\n" +
>             " OVERWRITING WATERMARKS\n" +
>             " OVERWRITING OPTIONS\n" +
>             " EXCLUDING CONSTRAINTS\n" +
>             " \n" +
>             ")");
>     SqlNode sqlNode = sqlParser.parseStmt();
>     System.out.println(sqlNode.toString());
> }
> {code}
> output:
> CREATE TEMPORARY TABLE `t_order_course` WITH (
>   'scan.startup.mode' = 'specific-offsets',
>   'scan.startup.specific-offsets' = 'partition:0,offset:1169129'
> ) LIKE `cdc`.`qq_data(sh-backend-tst:3306)`.`t_order_course` (
>   OVERWRITING WATERMARKS
>   OVERWRITING OPTIONS
>   EXCLUDING CONSTRAINTS
> )

-- This message was sent by Atlassian Jira (v8.3.4#803005)
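To make the reported symptom concrete, here is a minimal Java sketch of how an unparse method could drop the watermark when the column list is empty. It is a toy model and not Flink's actual `SqlCreateTable` code: the class `CreateTableSketch`, its fields, and both `unparse*` methods are hypothetical names, and the guard condition is only an assumption consistent with the report ("will lose watermark if no column").

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of a CREATE TABLE node. Hypothetical; this is NOT Flink's SqlCreateTable. */
final class CreateTableSketch {
    private final String name;
    private final List<String> columns; // e.g. "id INT"
    private final String watermark;     // e.g. "WATERMARK FOR ts AS ts", or null if absent

    CreateTableSketch(String name, List<String> columns, String watermark) {
        this.name = name;
        this.columns = columns;
        this.watermark = watermark;
    }

    /** Assumed buggy shape: the parenthesized body is written only when columns exist. */
    String unparseBuggy() {
        StringBuilder sb = new StringBuilder("CREATE TABLE `" + name + "`");
        if (!columns.isEmpty()) {
            appendBody(sb);
        }
        return sb.toString();
    }

    /** Assumed fix: write the body whenever there is anything to put in it. */
    String unparseFixed() {
        StringBuilder sb = new StringBuilder("CREATE TABLE `" + name + "`");
        if (!columns.isEmpty() || watermark != null) {
            appendBody(sb);
        }
        return sb.toString();
    }

    /** Appends columns followed by the watermark (if any) as one comma-separated list. */
    private void appendBody(StringBuilder sb) {
        List<String> items = new ArrayList<>(columns);
        if (watermark != null) {
            items.add(watermark);
        }
        sb.append(" (").append(String.join(", ", items)).append(")");
    }

    public static void main(String[] args) {
        CreateTableSketch t = new CreateTableSketch(
                "t_order_course",
                Collections.emptyList(),
                "WATERMARK FOR last_update_time AS last_update_time - INTERVAL '5' SECOND");
        System.out.println(t.unparseBuggy()); // watermark is missing
        System.out.println(t.unparseFixed()); // watermark is kept
    }
}
```

In this sketch the only difference between the two methods is the guard around the body, which is why a table declared with a watermark but no columns round-trips correctly only through `unparseFixed()`. Whether the real fix takes this exact shape is something the eventual PR would have to confirm.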
[jira] [Created] (FLINK-21972) Check whether TemporalTableSourceSpec can be serialized or not
Terry Wang created FLINK-21972:

Summary: Check whether TemporalTableSourceSpec can be serialized or not
Key: FLINK-21972
URL: https://issues.apache.org/jira/browse/FLINK-21972
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
Fix For: 1.13.0

Check whether TemporalTableSourceSpec can be serialized or not
[jira] [Created] (FLINK-21868) Support StreamExecLookupJoin json serialization/deserialization
Terry Wang created FLINK-21868:

Summary: Support StreamExecLookupJoin json serialization/deserialization
Key: FLINK-21868
URL: https://issues.apache.org/jira/browse/FLINK-21868
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
Fix For: 1.13.0

Support StreamExecLookupJoin json serialization/deserialization
[jira] [Created] (FLINK-21864) Support StreamExecTemporalJoin json serialization/deserialization
Terry Wang created FLINK-21864:

Summary: Support StreamExecTemporalJoin json serialization/deserialization
Key: FLINK-21864
URL: https://issues.apache.org/jira/browse/FLINK-21864
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
Fix For: 1.13.0

Support StreamExecTemporalJoin json serialization/deserialization
[jira] [Updated] (FLINK-21837) Support StreamExecIntervalJoin json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-21837:

Description: Support StreamExecIntervalJoin json ser/des (was: Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des)

> Support StreamExecIntervalJoin json ser/de
>
> Key: FLINK-21837
> URL: https://issues.apache.org/jira/browse/FLINK-21837
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Assignee: Terry Wang
> Priority: Major
> Fix For: 1.13.0
>
> Support StreamExecIntervalJoin json ser/des
[jira] [Updated] (FLINK-21837) Support StreamExecIntervalJoin json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-21837:

Summary: Support StreamExecIntervalJoin json ser/de (was: Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de)

> Support StreamExecIntervalJoin json ser/de
>
> Key: FLINK-21837
> URL: https://issues.apache.org/jira/browse/FLINK-21837
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Assignee: Terry Wang
> Priority: Major
> Fix For: 1.13.0
>
> Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des
[jira] [Updated] (FLINK-21837) Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de
[ https://issues.apache.org/jira/browse/FLINK-21837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-21837:

Summary: Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de (was: Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des)

> Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/de
>
> Key: FLINK-21837
> URL: https://issues.apache.org/jira/browse/FLINK-21837
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Fix For: 1.13.0
>
> Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des
[jira] [Created] (FLINK-21837) Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des
Terry Wang created FLINK-21837:

Summary: Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des
Key: FLINK-21837
URL: https://issues.apache.org/jira/browse/FLINK-21837
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
Fix For: 1.13.0

Support StreamExecIntervalJoin/StreamExecLookupJoin/StreamExecTemporalJoin json ser/des
[jira] [Created] (FLINK-21811) Support StreamExecJoin json serialization/deserialization
Terry Wang created FLINK-21811:

Summary: Support StreamExecJoin json serialization/deserialization
Key: FLINK-21811
URL: https://issues.apache.org/jira/browse/FLINK-21811
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
Fix For: 1.13.0
[jira] [Comment Edited] (FLINK-21802) LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType
[ https://issues.apache.org/jira/browse/FLINK-21802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302255#comment-17302255 ]

Terry Wang edited comment on FLINK-21802 at 3/16/21, 6:40 AM:

It seems I mistakenly put this issue under FLINK-20435. Could anyone help me move this issue to be a subtask of FLINK-21091 ?

was (Author: terry1897):
It seems I mistakenly put this issue under FLINK-20435. Could anyone help me move this issue to be a subtask of https://issues.apache.org/jira/browse/FLINK-21091 ?

> LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType
>
> Key: FLINK-21802
> URL: https://issues.apache.org/jira/browse/FLINK-21802
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
> We should customize the deserialization/serialization of RowType/MapType/ArrayType/MultisetType to support special LogicalType attributes, such as TimestampType's kind field.
[jira] [Commented] (FLINK-21802) LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType
[ https://issues.apache.org/jira/browse/FLINK-21802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17302255#comment-17302255 ]

Terry Wang commented on FLINK-21802:

It seems I mistakenly put this issue under FLINK-20435. Could anyone help me move this issue to be a subtask of https://issues.apache.org/jira/browse/FLINK-21091 ?

> LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType
>
> Key: FLINK-21802
> URL: https://issues.apache.org/jira/browse/FLINK-21802
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.13.0
>
> We should customize the deserialization/serialization of RowType/MapType/ArrayType/MultisetType to support special LogicalType attributes, such as TimestampType's kind field.
[jira] [Created] (FLINK-21802) LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType
Terry Wang created FLINK-21802:

Summary: LogicalTypeJsonDeserializer/Serializer custom RowType/MapType/ArrayType/MultisetType
Key: FLINK-21802
URL: https://issues.apache.org/jira/browse/FLINK-21802
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
Fix For: 1.13.0

We should customize the deserialization/serialization of RowType/MapType/ArrayType/MultisetType to support special LogicalType attributes, such as TimestampType's kind field.
[jira] [Commented] (FLINK-21744) Support StreamExecDeduplicate json serialization/deserialization
[ https://issues.apache.org/jira/browse/FLINK-21744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300167#comment-17300167 ]

Terry Wang commented on FLINK-21744:

Hi [~godfreyhe], I'd like to take on this exciting task. Feel free to assign it to me.

> Support StreamExecDeduplicate json serialization/deserialization
>
> Key: FLINK-21744
> URL: https://issues.apache.org/jira/browse/FLINK-21744
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
[jira] [Updated] (FLINK-21744) Support StreamExecDeduplicate json serialization/deserialization
[ https://issues.apache.org/jira/browse/FLINK-21744?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-21744:

Fix Version/s: 1.13.0

> Support StreamExecDeduplicate json serialization/deserialization
>
> Key: FLINK-21744
> URL: https://issues.apache.org/jira/browse/FLINK-21744
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Fix For: 1.13.0
[jira] [Created] (FLINK-21744) Support StreamExecDeduplicate json serialization/deserialization
Terry Wang created FLINK-21744:

Summary: Support StreamExecDeduplicate json serialization/deserialization
Key: FLINK-21744
URL: https://issues.apache.org/jira/browse/FLINK-21744
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: Terry Wang
[jira] [Commented] (FLINK-15848) Support both fixed allocator and dynamic allocator in flink
[ https://issues.apache.org/jira/browse/FLINK-15848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17115867#comment-17115867 ]

Terry Wang commented on FLINK-15848:

+1 for this feature. It's very useful for benchmarking and some other specific use cases.

> Support both fixed allocator and dynamic allocator in flink
>
> Key: FLINK-15848
> URL: https://issues.apache.org/jira/browse/FLINK-15848
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Task
> Affects Versions: 1.10.0
> Reporter: liupengcheng
> Priority: Major
>
> Currently, we have removed the static allocator and only support dynamic allocation in Flink 1.10. However, this allocator still has some drawbacks:
> # It cannot allocate resources within a range, which means resource usage is not under control. This has a very bad effect in a shared resource cluster (e.g. Yarn): one big query or job may occupy all the resources and block other jobs.
> # It does not support static resource allocation. That means we can hardly do benchmark testing across engines (e.g. Spark). Also, in a shared resource cluster (e.g. Yarn) that supports over-allocation, it's hard to align the resource usage.
> As discussed in FLINK-12362, we should support both a fixed allocator and a dynamic allocator (dynamically allocating within a range) in Flink.
[jira] [Commented] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102290#comment-17102290 ]

Terry Wang commented on FLINK-17553:

I opened a PR, https://github.com/apache/flink/pull/12028, to help understand and solve this bug.

> Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
>
> Key: FLINK-17553
> URL: https://issues.apache.org/jira/browse/FLINK-17553
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
> Attachments: temp.png
>
> The exception stack is as follows:
> !temp.png!
> We can reproduce this problem by adding the following test to org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase
> {code:scala}
> // Some comments here
> @Test
> def testWindowAggregateOnConstantValue(): Unit = {
>   val ddl1 =
>     """
>       |CREATE TABLE src (
>       |  log_ts STRING,
>       |  ts TIMESTAMP(3),
>       |  a INT,
>       |  b DOUBLE,
>       |  rowtime AS CAST(log_ts AS TIMESTAMP(3)),
>       |  WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND
>       |) WITH (
>       |  'connector' = 'COLLECTION',
>       |  'is-bounded' = 'false'
>       |)
>     """.stripMargin
>   val ddl2 =
>     """
>       |CREATE TABLE dst (
>       |  ts TIMESTAMP(3),
>       |  a BIGINT,
>       |  b DOUBLE
>       |) WITH (
>       |  'connector.type' = 'filesystem',
>       |  'connector.path' = '/tmp/1',
>       |  'format.type' = 'csv'
>       |)
>     """.stripMargin
>   val query =
>     """
>       |INSERT INTO dst
>       |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b)
>       |FROM src
>       | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>       |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND)
>     """.stripMargin
>   tEnv.sqlUpdate(ddl1)
>   tEnv.sqlUpdate(ddl2)
>   tEnv.sqlUpdate(query)
>   println(tEnv.explain(true))
> }
> {code}
> I spent a lot of time digging into this bug and found that the problem may be caused by AggregateProjectPullUpConstantsRule, which doesn't generate proper project items.
> After I removed AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expected.
> The problem is that WindowPropertiesRule cannot match the RelNode tree after the transformation by AggregateProjectPullUpConstantsRule. Alternatively, we can add ProjectMergeRule.INSTANCE to FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem.
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Fix Version/s: 1.11.0 > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Fix For: 1.11.0 > > Attachments: temp.png > > > Exception stack is as following: > !temp.png! > We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Unit = { > val ddl1 = > """ > |CREATE TABLE src ( > | log_ts STRING, > | ts TIMESTAMP(3), > | a INT, > | b DOUBLE, > | rowtime AS CAST(log_ts AS TIMESTAMP(3)), > | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND > |) WITH ( > | 'connector' = 'COLLECTION', > | 'is-bounded' = 'false' > |) > """.stripMargin > val ddl2 = > """ > |CREATE TABLE dst ( > | ts TIMESTAMP(3), > | a BIGINT, > | b DOUBLE > |) WITH ( > | 'connector.type' = 'filesystem', > | 'connector.path' = '/tmp/1', > | 'format.type' = 'csv' > |) > """.stripMargin > val query = > """ > |INSERT INTO dst > |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), > SUM(b) > |FROM src > | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) > |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) > """.stripMargin > tEnv.sqlUpdate(ddl1) > tEnv.sqlUpdate(ddl2) > tEnv.sqlUpdate(query) > println(tEnv.explain(true)) > } > {code} > I spent lots of work digging into this bug, and found the problem may be > caused by AggregateProjectPullUpConstantsRule which doesn't generate proper > project items correctly. 
> After I remove AggregateProjectPullUpConstantsRule from > FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. > The problem is that WindowPropertiesRule can not match RelNodeTree after the > transformation of AggregateProjectPullUpConstantsRule, we also can add > ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after > AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Description: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging into this bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. The problem is WindowPropertiesRule can not match RelNodeTree after the transformation of AggregateProjectPullUpConstantsRule, we also can add ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem. was: Exception stack is as following: !temp.png! 
We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging the bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: > !temp.png! 
> We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Unit = { > val ddl1 = > """ > |CREATE TABLE src ( > | log_ts STRING, > | ts TIMESTAMP(3), > | a INT, > | b DOUBLE, > | rowtime AS CAST(log_ts AS TIMESTAMP(3)), > | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND >
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Description: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging into this bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. The problem is that WindowPropertiesRule can not match RelNodeTree after the transformation of AggregateProjectPullUpConstantsRule, we also can add ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem. was: Exception stack is as following: !temp.png! 
We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging into this bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. The problem is WindowPropertiesRule can not match RelNodeTree after the transformation of AggregateProjectPullUpConstantsRule, we also can add ProjectMergeRule.INSTANCE in FlinkStreamRuleSets#DEFAULT_REWRITE_RULES after AggregateProjectPullUpConstantsRule.INSTANCE to solve this problem. > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: > !temp.png! 
> We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Un
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Description: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging the bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. was: Exception stack is as following: !temp.png! 
We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging the bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: > !temp.png! 
> We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Unit = { > val ddl1 = > """ > |CREATE TABLE src ( > | log_ts STRING, > | ts TIMESTAMP(3), > | a INT, > | b DOUBLE, > | rowtime AS CAST(log_ts AS TIMESTAMP(3)), > | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND > |) WITH ( > | 'connector' = 'COLLECTION', > | 'is-bounded' = 'false' > |) > """.stripMargin > val ddl2 = > """ > |CREATE TABLE dst ( > | ts TIMESTAMP(3), > | a BIGINT, > | b DOUBLE > |) WITH ( > | 'connector.type' = 'fi
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Description: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} I spent lots of work digging the bug, and found the problem may be caused by AggregateProjectPullUpConstantsRule which doesn't generate proper project items correctly. After I remove AggregateProjectPullUpConstantsRule from FlinkStreamRuleSets#DEFAULT_REWRITE_RULES, the test passed as expect. was: Exception stack is as following: !temp.png! 
We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: > !temp.png! 
> We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Unit = { > val ddl1 = > """ > |CREATE TABLE src ( > | log_ts STRING, > | ts TIMESTAMP(3), > | a INT, > | b DOUBLE, > | rowtime AS CAST(log_ts AS TIMESTAMP(3)), > | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND > |) WITH ( > | 'connector' = 'COLLECTION', > | 'is-bounded' = 'false' > |) > """.stripMargin > val ddl2 = > """ > |CREATE TABLE dst ( > | ts TIMESTAMP(3), > | a BIGINT, > | b DOUBLE > |) WITH ( > | 'connector.type' = 'filesystem', > | 'connector.path' = '/tmp/1', > | 'format.type' = 'csv' > |) > """.stripMargin > val query = > """ > |INSERT INTO dst > |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), > SUM(b) > |FROM src > | GROUP BY 'a',
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Description: Exception stack is as following: !temp.png! We can reproduce this problem by add following test in org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase {code:scala} // Some comments here @Test def testWindowAggregateOnConstantValue(): Unit = { val ddl1 = """ |CREATE TABLE src ( | log_ts STRING, | ts TIMESTAMP(3), | a INT, | b DOUBLE, | rowtime AS CAST(log_ts AS TIMESTAMP(3)), | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND |) WITH ( | 'connector' = 'COLLECTION', | 'is-bounded' = 'false' |) """.stripMargin val ddl2 = """ |CREATE TABLE dst ( | ts TIMESTAMP(3), | a BIGINT, | b DOUBLE |) WITH ( | 'connector.type' = 'filesystem', | 'connector.path' = '/tmp/1', | 'format.type' = 'csv' |) """.stripMargin val query = """ |INSERT INTO dst |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), SUM(b) |FROM src | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) """.stripMargin tEnv.sqlUpdate(ddl1) tEnv.sqlUpdate(ddl2) tEnv.sqlUpdate(query) println(tEnv.explain(true)) } {code} was: Exception stack is as following: > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: > !temp.png! 
> We can reproduce this problem by add following test in > org.apache.flink.table.planner.runtime.stream.sql#TimeAttributeITCase > {code:scala} > // Some comments here > @Test > def testWindowAggregateOnConstantValue(): Unit = { > val ddl1 = > """ > |CREATE TABLE src ( > | log_ts STRING, > | ts TIMESTAMP(3), > | a INT, > | b DOUBLE, > | rowtime AS CAST(log_ts AS TIMESTAMP(3)), > | WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND > |) WITH ( > | 'connector' = 'COLLECTION', > | 'is-bounded' = 'false' > |) > """.stripMargin > val ddl2 = > """ > |CREATE TABLE dst ( > | ts TIMESTAMP(3), > | a BIGINT, > | b DOUBLE > |) WITH ( > | 'connector.type' = 'filesystem', > | 'connector.path' = '/tmp/1', > | 'format.type' = 'csv' > |) > """.stripMargin > val query = > """ > |INSERT INTO dst > |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(ts), > SUM(b) > |FROM src > | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND) > |-- GROUP BY TUMBLE(rowtime, INTERVAL '0.003' SECOND) > """.stripMargin > tEnv.sqlUpdate(ddl1) > tEnv.sqlUpdate(ddl2) > tEnv.sqlUpdate(query) > println(tEnv.explain(true)) > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Description: Exception stack is as following: > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Attachment: temp.png > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Attachments: temp.png > > > Exception stack is as following: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17553) Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
[ https://issues.apache.org/jira/browse/FLINK-17553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17553: --- Summary: Constant exists in group window key leads to error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL) (was: Group by constant and window causes error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)) > Constant exists in group window key leads to error: Unsupported call: > TUMBLE_END(TIMESTAMP(3) NOT NULL) > - > > Key: FLINK-17553 > URL: https://issues.apache.org/jira/browse/FLINK-17553 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17553) Group by constant and window causes error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL)
Terry Wang created FLINK-17553: -- Summary: Group by constant and window causes error: Unsupported call: TUMBLE_END(TIMESTAMP(3) NOT NULL) Key: FLINK-17553 URL: https://issues.apache.org/jira/browse/FLINK-17553 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Terry Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-17313) Validation error when insert decimal/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-17313:
---
Description:
Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
        " a varchar(10)," +
        " b decimal(20,2)" +
        " ) with (" +
        " 'type' = 'random'," +
        " 'count' = '10'" +
        " )");
// No trailing comma after the last column; otherwise the DDL does not parse.
tEnv.sqlUpdate("create table printSink (" +
        " a varchar(10)," +
        " b decimal(22,2)" +
        " ) with (" +
        " 'type' = 'print'" +
        " )");
tEnv.sqlUpdate("insert into printSink select * from randomSource");
tEnv.execute("");
{code}
The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
{code:java}
public TypeInformation<Row> getRecordType() {
    return getTableSchema().toRowType();
}
{code}
The validation exception for the varchar column is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
	at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
	at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
	at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
	at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
	at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
	at scala.Option.map(Option.scala:146)
	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The validation exceptions for the other types are similar. I dug into it and think the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible. The method does not seem to distinguish between the validation logic for sources and for sinks: in a source, the logical type should be able to cover the physical type, whereas in a sink it is the other way round and the physical type should be able to cover the logical type. Besides, the decimal ty
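The directional rule described above (in a source the logical type must cover the physical type, in a sink the physical type must cover the logical type) can be illustrated with a small standalone sketch. This is not Flink's TypeMappingUtils or LogicalTypeCasts code: the class DirectionalTypeCheck, its VarChar and Decimal stand-ins, and the covers/sourceCompatible/sinkCompatible helpers are all hypothetical names introduced only for illustration, and STRING is modelled here as VARCHAR(Integer.MAX_VALUE) as an assumption.

```java
// Hypothetical, simplified model of a direction-aware type check.
// None of these names come from Flink; they only illustrate the idea.
final class DirectionalTypeCheck {

    /** Stand-in for VARCHAR(n); STRING is modelled as VARCHAR(Integer.MAX_VALUE). */
    public static final class VarChar {
        public final int length;
        public VarChar(int length) { this.length = length; }
    }

    /** Stand-in for DECIMAL(precision, scale). */
    public static final class Decimal {
        public final int precision;
        public final int scale;
        public Decimal(int precision, int scale) {
            this.precision = precision;
            this.scale = scale;
        }
    }

    /** True if every value of 'from' fits into 'to' without truncation. */
    public static boolean covers(VarChar to, VarChar from) {
        return to.length >= from.length;
    }

    /** True if 'to' has at least as many integer digits and fraction digits as 'from'. */
    public static boolean covers(Decimal to, Decimal from) {
        int toIntegerDigits = to.precision - to.scale;
        int fromIntegerDigits = from.precision - from.scale;
        return to.scale >= from.scale && toIntegerDigits >= fromIntegerDigits;
    }

    /** Source direction: the declared (logical) type must cover what the source produces. */
    public static boolean sourceCompatible(VarChar logical, VarChar physical) {
        return covers(logical, physical);
    }

    /** Sink direction: what the sink consumes (physical) must cover the declared (logical) type. */
    public static boolean sinkCompatible(VarChar logical, VarChar physical) {
        return covers(physical, logical);
    }

    /** Sink direction for decimals. */
    public static boolean sinkCompatible(Decimal logical, Decimal physical) {
        return covers(physical, logical);
    }

    public static void main(String[] args) {
        VarChar varchar10 = new VarChar(10);
        VarChar string = new VarChar(Integer.MAX_VALUE);
        Decimal decimal22_2 = new Decimal(22, 2);
        Decimal decimal38_18 = new Decimal(38, 18);

        // Sink declared as VARCHAR(10), sink consumes STRING.
        System.out.println("sink varchar:   " + sinkCompatible(varchar10, string));
        // Sink declared as DECIMAL(22,2), sink consumes DECIMAL(38,18).
        System.out.println("sink decimal:   " + sinkCompatible(decimal22_2, decimal38_18));
        // Source declared as VARCHAR(10), source produces STRING.
        System.out.println("source varchar: " + sourceCompatible(varchar10, string));
    }
}
```

Under this model the sink case from the report (logical VARCHAR(10) and DECIMAL(22,2) against a physical STRING and DECIMAL(38,18)) is accepted, while the same pair in the source direction is rejected, which is the asymmetry the description argues for.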
[jira] [Updated] (FLINK-17313) Validation error when insert decimal/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17313: --- Summary: Validation error when insert decimal/varchar with precision into sink using TypeInformation of row (was: Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row) > Validation error when insert decimal/varchar with precision into sink using > TypeInformation of row > -- > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like follwing(in blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + > " a varchar(10)," > + > " b > decimal(20,2)" + > " ) with (" + > " 'type' = > 'random'," + > " 'count' = '10'" > + > " )"); > tEnv.sqlUpdate("create table printSink (" + > " a varchar(10)," > + > " b > decimal(22,2)," + > " c > timestamp(3)," + > " ) with (" + > " 'type' = 'print'" + > " )"); > tEnv.sqlUpdate("insert into printSink select *, > current_timestamp from randomSource"); > tEnv.execute(""); > {code} > Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as > following: > {code:java} > public TypeInformation getRecordType() { > return getTableSchema().toRowType(); > } > {code} > Varchar column validation exception is: > org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table > field 'a' does not match with the physical type STRING of the 'a' field of > the TableSink consumed type. 
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.Abstrac
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089530#comment-17089530 ]

Terry Wang commented on FLINK-17313:

The behavior of the supportsAvoidingCast method looks right to me. A relaxed check will improve the user experience with old-style connectors and does not affect correctness.

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>              Labels: pull-request-available
>
> Test code like the following (in the blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         " a varchar(10)," +
>         " b decimal(20,2)" +
>         " ) with (" +
>         " 'type' = 'random'," +
>         " 'count' = '10'" +
>         " )");
> // No trailing comma after the last column; otherwise the DDL does not parse.
> tEnv.sqlUpdate("create table printSink (" +
>         " a varchar(10)," +
>         " b decimal(22,2)," +
>         " c timestamp(3)" +
>         " ) with (" +
>         " 'type' = 'print'" +
>         " )");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
> The validation exception for the varchar column is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table
> field 'a' does not match with the physical type STRING of the 'a' field of
> the TableSink consumed type.
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.Abs
[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371 ]

Terry Wang edited comment on FLINK-17313 at 4/22/20, 7:35 AM:
--

We cannot forbid the use of old-style TableSinks in SQL, and an old-style TableSink can consume the DECIMAL(38,18) and STRING (= VARCHAR(Integer.MAX_VALUE)) types. So it makes sense for the check to pass when the logical type is varchar(10)/decimal(22,2).

was (Author: terry1897):
We can not forbid using old-style TableSink in sql, and the old-style TableSink can consume DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) type. So it make sense to pass the check logic that logical type is varchar(10)/decimal(22,2)

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
>                 Key: FLINK-17313
>                 URL: https://issues.apache.org/jira/browse/FLINK-17313
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>            Reporter: Terry Wang
>            Priority: Major
>              Labels: pull-request-available
>
> Test code like the following (in the blink planner):
> {code:java}
> tEnv.sqlUpdate("create table randomSource (" +
>         " a varchar(10)," +
>         " b decimal(20,2)" +
>         " ) with (" +
>         " 'type' = 'random'," +
>         " 'count' = '10'" +
>         " )");
> // No trailing comma after the last column; otherwise the DDL does not parse.
> tEnv.sqlUpdate("create table printSink (" +
>         " a varchar(10)," +
>         " b decimal(22,2)," +
>         " c timestamp(3)" +
>         " ) with (" +
>         " 'type' = 'print'" +
>         " )");
> tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
> tEnv.execute("");
> {code}
> The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
> {code:java}
> public TypeInformation<Row> getRecordType() {
>     return getTableSchema().toRowType();
> }
> {code}
> The validation exception for the varchar column is:
> org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table
> field 'a' does not match with the physical type STRING of the 'a' field of
> the TableSink consumed
type. > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collec
[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371 ] Terry Wang edited comment on FLINK-17313 at 4/22/20, 7:24 AM: -- We can not forbid using old-style TableSink in sql, and the old-style TableSink can consume DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) type. So it make sense to pass the check logic that logical type is varchar(10)/decimal(22,2) was (Author: terry1897): We can not forbid using old connector in ddl, and the old type style TableSink can consume DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) type. So it make sense to pass the check logic that logical type is varchar(10)/decimal(22,2) > Validation error when insert decimal/timestamp/varchar with precision into > sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like follwing(in blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + > " a varchar(10)," > + > " b > decimal(20,2)" + > " ) with (" + > " 'type' = > 'random'," + > " 'count' = '10'" > + > " )"); > tEnv.sqlUpdate("create table printSink (" + > " a varchar(10)," > + > " b > decimal(22,2)," + > " c > timestamp(3)," + > " ) with (" + > " 'type' = 'print'" + > " )"); > tEnv.sqlUpdate("insert into printSink select *, > current_timestamp from randomSource"); > tEnv.execute(""); > {code} > Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as > following: > {code:java} > public TypeInformation getRecordType() { > return getTableSchema().toRowType(); > } > {code} > Varchar column validation exception is: > org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table > field 'a' does not match with the physical type STRING of the 'a' field of > the TableSink 
consumed type. > at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.co
[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089370#comment-17089370 ]

Terry Wang edited comment on FLINK-17313 at 4/22/20, 7:22 AM:
--

I think you may have misunderstood my fix. Sources and sinks should go through two different validation paths. If the logical type defined in the DDL can be consumed by the physical type that the table sink returns, why must the two schemas match exactly? The check I use is a relaxed one: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232. I am not sure I have expressed this clearly; let me know if you have more questions. [~dwysakowicz]

was (Author: terry1897):
I think you may misunderstand my fix. Source and sink should be in two different validation logic. If the logical type defined in ddl can be consumed by physical type of table sink returned, wy we must match those two schema exactly, and the check logic is a relaxed check : https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232. I don't know if I express clearly, let me know if u have more questions .
[~dwysakowicz] > Validation error when insert decimal/timestamp/varchar with precision into > sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like follwing(in blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + > " a varchar(10)," > + > " b > decimal(20,2)" + > " ) with (" + > " 'type' = > 'random'," + > " 'count' = '10'" > + > " )"); > tEnv.sqlUpdate("create table printSink (" + > " a varchar(10)," > + > " b > decimal(22,2)," + > " c > timestamp(3)," + > " ) with (" + > " 'type' = 'print'" + > " )"); > tEnv.sqlUpdate("insert into printSink select *, > current_timestamp from randomSource"); > tEnv.execute(""); > {code} > Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as > following: > {code:java} > public TypeInformation getRecordType() { > return getTableSchema().toRowType(); > } > {code} > Varchar column validation exception is: > org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table > field 'a' does not match with the physical type STRING of the 'a' field of > the TableSink consumed type. 
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(Plan
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089371#comment-17089371 ] Terry Wang commented on FLINK-17313: We can not forbid using old connector in ddl, and the old type style TableSink can consume DECIMAL(38,18) and STRING(=VARCHAR(Long.MAX)) type. So it make sense to pass the check logic that logical type is varchar(10)/decimal(22,2) > Validation error when insert decimal/timestamp/varchar with precision into > sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Test code like follwing(in blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + > " a varchar(10)," > + > " b > decimal(20,2)" + > " ) with (" + > " 'type' = > 'random'," + > " 'count' = '10'" > + > " )"); > tEnv.sqlUpdate("create table printSink (" + > " a varchar(10)," > + > " b > decimal(22,2)," + > " c > timestamp(3)," + > " ) with (" + > " 'type' = 'print'" + > " )"); > tEnv.sqlUpdate("insert into printSink select *, > current_timestamp from randomSource"); > tEnv.execute(""); > {code} > Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as > following: > {code:java} > public TypeInformation getRecordType() { > return getTableSchema().toRowType(); > } > {code} > Varchar column validation exception is: > org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table > field 'a' does not match with the physical type STRING of the 'a' field of > the TableSink consumed type. 
>     at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
>     at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
>     at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
>     at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
>     at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
>     at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
>     at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
>     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
>     at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
>     at scala.Option.map(Option.scala:146)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.Traversa
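The argument in the comment of this message, that an old-style sink consuming DECIMAL(38,18) and STRING can safely accept decimal(22,2) and varchar(10), comes down to simple arithmetic on lengths, precisions and scales. The following is a minimal sketch of that arithmetic only. The class and method names (`SinkTypeFitSketch`, `varcharFits`, `decimalFits`) are hypothetical and are not part of Flink; this is not the code in `TypeMappingUtils` or `LogicalTypeCasts`.

```java
// Hypothetical illustration only: none of these names exist in Flink.
final class SinkTypeFitSketch {

    /** STRING is a synonym for VARCHAR(2147483647) in Flink SQL. */
    static final int STRING_LENGTH = Integer.MAX_VALUE;

    /** A VARCHAR(logicalLength) value fits a sink column of physicalLength if it is not longer. */
    static boolean varcharFits(int logicalLength, int physicalLength) {
        return logicalLength <= physicalLength;
    }

    /**
     * A DECIMAL(p, s) value fits a sink column DECIMAL(P, S) if neither the digits
     * before the decimal point (p - s) nor the digits after it (s) would be cut off.
     */
    static boolean decimalFits(int logicalPrecision, int logicalScale,
                               int physicalPrecision, int physicalScale) {
        int logicalIntegerDigits = logicalPrecision - logicalScale;
        int physicalIntegerDigits = physicalPrecision - physicalScale;
        return logicalScale <= physicalScale
                && logicalIntegerDigits <= physicalIntegerDigits;
    }

    public static void main(String[] args) {
        // varchar(10) written into a STRING sink column
        System.out.println(varcharFits(10, STRING_LENGTH));  // true
        // decimal(22,2) written into a DECIMAL(38,18) sink column: 20 <= 20 and 2 <= 18
        System.out.println(decimalFits(22, 2, 38, 18));      // true
        // decimal(38,2) would need 36 integer digits, but DECIMAL(38,18) only has 20
        System.out.println(decimalFits(38, 2, 38, 18));      // false
    }
}
```

Under these assumptions, both columns from the reported test case pass, while a logical type that is genuinely wider than the sink's physical type is still rejected.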
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089370#comment-17089370 ]

Terry Wang commented on FLINK-17313:

[~dwysakowicz] I think you may have misunderstood my fix. Sources and sinks should go through two different validation paths. If the logical type defined in the DDL can be consumed by the physical type that the table sink returns, why must the two schemas match exactly? The check I use is a relaxed one: https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java#L232. I am not sure whether I have expressed this clearly, so let me know if you have more questions.

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
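The comment in this message separates source validation from sink validation and asks for a relaxed check instead of an exact match. A minimal sketch of that idea for VARCHAR lengths is shown below, assuming that data flows from the physical type to the logical type in a source and from the logical type to the physical type in a sink. All names here (`DirectionalCheckSketch`, `Direction`, `strictMatch`, `relaxedMatch`) are hypothetical; the actual change is in the linked pull request and may look quite different.

```java
// Hypothetical illustration only: none of these names exist in Flink.
final class DirectionalCheckSketch {

    /** Which side of the pipeline the table is on. */
    enum Direction { SOURCE, SINK }

    /** Exact match of lengths, which is what rejects VARCHAR(10) against STRING. */
    static boolean strictMatch(int logicalLength, int physicalLength) {
        return logicalLength == physicalLength;
    }

    /**
     * Relaxed check: the receiving side must be at least as wide as the producing side.
     * SOURCE: physical values are read into the logical type, so logical must cover physical.
     * SINK: logical values are written into the physical type, so physical must cover logical.
     */
    static boolean relaxedMatch(Direction direction, int logicalLength, int physicalLength) {
        switch (direction) {
            case SOURCE:
                return physicalLength <= logicalLength;
            case SINK:
                return logicalLength <= physicalLength;
            default:
                throw new IllegalArgumentException("Unknown direction: " + direction);
        }
    }

    public static void main(String[] args) {
        int varchar10 = 10;
        int string = Integer.MAX_VALUE; // STRING = VARCHAR(2147483647)

        System.out.println(strictMatch(varchar10, string));                     // false
        System.out.println(relaxedMatch(Direction.SINK, varchar10, string));    // true
        System.out.println(relaxedMatch(Direction.SOURCE, varchar10, string));  // false
    }
}
```

The same pair of types is accepted for a sink and rejected for a source, which is why a single direction-agnostic check cannot serve both cases.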
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089362#comment-17089362 ]

Terry Wang commented on FLINK-17313:

Hi [~dwysakowicz], the types that originate from the old type system for varchar(10)/decimal(22, 2)/timestamp(3) are String/legacy(Decimal)/timestamp(3), and they should be able to accept the corresponding logical types in the DDL. That is also what the [PR|https://github.com/apache/flink/pull/11848] aims to solve.

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089357#comment-17089357 ]

Terry Wang commented on FLINK-17313:

[~lzljs3620320] There isn't much need to introduce a new interface in TableSink to solve this ticket. As [~wenlong.lwl] said, it's a validation bug that prevents connectors using the old TableSink from working normally.

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
[jira] [Comment Edited] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089326#comment-17089326 ]

Terry Wang edited comment on FLINK-17313 at 4/22/20, 6:19 AM:
--

[~jark] I agree with you that the new sink interface of FLIP-95 can work normally, but there are still a lot of connectors that use the old interface. Supporting this compatibility is harmless, and it is useful for users who cannot migrate their connectors in time, right?

was (Author: terry1897):
[~jark] I agree with you that the new sink interface of FLIP-95 can solve the problem, but there are still a lot of connectors that use the old interface. Supporting this compatibility is harmless, and it is useful for users who cannot migrate their connectors in time, right?

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089326#comment-17089326 ]

Terry Wang commented on FLINK-17313:

[~jark] I agree with you that the new sink interface of FLIP-95 can solve the problem, but there are still a lot of connectors that use the old interface. Supporting this compatibility is harmless, and it is useful for users who cannot migrate their connectors in time, right?

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089258#comment-17089258 ]

Terry Wang commented on FLINK-17313:

I opened a pull request, [https://github.com/apache/flink/pull/11848|https://github.com/apache/flink/pull/11848], to help with understanding and solving this validation exception.

> Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
>
> Key: FLINK-17313
> URL: https://issues.apache.org/jira/browse/FLINK-17313
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Reporter: Terry Wang
> Priority: Major
> Labels: pull-request-available
[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-17313:
---
Description:
Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
    "  a varchar(10)," +
    "  b decimal(20,2)" +
    ") with (" +
    "  'type' = 'random'," +
    "  'count' = '10'" +
    ")");
tEnv.sqlUpdate("create table printSink (" +
    "  a varchar(10)," +
    "  b decimal(22,2)," +
    "  c timestamp(3)" +
    ") with (" +
    "  'type' = 'print'" +
    ")");
tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
tEnv.execute("");
{code}
Print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
{code:java}
public TypeInformation getRecordType() {
    return getTableSchema().toRowType();
}
{code}
The varchar column validation exception is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
    at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165)
    at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278)
    at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255)
    at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67)
    at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157)
    at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255)
    at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191)
    at scala.Option.map(Option.scala:146)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822)

The validation exceptions for the other types are similar. After digging into it, I think they are caused by TypeMappingUtils#checkPhysicalLogicalTypeCompatible. The method does not seem to take into account that sources and sinks need different validation logic between physical and logical types: the logical type should be able to cover the physical type in source
[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Wang updated FLINK-17313:
---
Description:
Test code like the following (in the blink planner):
{code:java}
tEnv.sqlUpdate("create table randomSource (" +
    "  a varchar(10)," +
    "  b decimal(20,2)" +
    ") with (" +
    "  'type' = 'random'," +
    "  'count' = '10'" +
    ")");
tEnv.sqlUpdate("create table printSink (" +
    "  a varchar(10)," +
    "  b decimal(22,2)," +
    "  c timestamp(3)" +
    ") with (" +
    "  'type' = 'print'" +
    ")");
tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource");
tEnv.execute("");
{code}
Print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows:
{code:java}
public TypeInformation getRecordType() {
    return getTableSchema().toRowType();
}
{code}
The varchar column validation exception is:

org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822) The validation exceptions for the other types are similar. After digging into it, I think the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method does not seem to account for the different physical and logical type validation logic of sources and sinks. was: Test code like the following (in the blink planner): {code:java}
[jira] [Commented] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17089251#comment-17089251 ] Terry Wang commented on FLINK-17313: cc [~ykt836][~jark][~dwysakowicz]Please have a look on this issue. > Validation error when insert decimal/timestamp/varchar with precision into > sink using TypeInformation of row > > > Key: FLINK-17313 > URL: https://issues.apache.org/jira/browse/FLINK-17313 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Terry Wang >Priority: Major > > Test code like follwing(in blink planner): > {code:java} > tEnv.sqlUpdate("create table randomSource (" + > " a varchar(10)," > + > " b > decimal(20,2)" + > " ) with (" + > " 'type' = > 'random'," + > " 'count' = '10'" > + > " )"); > tEnv.sqlUpdate("create table printSink (" + > " a varchar(10)," > + > " b > decimal(22,2)," + > " c > timestamp(3)," + > " ) with (" + > " 'type' = 'print'" + > " )"); > tEnv.sqlUpdate("insert into printSink select *, > current_timestamp from randomSource"); > tEnv.execute(""); > {code} > Print TableSink implements UpsertStreamTableSink and it's getReocrdType is as > following: > {code:java} > public TypeInformation getRecordType() { > return getTableSchema().toRowType(); > } > {code} > Varchar column validation exception is: > org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table > field 'a' does not match with the physical type STRING of the 'a' field of > the TableSink consumed type. 
> at > org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) > at > org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) > at > org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) > at > org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) > at > org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) > at > org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.sc
[jira] [Updated] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
[ https://issues.apache.org/jira/browse/FLINK-17313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-17313: --- Description: Test code like the following (in the blink planner): {code:java} tEnv.sqlUpdate("create table randomSource (" + " a varchar(10)," + " b decimal(20,2)" + " ) with (" + " 'type' = 'random'," + " 'count' = '10'" + " )"); tEnv.sqlUpdate("create table printSink (" + " a varchar(10)," + " b decimal(22,2)," + " c timestamp(3)," + " ) with (" + " 'type' = 'print'" + " )"); tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource"); tEnv.execute(""); {code} The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows: {code:java} public TypeInformation getRecordType() { return getTableSchema().toRowType(); } {code} The validation exception for the varchar column is: org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822) The validation exceptions for the other types are similar. After digging into it, I think the cause is TypeMappingUtils#checkPhysicalLogicalTypeCompatible: the method does not seem to consider the different effects of source and sink. I will open a PR soon to solve this problem. was: Test code like the following (in the blink planner): {co
[jira] [Created] (FLINK-17313) Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row
Terry Wang created FLINK-17313: -- Summary: Validation error when insert decimal/timestamp/varchar with precision into sink using TypeInformation of row Key: FLINK-17313 URL: https://issues.apache.org/jira/browse/FLINK-17313 Project: Flink Issue Type: Bug Components: Table SQL / Planner Reporter: Terry Wang Test code like the following (in the blink planner): {code:java} tEnv.sqlUpdate("create table randomSource (" + " a varchar(10)," + " b decimal(20,2)" + " ) with (" + " 'type' = 'random'," + " 'count' = '10'" + " )"); tEnv.sqlUpdate("create table printSink (" + " a varchar(10)," + " b decimal(22,2)," + " c timestamp(3)," + " ) with (" + " 'type' = 'print'" + " )"); tEnv.sqlUpdate("insert into printSink select *, current_timestamp from randomSource"); tEnv.execute(""); {code} The print TableSink implements UpsertStreamTableSink, and its getRecordType is as follows: {code:java} public TypeInformation getRecordType() { return getTableSchema().toRowType(); } {code} The validation exception for the varchar type is: ||Heading 1|| |org.apache.flink.table.api.ValidationException: Type VARCHAR(10) of table field 'a' does not match with the physical type STRING of the 'a' field of the TableSink consumed type.
at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:165) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:278) at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:255) at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:67) at org.apache.flink.table.types.logical.VarCharType.accept(VarCharType.java:157) at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:255) at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:161) at org.apache.flink.table.planner.sinks.TableSinkUtils$$anonfun$validateLogicalPhysicalTypesCompatible$1.apply$mcVI$sp(TableSinkUtils.scala:315) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:308) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:195) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:191) at scala.Option.map(Option.scala:146) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:191) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:863) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:855) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:822) | The validation errors for the other types are similar; I dug into it and found it's caused by TypeMappingUtils#checkPh
[jira] [Commented] (FLINK-17263) Remove RepeatFamilyOperandTypeChecker in blink planner and replace it with calcite's CompositeOperandTypeChecker
[ https://issues.apache.org/jira/browse/FLINK-17263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17087585#comment-17087585 ] Terry Wang commented on FLINK-17263: I opened a [PR|https://github.com/apache/flink/pull/11819]; feel free to assign this issue to me. cc [~ykt836] > Remove RepeatFamilyOperandTypeChecker in blink planner and replace it with > calcite's CompositeOperandTypeChecker > - > > Key: FLINK-17263 > URL: https://issues.apache.org/jira/browse/FLINK-17263 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Terry Wang >Priority: Major > Labels: pull-request-available > > Remove RepeatFamilyOperandTypeChecker in the blink planner and replace it with > Calcite's CompositeOperandTypeChecker. > It seems that what CompositeOperandTypeChecker can do is a superset of what > RepeatFamilyOperandTypeChecker can do. To keep the code easy to read, it's better to do > this refactoring. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17263) Remove RepeatFamilyOperandTypeChecker in blink planner and replace it with calcite's CompositeOperandTypeChecker
Terry Wang created FLINK-17263: -- Summary: Remove RepeatFamilyOperandTypeChecker in blink planner and replace it with calcite's CompositeOperandTypeChecker Key: FLINK-17263 URL: https://issues.apache.org/jira/browse/FLINK-17263 Project: Flink Issue Type: Improvement Components: Table SQL / Planner Affects Versions: 1.11.0 Reporter: Terry Wang Remove RepeatFamilyOperandTypeChecker in the blink planner and replace it with Calcite's CompositeOperandTypeChecker. It seems that what CompositeOperandTypeChecker can do is a superset of what RepeatFamilyOperandTypeChecker can do. To keep the code easy to read, it's better to do this refactoring. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-17152) FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition
[ https://issues.apache.org/jira/browse/FLINK-17152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083878#comment-17083878 ] Terry Wang edited comment on FLINK-17152 at 4/15/20, 7:48 AM: -- cc [~liyu] I think the fix for this bug should be included in the Flink 1.10.1 release. was (Author: terry1897): cc [~liyu2] I think the fix for this bug should be included in the Flink 1.10.1 release. > FunctionDefinitionUtil generate wrong resultType and acc type of > AggregateFunctionDefinition > - > > Key: FLINK-17152 > URL: https://issues.apache.org/jira/browse/FLINK-17152 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0 >Reporter: Terry Wang >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > FunctionDefinitionUtil generates the wrong resultType and acc type for > AggregateFunctionDefinition. This bug leads to unexpected errors such as: > Field types of query result and registered TableSink do not match. > Query schema: [v:RAW(IAccumulator, ?)] > Sink schema: [v: STRING] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17152) FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition
[ https://issues.apache.org/jira/browse/FLINK-17152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083878#comment-17083878 ] Terry Wang commented on FLINK-17152: cc [~liyu2] I think the fix for this bug should be included in the Flink 1.10.1 release. > FunctionDefinitionUtil generate wrong resultType and acc type of > AggregateFunctionDefinition > - > > Key: FLINK-17152 > URL: https://issues.apache.org/jira/browse/FLINK-17152 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.10.0 >Reporter: Terry Wang >Priority: Critical > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > FunctionDefinitionUtil generates the wrong resultType and acc type for > AggregateFunctionDefinition. This bug leads to unexpected errors such as: > Field types of query result and registered TableSink do not match. > Query schema: [v:RAW(IAccumulator, ?)] > Sink schema: [v: STRING] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17152) FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition
Terry Wang created FLINK-17152: -- Summary: FunctionDefinitionUtil generate wrong resultType and acc type of AggregateFunctionDefinition Key: FLINK-17152 URL: https://issues.apache.org/jira/browse/FLINK-17152 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.10.0 Reporter: Terry Wang FunctionDefinitionUtil generates the wrong resultType and acc type for AggregateFunctionDefinition. This bug leads to unexpected errors such as: Field types of query result and registered TableSink do not match. Query schema: [v:RAW(IAccumulator, ?)] Sink schema: [v: STRING] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread
[ https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073653#comment-17073653 ] Terry Wang commented on FLINK-16924: Hi [~jark], doing so is fine. I'm OK with adding a note to TableEnvironment saying that it is not thread-safe. > TableEnvironment#sqlUpdate throw NPE when called in async thread > > > Key: FLINK-16924 > URL: https://issues.apache.org/jira/browse/FLINK-16924 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Terry Wang >Priority: Major > Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png > > > With the latest code on the Flink master branch, we encountered an unexpected NPE, > as shown in the attached picture. > It seems that I can reproduce this problem by creating a tableEnv and doing > some operations in the main thread, and then calling sqlQuery or sqlUpdate in > another async thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
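A note for readers of this thread: the messages do not state the root cause of the NPE, only that TableEnvironment should be treated as not thread-safe. The sketch below is therefore only a generic illustration, under the assumption that some planner state is bound to the thread that created it. It uses no Flink classes; `CONTEXT`, `init`, and `usePlanner` are hypothetical stand-ins. It shows how a call from a second thread can hit a NullPointerException, and how confining every call to one dedicated thread avoids that.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadConfinementSketch {
    // Hypothetical stand-in for state that is only initialized on the creating thread.
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Hypothetical stand-in for "create tableEnv and do some operations".
    static void init() {
        CONTEXT.set("planner-context");
    }

    // Hypothetical stand-in for sqlUpdate/explain: dereferences the per-thread state.
    static int usePlanner() {
        return CONTEXT.get().length();
    }

    /** Returns true if calling usePlanner() on a fresh thread fails with a NullPointerException. */
    static boolean failsOnOtherThread() throws InterruptedException {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = ThreadConfinementSketch::usePlanner;
            other.submit(task).get();
            return false;
        } catch (ExecutionException e) {
            // The worker thread never called init(), so CONTEXT.get() returned null there.
            return e.getCause() instanceof NullPointerException;
        } finally {
            other.shutdown();
        }
    }

    /** Runs setup and use on the same dedicated thread, so both see the same state. */
    static int confinedToOneThread() throws Exception {
        ExecutorService planner = Executors.newSingleThreadExecutor();
        try {
            Runnable setup = ThreadConfinementSketch::init;
            planner.submit(setup).get();
            Callable<Integer> task = ThreadConfinementSketch::usePlanner;
            return planner.submit(task).get();
        } finally {
            planner.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        init(); // state exists on the main thread only
        System.out.println(failsOnOtherThread());
        System.out.println(confinedToOneThread());
    }
}
```

If the goal is to keep slow planning work off the main thread, the second pattern still allows that: the dedicated thread owns the environment for its whole lifetime, and the main thread only waits on futures.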
[jira] [Commented] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread
[ https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073443#comment-17073443 ] Terry Wang commented on FLINK-16924: Hi [~twalthr], my use case is to explain the execution plan in an async thread, so that a potentially long compilation or some other unknown hang does not block the main thread. > TableEnvironment#sqlUpdate throw NPE when called in async thread > > > Key: FLINK-16924 > URL: https://issues.apache.org/jira/browse/FLINK-16924 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Terry Wang >Priority: Major > Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png > > > With the latest code on the Flink master branch, we encountered an unexpected NPE, > as shown in the attached picture. > It seems that I can reproduce this problem by creating a tableEnv and doing > some operations in the main thread, and then calling sqlQuery or sqlUpdate in > another async thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread
[ https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072801#comment-17072801 ] Terry Wang commented on FLINK-16924: cc [~danny0405] > TableEnvironment#sqlUpdate throw NPE when called in async thread > > > Key: FLINK-16924 > URL: https://issues.apache.org/jira/browse/FLINK-16924 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Terry Wang >Priority: Major > Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png > > > With the latest code on the Flink master branch, we encountered an unexpected NPE, > as shown in the attached picture. > It seems that I can reproduce this problem by creating a tableEnv and doing > some operations in the main thread, and then calling sqlQuery or sqlUpdate in > another async thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread
[ https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16924: --- Description: With the latest code on the Flink master branch, we encountered an unexpected NPE, as shown in the attached picture. It seems that I can reproduce this problem by creating a tableEnv and doing some operations in the main thread, and then calling sqlQuery or sqlUpdate in another async thread. > TableEnvironment#sqlUpdate throw NPE when called in async thread > > > Key: FLINK-16924 > URL: https://issues.apache.org/jira/browse/FLINK-16924 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Terry Wang >Priority: Major > Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png > > > With the latest code on the Flink master branch, we encountered an unexpected NPE, > as shown in the attached picture. > It seems that I can reproduce this problem by creating a tableEnv and doing > some operations in the main thread, and then calling sqlQuery or sqlUpdate in > another async thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread
[ https://issues.apache.org/jira/browse/FLINK-16924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16924: --- Attachment: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png > TableEnvironment#sqlUpdate throw NPE when called in async thread > > > Key: FLINK-16924 > URL: https://issues.apache.org/jira/browse/FLINK-16924 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.11.0 >Reporter: Terry Wang >Priority: Major > Attachments: 7871C046-4D67-49C8-AC8A-A4030A7981CC.png > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16924) TableEnvironment#sqlUpdate throw NPE when called in async thread
Terry Wang created FLINK-16924: -- Summary: TableEnvironment#sqlUpdate throw NPE when called in async thread Key: FLINK-16924 URL: https://issues.apache.org/jira/browse/FLINK-16924 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.11.0 Reporter: Terry Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition
[ https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16506: --- Description: We can reproduce this problem in FlinkSqlParserImplTest by adding one more column definition, ` x varchar comment 'Flink 社区', \n`: ``` @Test public void testCreateTableWithComment() { conformance0 = FlinkSqlConformance.HIVE; check("CREATE TABLE tbl1 (\n" + " a bigint comment 'test column comment AAA.',\n" + " h varchar, \n" + " x varchar comment 'Flink 社区', \n" + " g as 2 * (a + 1), \n" + " ts as toTimestamp(b, '-MM-dd HH:mm:ss'), \n" + " b varchar,\n" + " proc as PROCTIME(), \n" + " PRIMARY KEY (a, b)\n" + ")\n" + "comment 'test table comment ABC.'\n" + "PARTITIONED BY (a, h)\n" + " with (\n" + "'connector' = 'kafka', \n" + "'kafka.topic' = 'log.test'\n" + ")\n", "CREATE TABLE `TBL1` (\n" + " `A` BIGINT COMMENT 'test column comment AAA.',\n" + " `H` VARCHAR,\n" + " `X` VARCHAR COMMENT 'Flink 社区', \n" + " `G` AS (2 * (`A` + 1)),\n" + " `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd HH:mm:ss'),\n" + " `B` VARCHAR,\n" + " `PROC` AS `PROCTIME`(),\n" + " PRIMARY KEY (`A`, `B`)\n" + ")\n" + "COMMENT 'test table comment ABC.'\n" + "PARTITIONED BY (`A`, `H`)\n" + "WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + ")"); } ``` The actual unparsed form of the x column will be ` X` VARCHAR COMMENT u&'Flink \793e\533a' instead of what we expect.
was: We can reproduce this problem in FlinkSqlParserImplTest, add one more column definition ` x varchar comment 'Flink 社区', \n` ``` @Test public void testCreateTableWithComment() { conformance0 = FlinkSqlConformance.HIVE; check("CREATE TABLE tbl1 (\n" + " a bigint comment 'test column comment AAA.',\n" + " h varchar, \n" + " x varchar comment 'Flink 社区', \n" + " g as 2 * (a + 1), \n" + " ts as toTimestamp(b, '-MM-dd HH:mm:ss'), \n" + " b varchar,\n" + " proc as PROCTIME(), \n" + " PRIMARY KEY (a, b)\n" + ")\n" + "comment 'test table comment ABC.'\n" + "PARTITIONED BY (a, h)\n" + " with (\n" + "'connector' = 'kafka', \n" + "'kafka.topic' = 'log.test'\n" + ")\n", "CREATE TABLE `TBL1` (\n" + " `A` BIGINT COMMENT 'test column comment AAA.',\n" + " `H` VARCHAR,\n" + " `X` VARCHAR COMMENT 'Flink 社区', \n" + " `G` AS (2 * (`A` + 1)),\n" + " `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd HH:mm:ss'),\n" + " `B` VARCHAR,\n" + " `PROC` AS `PROCTIME`(),\n" + " PRIMARY KEY (`A`, `B`)\n" + ")\n" + "COMMENT 'test table comment ABC.'\n" + "PARTITIONED BY (`A`, `H`)\n" + "WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + ")"); } ``` the actual unparse of x column will be ` X` VARCHAR COMMENT u&'Flink \793e\533a' instead of out expection. > SqlCreateTable can not get the original text when there exists non-ascii char > in the col
[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition
[ https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16506: --- Affects Version/s: 1.10.0 > SqlCreateTable can not get the original text when there exists non-ascii char > in the column definition > -- > > Key: FLINK-16506 > URL: https://issues.apache.org/jira/browse/FLINK-16506 > Project: Flink > Issue Type: Bug >Affects Versions: 1.10.0 >Reporter: Terry Wang >Priority: Major > > We can reproduce this problem in FlinkSqlParserImplTest by adding one more column > definition, > ` x varchar comment 'Flink 社区', \n`: > ``` > @Test > public void testCreateTableWithComment() { > conformance0 = FlinkSqlConformance.HIVE; > check("CREATE TABLE tbl1 (\n" + > " a bigint comment 'test column comment AAA.',\n" + > " h varchar, \n" + > " x varchar comment 'Flink 社区', \n" + > " g as 2 * (a + 1), \n" + > " ts as toTimestamp(b, '-MM-dd HH:mm:ss'), \n" + > " b varchar,\n" + > " proc as PROCTIME(), \n" + > " PRIMARY KEY (a, b)\n" + > ")\n" + > "comment 'test table comment ABC.'\n" + > "PARTITIONED BY (a, h)\n" + > " with (\n" + > "'connector' = 'kafka', \n" + > "'kafka.topic' = 'log.test'\n" + > ")\n", > "CREATE TABLE `TBL1` (\n" + > " `A` BIGINT COMMENT 'test column comment AAA.',\n" + > " `H` VARCHAR,\n" + > " `X` VARCHAR COMMENT 'Flink 社区', \n" + > " `G` AS (2 * (`A` + 1)),\n" + > " `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd HH:mm:ss'),\n" + > " `B` VARCHAR,\n" + > " `PROC` AS `PROCTIME`(),\n" + > " PRIMARY KEY (`A`, `B`)\n" + > ")\n" + > "COMMENT 'test table comment ABC.'\n" + > "PARTITIONED BY (`A`, `H`)\n" + > "WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'kafka.topic' = 'log.test'\n" + > ")"); > } > ``` > The actual unparsed form of the x column will be ` X` VARCHAR COMMENT u&'Flink \793e\533a' instead of what we expect. -- This message was sent by Atlassian Jira (v8.3.4#803005)
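To make the reported output easier to read: `u&'Flink \793e\533a'` is the SQL Unicode-escaped form of the literal, in which each non-ASCII character is written as a backslash followed by its four-digit hex code point (社 is U+793E, 区 is U+533A). The sketch below is a minimal illustration of that escaping, not the parser's actual code. `toSqlLiteral` is a hypothetical helper, and it assumes characters in the Basic Multilingual Plane only.

```java
final class UnicodeEscapeSketch {
    /**
     * Hypothetical helper: renders a string as a SQL character literal.
     * Pure-ASCII input becomes 'text'; otherwise the literal gets the u& prefix
     * and each non-ASCII char is written as \xxxx (four lowercase hex digits).
     */
    static String toSqlLiteral(String value) {
        boolean ascii = value.chars().allMatch(c -> c < 128);
        StringBuilder sb = new StringBuilder(ascii ? "'" : "u&'");
        for (char c : value.toCharArray()) {
            if (c == '\'') {
                sb.append("''"); // single quotes are doubled inside a SQL literal
            } else if (c == '\\' && !ascii) {
                sb.append("\\\\"); // a literal backslash must be doubled in the u& form
            } else if (c < 128) {
                sb.append(c);
            } else {
                sb.append(String.format("\\%04x", (int) c));
            }
        }
        return sb.append('\'').toString();
    }

    public static void main(String[] args) {
        // prints u&'Flink \793e\533a'
        System.out.println(toSqlLiteral("Flink 社区"));
    }
}
```

The escaped form carries the same characters as the original comment; the complaint in this issue is that the unparsed statement no longer matches the text the user wrote.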
[jira] [Commented] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition
[ https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17055527#comment-17055527 ] Terry Wang commented on FLINK-16506: cc [~danny0405] to have a look > SqlCreateTable can not get the original text when there exists non-ascii char > in the column definition > -- > > Key: FLINK-16506 > URL: https://issues.apache.org/jira/browse/FLINK-16506 > Project: Flink > Issue Type: Bug >Reporter: Terry Wang >Priority: Major > > We can reproduce this problem in FlinkSqlParserImplTest, add one more column > definition > ` x varchar comment 'Flink 社区', \n` > ``` > @Test > public void testCreateTableWithComment() { > conformance0 = FlinkSqlConformance.HIVE; > check("CREATE TABLE tbl1 (\n" + > " a bigint comment 'test column comment > AAA.',\n" + > " h varchar, \n" + > " x varchar comment 'Flink 社区', \n" + > " g as 2 * (a + 1), \n" + > " ts as toTimestamp(b, '-MM-dd HH:mm:ss'), > \n" + > " b varchar,\n" + > " proc as PROCTIME(), \n" + > " PRIMARY KEY (a, b)\n" + > ")\n" + > "comment 'test table comment ABC.'\n" + > "PARTITIONED BY (a, h)\n" + > " with (\n" + > "'connector' = 'kafka', \n" + > "'kafka.topic' = 'log.test'\n" + > ")\n", > "CREATE TABLE `TBL1` (\n" + > " `A` BIGINT COMMENT 'test column comment > AAA.',\n" + > " `H` VARCHAR,\n" + > " `X` VARCHAR COMMENT 'Flink 社区', \n" + > " `G` AS (2 * (`A` + 1)),\n" + > " `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd > HH:mm:ss'),\n" + > " `B` VARCHAR,\n" + > " `PROC` AS `PROCTIME`(),\n" + > " PRIMARY KEY (`A`, `B`)\n" + > ")\n" + > "COMMENT 'test table comment ABC.'\n" + > "PARTITIONED BY (`A`, `H`)\n" + > "WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'kafka.topic' = 'log.test'\n" + > ")"); > } > ``` > the actual unparse of x column will be ` X` VARCHAR COMMENT u&'Flink > \793e\533a' instead of out expection. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii code in the column definition
[ https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16506: --- Description: We can reproduce this problem in FlinkSqlParserImplTest, add one more column definition ` x varchar comment 'Flink 社区', \n` ``` @Test public void testCreateTableWithComment() { conformance0 = FlinkSqlConformance.HIVE; check("CREATE TABLE tbl1 (\n" + " a bigint comment 'test column comment AAA.',\n" + " h varchar, \n" + " x varchar comment 'Flink 社区', \n" + " g as 2 * (a + 1), \n" + " ts as toTimestamp(b, '-MM-dd HH:mm:ss'), \n" + " b varchar,\n" + " proc as PROCTIME(), \n" + " PRIMARY KEY (a, b)\n" + ")\n" + "comment 'test table comment ABC.'\n" + "PARTITIONED BY (a, h)\n" + " with (\n" + "'connector' = 'kafka', \n" + "'kafka.topic' = 'log.test'\n" + ")\n", "CREATE TABLE `TBL1` (\n" + " `A` BIGINT COMMENT 'test column comment AAA.',\n" + " `H` VARCHAR,\n" + " `X` VARCHAR COMMENT 'Flink 社区', \n" + " `G` AS (2 * (`A` + 1)),\n" + " `TS` AS `TOTIMESTAMP`(`B`, '-MM-dd HH:mm:ss'),\n" + " `B` VARCHAR,\n" + " `PROC` AS `PROCTIME`(),\n" + " PRIMARY KEY (`A`, `B`)\n" + ")\n" + "COMMENT 'test table comment ABC.'\n" + "PARTITIONED BY (`A`, `H`)\n" + "WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + ")"); } ``` the actual unparse of x column will be ` X` VARCHAR COMMENT u&'Flink \793e\533a' instead of out expection. 
[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii char in the column definition
[ https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16506: --- Summary: SqlCreateTable can not get the original text when there exists non-ascii char in the column definition (was: SqlCreateTable can not get the original text when there exists non-ascii code in the column definition) > SqlCreateTable can not get the original text when there exists non-ascii char > in the column definition > -- > > Key: FLINK-16506 > URL: https://issues.apache.org/jira/browse/FLINK-16506 > Project: Flink > Issue Type: Bug >Reporter: Terry Wang >Priority: Major
[jira] [Updated] (FLINK-16506) SqlCreateTable can not get the original text when there exists non-ascii code in the column definition
[ https://issues.apache.org/jira/browse/FLINK-16506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16506: --- Summary: SqlCreateTable can not get the original text when there exists non-ascii code in the column definition (was: Sql) > SqlCreateTable can not get the original text when there exists non-ascii code > in the column definition > -- > > Key: FLINK-16506 > URL: https://issues.apache.org/jira/browse/FLINK-16506 > Project: Flink > Issue Type: Bug >Reporter: Terry Wang >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16506) Sql
Terry Wang created FLINK-16506: -- Summary: Sql Key: FLINK-16506 URL: https://issues.apache.org/jira/browse/FLINK-16506 Project: Flink Issue Type: Bug Reporter: Terry Wang -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16414) create udaf/udtf function using sql casuing ValidationException: SQL validation failed. null
[ https://issues.apache.org/jira/browse/FLINK-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17050923#comment-17050923 ]

Terry Wang commented on FLINK-16414:

cc [~bli] to confirm~

> create udaf/udtf function using sql casuing ValidationException: SQL validation failed. null
>
> Key: FLINK-16414
> URL: https://issues.apache.org/jira/browse/FLINK-16414
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.10.0
> Reporter: Terry Wang
> Priority: Critical
>
> When TableEnvironment#sqlUpdate is used to create a UDAF or UDTF that doesn't override the getResultType() method, the creation itself succeeds. But when the function is used in a later INSERT statement, an exception like the following is thrown:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> The reason is that in FunctionDefinitionUtil#createFunctionDefinition we shouldn't directly call t.getResultType(), a.getAccumulatorType() or a.getResultType(), but should use
> UserDefinedFunctionHelper#getReturnTypeOfTableFunction
> UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction
> UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction
> instead.
> ```
> if (udf instanceof ScalarFunction) {
>     return new ScalarFunctionDefinition(
>         name,
>         (ScalarFunction) udf
>     );
> } else if (udf instanceof TableFunction) {
>     TableFunction t = (TableFunction) udf;
>     return new TableFunctionDefinition(
>         name,
>         t,
>         t.getResultType()
>     );
> } else if (udf instanceof AggregateFunction) {
>     AggregateFunction a = (AggregateFunction) udf;
>     return new AggregateFunctionDefinition(
>         name,
>         a,
>         a.getAccumulatorType(),
>         a.getResultType()
>     );
> } else if (udf instanceof TableAggregateFunction) {
>     TableAggregateFunction a = (TableAggregateFunction) udf;
>     return new TableAggregateFunctionDefinition(
>         name,
>         a,
>         a.getAccumulatorType(),
>         a.getResultType()
>     );
> ```

-- This message was sent by Atlassian Jira (v8.3.4#803005)
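The idea behind the proposed fix can be sketched without any Flink classes. The example below is only an illustration under stated assumptions: TableFn, SplitFn and resolveResultType are hypothetical stand-ins, and the assumption that getResultType() yields null unless overridden is modelled here, not taken from Flink's source. It shows why reading the declared type directly gives nothing usable, and how a helper might fall back to deriving the type from the generic parameter.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class ResultTypeFallbackSketch {

    /** Hypothetical stand-in for a user-defined table function base class. */
    abstract static class TableFn<T> {
        /** Assumed default: no explicit result type unless a subclass overrides this. */
        Type getResultType() {
            return null;
        }
    }

    /** A user function that does not override getResultType(). */
    static class SplitFn extends TableFn<String> {}

    /**
     * Hypothetical helper: prefer the explicitly declared type, otherwise
     * derive it from the generic superclass of the function.
     */
    static Type resolveResultType(TableFn<?> fn) {
        Type declared = fn.getResultType();
        if (declared != null) {
            return declared;
        }
        Type superType = fn.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments()[0];
        }
        throw new IllegalStateException("Cannot derive result type of " + fn.getClass());
    }

    public static void main(String[] args) {
        TableFn<?> fn = new SplitFn();
        System.out.println(fn.getResultType());     // direct call: nothing declared
        System.out.println(resolveResultType(fn));  // fallback: derived from TableFn<String>
    }
}
```

A caller that passes the direct result on to later validation carries a null type with it, which is consistent with the "SQL validation failed. null" message in the report; routing the lookup through a helper avoids that.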
[jira] [Updated] (FLINK-16414) create udaf/udtf function using sql casuing ValidationException: SQL validation failed. null
[ https://issues.apache.org/jira/browse/FLINK-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-16414: --- Description: When using TableEnvironment#sqlupdate to create a udaf or udtf function, which doesn't override the getResultType() method, it's normal. But when using this function in later insert sql, some exception like following will be throwed: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) The reason is in FunctionDefinitionUtil#createFunctionDefinition, we shouldn't direct call t.getResultType or a.getAccumulatorType() or a.getResultType() but using UserDefinedFunctionHelper#getReturnTypeOfTableFunction UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead. 
``` if (udf instanceof ScalarFunction) { return new ScalarFunctionDefinition( name, (ScalarFunction) udf ); } else if (udf instanceof TableFunction) { TableFunction t = (TableFunction) udf; return new TableFunctionDefinition( name, t, t.getResultType() ); } else if (udf instanceof AggregateFunction) { AggregateFunction a = (AggregateFunction) udf; return new AggregateFunctionDefinition( name, a, a.getAccumulatorType(), a.getResultType() ); } else if (udf instanceof TableAggregateFunction) { TableAggregateFunction a = (TableAggregateFunction) udf; return new TableAggregateFunctionDefinition( name, a, a.getAccumulatorType(), a.getResultType() ); ``` was: When using TableEnvironment.sqlupdate() to create a udaf or udtf function, if the function doesn't override the getResultType() method, it's normal. But when using this function in the insert sql, some exception like following will be throwed: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. 
null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) The reason is in FunctionDefinitionUtil#createFunctionDefinition, we shouldn't direct call t.getResultType or a.getAccumulatorType() or a.getResultType() but using UserDefinedFunctionHelper#getReturnTypeOfTableFunction UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead. ``` if (udf instanceof ScalarFunction) { return new ScalarFunctionDefinition( name, (ScalarFunction) udf ); } else if (udf instanceof TableFunction
[jira] [Created] (FLINK-16414) create udaf/udtf function using sql casuing ValidationException: SQL validation failed. null
Terry Wang created FLINK-16414: -- Summary: create udaf/udtf function using sql casuing ValidationException: SQL validation failed. null Key: FLINK-16414 URL: https://issues.apache.org/jira/browse/FLINK-16414 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.10.0 Reporter: Terry Wang When using TableEnvironment.sqlupdate() to create a udaf or udtf function, if the function doesn't override the getResultType() method, it's normal. But when using this function in the insert sql, some exception like following will be throwed: Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. null at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130) at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) The reason is in FunctionDefinitionUtil#createFunctionDefinition, we shouldn't direct call t.getResultType or a.getAccumulatorType() or a.getResultType() but using UserDefinedFunctionHelper#getReturnTypeOfTableFunction UserDefinedFunctionHelper#getAccumulatorTypeOfAggregateFunction UserDefinedFunctionHelper#getReturnTypeOfAggregateFunction instead. 
``` if (udf instanceof ScalarFunction) { return new ScalarFunctionDefinition( name, (ScalarFunction) udf ); } else if (udf instanceof TableFunction) { TableFunction t = (TableFunction) udf; return new TableFunctionDefinition( name, t, t.getResultType() ); } else if (udf instanceof AggregateFunction) { AggregateFunction a = (AggregateFunction) udf; return new AggregateFunctionDefinition( name, a, a.getAccumulatorType(), a.getResultType() ); } else if (udf instanceof TableAggregateFunction) { TableAggregateFunction a = (TableAggregateFunction) udf; return new TableAggregateFunctionDefinition( name, a, a.getAccumulatorType(), a.getResultType() ); ``` -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem
[ https://issues.apache.org/jira/browse/FLINK-15544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17028767#comment-17028767 ]

Terry Wang commented on FLINK-15544:

Hi [~chesnay], it works well after I bundle a newer http-core version in my jar. Changing the http client version is not critical, since nobody else has reported this problem. It's OK for me to close this issue.

> Upgrade http-core version to avoid potential DeadLock problem
> --
>
> Key: FLINK-15544
> URL: https://issues.apache.org/jira/browse/FLINK-15544
> Project: Flink
> Issue Type: Bug
> Components: Build System
> Reporter: Terry Wang
> Priority: Major
>
> Because of bug HTTPCORE-446 in http-core:4..46 (the version we currently use), a deadlock may occur, so we should upgrade the version.
> Background:
> We have a custom connector whose parent project is flink-parent, which causes the http-client and http-core versions of all connector SDK dependencies to be aligned with the versions in the DependencyManagement section.
> It cost us a lot to debug the problem caused by HTTPCORE-446 in our production environment.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
[ https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014903#comment-17014903 ]

Terry Wang commented on FLINK-15552:

Hi, [~jark]. I'm sure about it. As for the SQL CLI e2e tests, maybe there is a Kafka jar under the /lib directory.

> SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
> ---
>
> Key: FLINK-15552
> URL: https://issues.apache.org/jira/browse/FLINK-15552
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Client, Table SQL / Runtime
> Reporter: Terry Wang
> Priority: Major
>
> How to Reproduce:
> First, I start a SQL client and use `-l` to point to a Kafka connector directory.
> `
> bin/sql-client.sh embedded -l /xx/connectors/kafka/
> `
> Then, I create a Kafka table like the following:
> `
> Flink SQL> CREATE TABLE MyUserTable (
> > content String
> > ) WITH (
> > 'connector.type' = 'kafka',
> > 'connector.version' = 'universal',
> > 'connector.topic' = 'test',
> > 'connector.properties.zookeeper.connect' = 'localhost:2181',
> > 'connector.properties.bootstrap.servers' = 'localhost:9092',
> > 'connector.properties.group.id' = 'testGroup',
> > 'connector.startup-mode' = 'earliest-offset',
> > 'format.type' = 'csv'
> > );
> [INFO] Table has been created.
> `
> Then I select from the just-created table and an exception is thrown:
> `
> Flink SQL> select * from MyUserTable;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> The following properties are requested:
> connector.properties.bootstrap.servers=localhost:9092
> connector.properties.group.id=testGroup
> connector.properties.zookeeper.connect=localhost:2181
> connector.startup-mode=earliest-offset
> connector.topic=test
> connector.type=kafka
> connector.version=universal
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=content
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> `
> Potential Reasons:
> Currently we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't pass the current classLoader to this method, so the default loader will be BootStrapClassLoader, which cannot find our factory.
> I verified on my machine that the failure is indeed caused by this behavior.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
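The suspected cause can be illustrated without Flink. The sketch below is only an analogy under stated assumptions: it uses plain JDK calls (Class.forName with an explicit loader, and ClassLoader.getSystemClassLoader().getParent() as a loader that does not see application classes) to show that the same lookup succeeds or fails depending on which class loader is handed to it. The class name LoaderLookupSketch and the method canLoad are hypothetical, and nothing here reproduces TableFactoryService itself.

```java
public class LoaderLookupSketch {

    /** Returns true if the given loader can resolve the named class. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            // 'false' skips static initialization; only visibility is checked.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = LoaderLookupSketch.class.getName();

        // The loader that actually defined this class can always find it.
        ClassLoader own = LoaderLookupSketch.class.getClassLoader();

        // A loader higher up the hierarchy only knows JDK classes.
        ClassLoader parent = ClassLoader.getSystemClassLoader().getParent();

        System.out.println("own loader:    " + canLoad(name, own));
        System.out.println("parent loader: " + canLoad(name, parent));
    }
}
```

On a standard JVM the first lookup is expected to succeed and the second to fail. By the same reasoning, a factory lookup that is not given the loader holding the jars from the `-l` directory has no way to discover the Kafka factory, which matches the list of considered factories in the error above.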
[jira] [Updated] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
[ https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15552: --- Description: How to Reproduce: first, I start a sql client and using `-l` to point to a kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then, I create a Kafka Table like following ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from just created table and an exception been thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. 
The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to TableSource, but when call `TableFactoryService.find` we don't pass current classLoader to this method, the default loader will be BootStrapClassLoader, which can not find our factory. I verified in my box, it's truely caused by this behavior. was: How to Reproduce: first, I start a sql client and using `-l` to point to a kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then, I create a Kafka Table like following ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from just created table and an exception been thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. 
Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to TableSource, but when call `TableFactoryService.find` we don't pass current classLoader to this method, the default loader will be BootStrapClassLoader, which can not find our factory. > SQL Client can not correctly create kafka table using --library to indicate a > kafka connector directory > --- > > Key: FLINK-15552 > URL: https://issues.apache.org/jira/browse/FLINK-15552 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client, Table SQL / Runtime >Reporter: Terry Wang >Priority: Major > > How to Reproduce: > first, I start a sql client and using `-l` to point to a kafka connect
[jira] [Updated] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
[ https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15552: --- Description: How to Reproduce: first, I start a sql client and using `-l` to point to a kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then, I create a Kafka Table like following ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from just created table and an exception been thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. 
The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to TableSource, but when call `TableFactoryService.find` we don't pass current classLoader to this method, the default loader will be BootStrapClassLoader, which can not find our factory. I verified in my box, it's truly caused by this behavior. was: How to Reproduce: first, I start a sql client and using `-l` to point to a kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then, I create a Kafka Table like following ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from just created table and an exception been thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. 
Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to TableSource, but when call `TableFactoryService.find` we don't pass current classLoader to this method, the default loader will be BootStrapClassLoader, which can not find our factory. I verified in my box, it's truely caused by this behavior. > SQL Client can not correctly create kafka table using --library to indicate a > kafka connector directory > --- > > Key: FLINK-15552 > URL: https://issues.apache.org/jira/browse/FLINK-15552 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client, Table SQL / Runtime >Reporter: Terry Wang >Priority: Major > > How to Reproduce: > first, I st
[jira] [Updated] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
[ https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15552: --- Description: How to Reproduce: first, I start a sql client and using `-l` to point to a kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then, I create a Kafka Table like following ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from just created table and an exception been thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. 
The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to TableSource, but when call `TableFactoryService.find` we don't pass current classLoader to this method, the default loader will be BootStrapClassLoader, which can not find our factory. was: How to Reproduce: first, I start a sql client and using `-l` to point to a kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then, I create a Kafka Table like following ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from just created table and an exception been thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. 
Reason: Required context properties mismatch. The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: Now we use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to TableSource, but when call `TableFactoryService.find` we don't pass current classLoader to the this method, the defualt loader will be BootStrapClassLoader, which can not find our factory. > SQL Client can not correctly create kafka table using --library to indicate a > kafka connector directory > --- > > Key: FLINK-15552 > URL: https://issues.apache.org/jira/browse/FLINK-15552 > Project: Flink > Issue Type: Bug > Components: Table SQL / Client, Table SQL / Runtime >Reporter: Terry Wang >Priority: Major > > How to Reproduce: > first, I start a sql client and using `-l` to point to a kafka connector > directory. > ` > bin/sql-client.sh embedded -l /x
[jira] [Created] (FLINK-15552) SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
Terry Wang created FLINK-15552: -- Summary: SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory Key: FLINK-15552 URL: https://issues.apache.org/jira/browse/FLINK-15552 Project: Flink Issue Type: Bug Components: Table SQL / Client, Table SQL / Runtime Reporter: Terry Wang How to Reproduce: First, I start a SQL client, using `-l` to point to a Kafka connector directory. ` bin/sql-client.sh embedded -l /xx/connectors/kafka/ ` Then I create a Kafka table as follows: ` Flink SQL> CREATE TABLE MyUserTable ( > content String > ) WITH ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'test', > 'connector.properties.zookeeper.connect' = 'localhost:2181', > 'connector.properties.bootstrap.servers' = 'localhost:9092', > 'connector.properties.group.id' = 'testGroup', > 'connector.startup-mode' = 'earliest-offset', > 'format.type' = 'csv' > ); [INFO] Table has been created. ` Then I select from the newly created table and an exception is thrown: ` Flink SQL> select * from MyUserTable; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. 
The matching candidates: org.apache.flink.table.sources.CsvBatchTableSourceFactory Mismatched properties: 'connector.type' expects 'filesystem', but is 'kafka' The following properties are requested: connector.properties.bootstrap.servers=localhost:9092 connector.properties.group.id=testGroup connector.properties.zookeeper.connect=localhost:2181 connector.startup-mode=earliest-offset connector.topic=test connector.type=kafka connector.version=universal format.type=csv schema.0.data-type=VARCHAR(2147483647) schema.0.name=content The following factories have been considered: org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory ` Potential Reasons: We now use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to a TableSource, but when calling `TableFactoryService.find` we don't pass the current classLoader to this method, so the default loader will be the BootStrapClassLoader, which cannot find our factory. -- This message was sent by Atlassian Jira (v8.3.4#803005)
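The suspected cause described in this report is a general service-discovery pitfall: implementations packaged in jars that are added at runtime are only visible to the class loader that holds those jars. The following is a minimal sketch in plain Java, using `java.util.ServiceLoader` rather than Flink's `TableFactoryService`. The class `FactoryDiscovery`, its methods `discover` and `withLibraries`, and the use of `Runnable` as a stand-in for a factory interface are all hypothetical and serve only as illustration; this is not how Flink implements the lookup.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical helper, not part of Flink: discovers service implementations
// through a caller-supplied class loader instead of relying on a default one.
public class FactoryDiscovery {

    // Returns every implementation of `service` that is visible to `loader`.
    public static <T> List<T> discover(Class<T> service, ClassLoader loader) {
        List<T> found = new ArrayList<>();
        for (T impl : ServiceLoader.load(service, loader)) {
            found.add(impl);
        }
        return found;
    }

    // Builds a loader over extra jars (for example, jars from a directory
    // passed via `-l`), delegating to the current context loader otherwise.
    public static ClassLoader withLibraries(URL[] jars) {
        return new URLClassLoader(jars, Thread.currentThread().getContextClassLoader());
    }

    public static void main(String[] args) {
        // Assumption: an empty jar list, so this only demonstrates the call shape.
        ClassLoader userLoader = withLibraries(new URL[0]);
        // Runnable stands in for a factory interface; this sketch registers
        // no providers for it.
        List<Runnable> factories = discover(Runnable.class, userLoader);
        System.out.println("factories found: " + factories.size());
    }
}
```

The design point is that the loader is an explicit parameter of the lookup, so the caller that knows about the user-supplied jars decides which class loader performs discovery.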
[jira] [Updated] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem
[ https://issues.apache.org/jira/browse/FLINK-15544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15544: --- Description: Due to bug HTTPCORE-446 in http-core 4.4.6 (the version we currently use), a deadlock may occur; we should upgrade the version. Background: We have a custom connector whose parent project is flink-parent, which forces the http-client and http-core versions of all connector SDK dependencies to match the versions in the DependencyManagement section. It cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our production environment. was: Due to bug https://issues.apache.org/jira/browse/HTTPCORE-446 in http-core 4.4.6 (the version we currently use), a deadlock may occur; we should upgrade the version. Background: We have a custom connector whose parent project is flink-parent, which forces the http-client and http-core versions of all connector SDK dependencies to match the versions in the DependencyManagement section. It cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our production environment. > Upgrade http-core version to avoid potential DeadLock problem > - > > Key: FLINK-15544 > URL: https://issues.apache.org/jira/browse/FLINK-15544 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Terry Wang >Priority: Major > > Due to bug HTTPCORE-446 in http-core 4.4.6 (the version we currently use), a > deadlock may occur; we should upgrade the version. > Background: > We have a custom connector whose parent project is flink-parent, which forces > the http-client and http-core versions of all connector SDK dependencies to > match the versions in the DependencyManagement section. > It cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our > production environment.
[jira] [Updated] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem
[ https://issues.apache.org/jira/browse/FLINK-15544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15544: --- Description: Due to bug https://issues.apache.org/jira/browse/HTTPCORE-446 in http-core 4.4.6 (the version we currently use), a deadlock may occur; we should upgrade the version. Background: We have a custom connector whose parent project is flink-parent, which forces the http-client and http-core versions of all connector SDK dependencies to match the versions in the DependencyManagement section. It cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our production environment. was:Due to bug https://issues.apache.org/jira/browse/HTTPCORE-446 in http-core 4.4.6 (the version we currently use), a deadlock may occur; we should upgrade the version. > Upgrade http-core version to avoid potential DeadLock problem > - > > Key: FLINK-15544 > URL: https://issues.apache.org/jira/browse/FLINK-15544 > Project: Flink > Issue Type: Bug > Components: Build System >Reporter: Terry Wang >Priority: Major > > Due to bug https://issues.apache.org/jira/browse/HTTPCORE-446 in http-core > 4.4.6 (the version we currently use), a deadlock may occur; we should upgrade > the version. > Background: > We have a custom connector whose parent project is flink-parent, which forces > the http-client and http-core versions of all connector SDK dependencies to > match the versions in the DependencyManagement section. > It cost us a lot of effort to debug the problem caused by HTTPCORE-446 in our > production environment.
[jira] [Created] (FLINK-15544) Upgrade http-core version to avoid potential DeadLock problem
Terry Wang created FLINK-15544: -- Summary: Upgrade http-core version to avoid potential DeadLock problem Key: FLINK-15544 URL: https://issues.apache.org/jira/browse/FLINK-15544 Project: Flink Issue Type: Bug Components: Build System Reporter: Terry Wang Due to bug https://issues.apache.org/jira/browse/HTTPCORE-446 in http-core 4.4.6 (the version we currently use), a deadlock may occur; we should upgrade the version.
[jira] [Commented] (FLINK-15429) read hive table null value of timestamp type will throw an npe
[ https://issues.apache.org/jira/browse/FLINK-15429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17004153#comment-17004153 ] Terry Wang commented on FLINK-15429: cc [~lirui] > read hive table null value of timestamp type will throw an npe > -- > > Key: FLINK-15429 > URL: https://issues.apache.org/jira/browse/FLINK-15429 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.10.0 >Reporter: Terry Wang >Priority: Major > Fix For: 1.10.0 > > > When there is null value of timestamp type in hive table, will have exception > like following: > Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord > at > org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:122) > at > org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) > at > org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) > at > org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > at SinkConversion$1.processElement(Unknown Source) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527) > at > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) > at > org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) > Caused by: java.lang.NullPointerException > at > org.apache.flink.table.catalog.hive.client.HiveShimV100.ensureSupportedFlinkTimestamp(HiveShimV100.java:386) > at > org.apache.flink.table.catalog.hive.client.HiveShimV100.toHiveTimestamp(HiveShimV100.java:357) > at > org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$b054b59b$1(HiveInspectors.java:216) > at > org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$7f882244$1(HiveInspectors.java:172) > at > org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.getConvertedRow(HiveOutputFormatFactory.java:190) > at > org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:206) > at > org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:178) > at > org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:52) > at > 
org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:120) > ... 19 more > We should add a null check in HiveShimV100#ensureSupportedFlinkTimestamp and > return a proper value.
[jira] [Created] (FLINK-15429) read hive table null value of timestamp type will throw an npe
Terry Wang created FLINK-15429: -- Summary: read hive table null value of timestamp type will throw an npe Key: FLINK-15429 URL: https://issues.apache.org/jira/browse/FLINK-15429 Project: Flink Issue Type: Bug Components: Connectors / Hive Affects Versions: 1.10.0 Reporter: Terry Wang Fix For: 1.10.0 When there is null value of timestamp type in hive table, will have exception like following: Caused by: org.apache.flink.table.api.TableException: Exception in writeRecord at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:122) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at SinkConversion$1.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196) Caused by: java.lang.NullPointerException at org.apache.flink.table.catalog.hive.client.HiveShimV100.ensureSupportedFlinkTimestamp(HiveShimV100.java:386) at org.apache.flink.table.catalog.hive.client.HiveShimV100.toHiveTimestamp(HiveShimV100.java:357) at org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$b054b59b$1(HiveInspectors.java:216) at org.apache.flink.table.functions.hive.conversion.HiveInspectors.lambda$getConversion$7f882244$1(HiveInspectors.java:172) at org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.getConvertedRow(HiveOutputFormatFactory.java:190) at org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:206) at org.apache.flink.connectors.hive.HiveOutputFormatFactory$HiveOutputFormat.writeRecord(HiveOutputFormatFactory.java:178) at org.apache.flink.table.filesystem.SingleDirectoryWriter.write(SingleDirectoryWriter.java:52) at org.apache.flink.table.filesystem.FileSystemOutputFormat.writeRecord(FileSystemOutputFormat.java:120) ... 19 more We should add a null check in HiveShimV100#ensureSupportedFlinkTimestamp and return a proper value.
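The null check proposed in this report could look roughly like the sketch below. It is not the actual `HiveShimV100` code: the class `NullSafeTimestamps` and the method `toSqlTimestamp` are hypothetical names, and the assumption that the input is either a `java.sql.Timestamp` or a `java.time.LocalDateTime` is made only for illustration. The point is that a SQL NULL is passed through as `null` before any method is called on the value.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hypothetical stand-in for the conversion step; not Flink's HiveShimV100.
public class NullSafeTimestamps {

    // Converts a timestamp object to java.sql.Timestamp.
    // A null input (SQL NULL) is returned as null instead of being dereferenced.
    public static Timestamp toSqlTimestamp(Object flinkTimestamp) {
        if (flinkTimestamp == null) {
            return null;
        }
        if (flinkTimestamp instanceof Timestamp) {
            return (Timestamp) flinkTimestamp;
        }
        if (flinkTimestamp instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) flinkTimestamp);
        }
        // Any other type is rejected explicitly rather than failing later.
        throw new IllegalArgumentException(
                "Unsupported timestamp type: " + flinkTimestamp.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(toSqlTimestamp(null));
        System.out.println(toSqlTimestamp(LocalDateTime.of(2019, 12, 27, 10, 30)));
    }
}
```

Returning `null` for a null input is one possible reading of "a proper value"; which value the writer on the Hive side expects is left open here.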
[jira] [Created] (FLINK-15398) Correct catalog doc example mistake
Terry Wang created FLINK-15398: -- Summary: Correct catalog doc example mistake Key: FLINK-15398 URL: https://issues.apache.org/jira/browse/FLINK-15398 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.10.0 Reporter: Terry Wang https://ci.apache.org/projects/flink/flink-docs-master/dev/table/catalogs.html#how-to-create-and-register-flink-tables-to-catalog `show tables` is currently not supported through the TableEnvironment.sqlQuery() method, so we should correct the doc example.
[jira] [Comment Edited] (FLINK-15256) unable to drop table in HiveCatalogITCase
[ https://issues.apache.org/jira/browse/FLINK-15256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16996974#comment-16996974 ] Terry Wang edited comment on FLINK-15256 at 12/16/19 5:17 AM: -- I added `tableEnv.sqlUpdate("drop table myhive.`default`.tests2");` to HiveCatalogITCase#testCsvTableViaSQL and ran it both through IntelliJ IDEA and with `mvn clean install`; both work well. Can you update your code to the latest and test again? cc [~phoenixjiangnan] was (Author: terry1897): I added `tableEnv.sqlUpdate("drop table myhive.`default`.tests2");` to HiveCatalogITCase#testCsvTableViaSQL and ran it both through IntelliJ IDEA and with `mvn clean install`; both work well. Can you update your code to the latest and test again? > unable to drop table in HiveCatalogITCase > - > > Key: FLINK-15256 > URL: https://issues.apache.org/jira/browse/FLINK-15256 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Terry Wang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0, 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {code:java} > @Test > public void testCsvTableViaSQL() throws Exception { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > tableEnv.registerCatalog("myhive", hiveCatalog); > tableEnv.useCatalog("myhive"); > String path = > this.getClass().getResource("/csv/test.csv").getPath(); > tableEnv.sqlUpdate("create table test2 (name String, age Int) > with (\n" + > " 'connector.type' = 'filesystem',\n" + > " 'connector.path' = 'file://" + path + "',\n" + > " 'format.type' = 'csv'\n" + > ")"); > Table t = tableEnv.sqlQuery("SELECT * FROM > myhive.`default`.test2"); > List result = TableUtils.collectToList(t); > // assert query result > assertEquals( > new HashSet<>(Arrays.asList( > Row.of("1", 1), > Row.of("2", 2), > Row.of("3", 3))), > new 
HashSet<>(result) > ); > tableEnv.sqlUpdate("drop table myhive.`default`.tests2"); > } > {code} > The last drop table statement reports error as: > {code:java} > org.apache.flink.table.api.ValidationException: Could not execute DropTable > in path `myhive`.`default`.`tests2` > at > org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:568) > at > org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:543) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:519) > at > org.apache.flink.table.catalog.hive.HiveCatalogITCase.testCsvTableViaSQL(HiveCatalogITCase.java:123) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at > org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runTestMethod(FlinkStandaloneHiveRunner.java:169) > at > org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:154) > at > org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:92) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefore
[jira] [Commented] (FLINK-15256) unable to drop table in HiveCatalogITCase
[ https://issues.apache.org/jira/browse/FLINK-15256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16996974#comment-16996974 ] Terry Wang commented on FLINK-15256: I added `tableEnv.sqlUpdate("drop table myhive.`default`.tests2");` to HiveCatalogITCase#testCsvTableViaSQL and ran it both through IntelliJ IDEA and with `mvn clean install`; both work well. Can you update your code to the latest and test again? > unable to drop table in HiveCatalogITCase > - > > Key: FLINK-15256 > URL: https://issues.apache.org/jira/browse/FLINK-15256 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Reporter: Bowen Li >Assignee: Terry Wang >Priority: Blocker > Labels: pull-request-available > Fix For: 1.10.0, 1.11.0 > > Time Spent: 10m > Remaining Estimate: 0h > > {code:java} > @Test > public void testCsvTableViaSQL() throws Exception { > EnvironmentSettings settings = > EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); > TableEnvironment tableEnv = TableEnvironment.create(settings); > tableEnv.registerCatalog("myhive", hiveCatalog); > tableEnv.useCatalog("myhive"); > String path = > this.getClass().getResource("/csv/test.csv").getPath(); > tableEnv.sqlUpdate("create table test2 (name String, age Int) > with (\n" + > " 'connector.type' = 'filesystem',\n" + > " 'connector.path' = 'file://" + path + "',\n" + > " 'format.type' = 'csv'\n" + > ")"); > Table t = tableEnv.sqlQuery("SELECT * FROM > myhive.`default`.test2"); > List result = TableUtils.collectToList(t); > // assert query result > assertEquals( > new HashSet<>(Arrays.asList( > Row.of("1", 1), > Row.of("2", 2), > Row.of("3", 3))), > new HashSet<>(result) > ); > tableEnv.sqlUpdate("drop table myhive.`default`.tests2"); > } > {code} > The last drop table statement reports error as: > {code:java} > org.apache.flink.table.api.ValidationException: Could not execute DropTable > in path `myhive`.`default`.`tests2` > at > 
org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:568) > at > org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:543) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:519) > at > org.apache.flink.table.catalog.hive.HiveCatalogITCase.testCsvTableViaSQL(HiveCatalogITCase.java:123) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at > org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runTestMethod(FlinkStandaloneHiveRunner.java:169) > at > org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:154) > at > org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.runChild(FlinkStandaloneHiveRunner.java:92) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(Extern
[jira] [Commented] (FLINK-15259) HiveInspector.toInspectors() should convert Flink constant to Hive constant
[ https://issues.apache.org/jira/browse/FLINK-15259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16996942#comment-16996942 ] Terry Wang commented on FLINK-15259: Why is it not a blocker? It will cause Hive functions to not work normally. cc [~lzljs3620320] too > HiveInspector.toInspectors() should convert Flink constant to Hive constant > > > Key: FLINK-15259 > URL: https://issues.apache.org/jira/browse/FLINK-15259 > Project: Flink > Issue Type: Bug > Components: Connectors / Hive >Affects Versions: 1.9.0, 1.10.0 >Reporter: Bowen Li >Assignee: Rui Li >Priority: Major > Fix For: 1.9.2, 1.10.0, 1.11.0 > > > repro test: > {code:java} > public class HiveModuleITCase { > @Test > public void test() { > TableEnvironment tEnv = > HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(); > tEnv.unloadModule("core"); > tEnv.loadModule("hive", new HiveModule("2.3.4")); > tEnv.sqlQuery("select concat('an', 'bn')"); > } > } > {code} > It seems that HiveInspector.toInspectors() currently does not convert a Flink > constant to a Hive constant before calling > hiveShim.getObjectInspectorForConstant > I don't think it's a blocker
[jira] [Commented] (FLINK-15192) consider split 'SQL' page into multiple sub pages for better clarity
[ https://issues.apache.org/jira/browse/FLINK-15192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16995463#comment-16995463 ] Terry Wang commented on FLINK-15192: I'd like to improve it. After https://issues.apache.org/jira/browse/FLINK-15190 is merged, I'll try to split this page into child pages to see the effect. > consider split 'SQL' page into multiple sub pages for better clarity > > > Key: FLINK-15192 > URL: https://issues.apache.org/jira/browse/FLINK-15192 > Project: Flink > Issue Type: Task > Components: Documentation >Reporter: Bowen Li >Assignee: Terry Wang >Priority: Major > Fix For: 1.10.0 > > > with FLINK-15190, we are gonna add a bunch of ddl which makes the page too > long and not really readable. > I suggest splitting the "SQL" page into sub pages of "SQL DDL", "SQL DML", "SQL DQL", > and others if needed. > assigned to Terry temporarily. > cc [~jark] [~lzljs3620320] > As an example, the SQL doc directory of Hive looks like below, which is a lot > better than Flink's current one > {code:java} > CHILD PAGES > Pages > LanguageManual > LanguageManual Cli > LanguageManual DDL > LanguageManual DML > LanguageManual Select > LanguageManual Joins > LanguageManual LateralView > LanguageManual Union > LanguageManual SubQueries > LanguageManual Sampling > LanguageManual Explain > LanguageManual VirtualColumns > Configuration Properties > LanguageManual ImportExport > LanguageManual Authorization > LanguageManual Types > Literals > LanguageManual VariableSubstitution > LanguageManual ORC > LanguageManual WindowingAndAnalytics > LanguageManual Indexing > LanguageManual JoinOptimization > LanguageManual LZO > LanguageManual Commands > Parquet > Enhanced Aggregation, Cube, Grouping and Rollup > FileFormats > Hive HPL/SQL > {code}
[jira] [Created] (FLINK-15242) Add doc to introduce ddls or dmls supported by sql cli
Terry Wang created FLINK-15242: -- Summary: Add doc to introduce ddls or dmls supported by sql cli Key: FLINK-15242 URL: https://issues.apache.org/jira/browse/FLINK-15242 Project: Flink Issue Type: Sub-task Components: Documentation Affects Versions: 1.10.0 Reporter: Terry Wang Fix For: 1.10.0 The SQL client documentation at https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html currently has no section that introduces the supported DDLs/DMLs as a whole. We should complete it before the 1.10 release.
[jira] [Commented] (FLINK-13437) Add Hive SQL E2E test
[ https://issues.apache.org/jira/browse/FLINK-13437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16993142#comment-16993142 ] Terry Wang commented on FLINK-13437: Hi [~liyu], I don't have permission to change the status. Can you help change it? > Add Hive SQL E2E test > - > > Key: FLINK-13437 > URL: https://issues.apache.org/jira/browse/FLINK-13437 > Project: Flink > Issue Type: Test > Components: Connectors / Hive, Tests >Affects Versions: 1.9.0 >Reporter: Till Rohrmann >Assignee: Terry Wang >Priority: Major > Fix For: 1.10.0 > > > We should add an E2E test for the Hive integration: List all tables and read > some metadata, read from an existing table, register a new table in Hive, use > a registered function, write to an existing table, write to a new table.
[jira] [Updated] (FLINK-15148) Add doc for create/drop/alter database ddl
[ https://issues.apache.org/jira/browse/FLINK-15148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15148: --- Component/s: Documentation > Add doc for create/drop/alter database ddl > -- > > Key: FLINK-15148 > URL: https://issues.apache.org/jira/browse/FLINK-15148 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Terry Wang >Priority: Major >
[jira] [Updated] (FLINK-15147) Add doc for alter table set properties and rename table ddl
[ https://issues.apache.org/jira/browse/FLINK-15147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Wang updated FLINK-15147: --- Summary: Add doc for alter table set properties and rename table ddl (was: Add doc for alter table set properties and rename table) > Add doc for alter table set properties and rename table ddl > --- > > Key: FLINK-15147 > URL: https://issues.apache.org/jira/browse/FLINK-15147 > Project: Flink > Issue Type: Sub-task > Components: Documentation >Reporter: Terry Wang >Priority: Major >
[jira] [Created] (FLINK-15148) Add doc for create/drop/alter database ddl
Terry Wang created FLINK-15148: -- Summary: Add doc for create/drop/alter database ddl Key: FLINK-15148 URL: https://issues.apache.org/jira/browse/FLINK-15148 Project: Flink Issue Type: Sub-task Reporter: Terry Wang