[jira] [Commented] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051466#comment-16051466 ] sunjincheng commented on FLINK-6886: So, we have two options to deal with the problem: one is what the PR did; the other is to copy {{relNode}} when generating {{LogicalRelNode}}. IMO, I'm not sure copying is the best way. What do you think? [~fhueske] [~jark]

> Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
> ---
>
> Key: FLINK-6886
> URL: https://issues.apache.org/jira/browse/FLINK-6886
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently, for an event-time window (group/over), when the `SELECT` clause contains a `Timestamp` type field and toDataStream[T] is called with `T` not a `Row` type (such as a `PojoType`), an exception is thrown. This JIRA will fix that bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as winStart, TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Thrown exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and logical row types do not match. This is a bug and should not happen. Please file an issue.
> at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
> at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
> at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
> at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, when we solve this exception, other exceptions will be thrown subsequently. The real reason is a bug in the {{TableEnvironment#generateRowConverterFunction}} method. So this JIRA will fix it.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
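The kind of check that produces the "field types of physical and logical row types do not match" error above can be sketched abstractly; the following Python is purely illustrative (the function name, type strings, and comparison are hypothetical stand-ins, not Flink's actual implementation):

```python
def check_row_types(logical_fields, physical_fields):
    """Raise when logical and physical field types differ (illustrative
    stand-in for the check behind the TableException above)."""
    if logical_fields != physical_fields:
        raise TypeError("The field types of physical and logical row "
                        "types do not match.")

# A rowtime field that is still a time indicator in the logical row type
# but has been materialized as TIMESTAMP in the physical row type trips
# the check, which is the mismatch described in this issue:
logical = ["VARCHAR", "TimeIndicatorRelDataType"]
physical = ["VARCHAR", "TIMESTAMP"]
```

Passing `logical` and `physical` to `check_row_types` raises, whereas two identical field lists pass; this mirrors why taking the row type from the non-optimized plan leads to the exception.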
[jira] [Comment Edited] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051451#comment-16051451 ] sunjincheng edited comment on FLINK-6886 at 6/16/17 6:27 AM: But I don't think this is an optimization issue, because after {{FlinkPlannerImpl#rel}} we do {{StreamTableEnvironment#translate}}. In this method we run {{optimize}}, and after {{optimize}} we really do translate {{TimeIndicatorRelDataType}} to {{TIMESTAMP}}. That's correct. The core problem occurs in {{translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)}}: the second param, {{relNode.getRowType}}, comes from the non-optimized node and still contains {{TimeIndicatorRelDataType}}. All of the following operations are based on the type of the non-optimized node, so there is such a problem.
[jira] [Commented] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051451#comment-16051451 ] sunjincheng commented on FLINK-6886: But I don't think this is an optimization issue, because after {{FlinkPlannerImpl#rel}} we do {{StreamTableEnvironment#translate}}. In this method we run {{optimize}}, and after {{optimize}} we really do translate {{TimeIndicatorRelDataType}} to {{TIMESTAMP}}. That's correct. The core problem occurs in {{translate(dataStreamPlan, relNode.getRowType, queryConfig, withChangeFlag)}}: the second param, {{relNode.getRowType}}, comes from the non-optimized node and still contains {{TimeIndicatorRelDataType}}. All of the following operations are based on the type of the non-optimized node, so there is such a problem.
[jira] [Commented] (FLINK-6904) Support for quantifier range to CEP's pattern API
[ https://issues.apache.org/jira/browse/FLINK-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051442#comment-16051442 ] ASF GitHub Bot commented on FLINK-6904: --- Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4121 Hi @dawidwys, thanks a lot for the review. What are your thoughts on the updated patch?

> Support for quantifier range to CEP's pattern API
> ---
>
> Key: FLINK-6904
> URL: https://issues.apache.org/jira/browse/FLINK-6904
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> Currently the quantifier supports oneOrMore, times(int times), and one(); we should also support an API such as times(int from, int to) to specify a quantifier range.
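The proposed times(from, to) quantifier range is analogous to the regex range quantifier {from,to}. A small sketch of the intended matching semantics using Python's re module (an analogy only, not the CEP pattern API; a single character stands in for one matched event):

```python
import re

# A pattern that accepts between 2 and 4 consecutive "a" events,
# analogous to a hypothetical times(2, 4) quantifier.
pattern = re.compile(r"^a{2,4}$")

assert pattern.match("aa")         # lower bound of the range
assert pattern.match("aaaa")       # upper bound of the range
assert not pattern.match("a")      # too few occurrences
assert not pattern.match("aaaaa")  # too many occurrences
```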
[GitHub] flink issue #4121: [FLINK-6904] [cep] Support for quantifier range to CEP's ...
Github user dianfu commented on the issue: https://github.com/apache/flink/pull/4121 Hi @dawidwys, thanks a lot for the review. What are your thoughts on the updated patch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051425#comment-16051425 ] sunjincheng commented on FLINK-6886: Hi, [~fhueske] Thanks for check this issue. Calcite can not recognize {{TimeIndicatorRelDataType}}, So, In SQL case {{FlinkPlannerImpl#rel}} will keep the {{TimeIndicatorRelDataType}}.I think we can not touch the calcite, so,we only have a chance to do optimization when generating {{LogicalRelNode}}. > Fix Timestamp field can not be selected in event time case when > toDataStream[T], `T` not a `Row` Type. > --- > > Key: FLINK-6886 > URL: https://issues.apache.org/jira/browse/FLINK-6886 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > Currently for event-time window(group/over), When contain `Timestamp` type > field in `SELECT Clause`, And toDataStream[T], `T` not a `Row` Type, Such > `PojoType`, will throw a exception. In this JIRA. will fix this bug. For > example: > Group Window on SQL: > {code} > SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as > winStart,TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY > name, TUMBLE(rowtime, INTERVAL '5' SECOND) > {code} > Throw Exception: > {code} > org.apache.flink.table.api.TableException: The field types of physical and > logical row types do not match.This is a bug and should not happen. Please > file an issue. 
> at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) > at > org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721) > at > org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247) > at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647) > {code} > In fact, when we solve this exception,subsequent other exceptions will be > thrown. The real reason is {{TableEnvironment#generateRowConverterFunction}} > method bug. So in this JIRA. will fix it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE
[ https://issues.apache.org/jira/browse/FLINK-6930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051384#comment-16051384 ] ASF GitHub Bot commented on FLINK-6930: --- Github user hustfxj commented on the issue: https://github.com/apache/flink/pull/4133 +1

> Selecting window start / end on row-based Tumble/Slide window causes NPE
> ---
>
> Key: FLINK-6930
> URL: https://issues.apache.org/jira/browse/FLINK-6930
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0, 1.4.0
> Reporter: Fabian Hueske
> Assignee: Jark Wu
>
> Selecting the start and end properties of a row-based window causes a NullPointerException.
> The following program:
> {code}
> val windowedTable = table
>   .window(Tumble over 2.rows on 'proctime as 'w)
>   .groupBy('w, 'string)
>   .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e)
> {code}
> causes
> {code}
> Caused by: java.lang.NullPointerException
> at org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556)
> at org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551)
> at DataStreamCalcRule$40.processElement(Unknown Source)
> at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
> at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
> at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868)
> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
> at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37)
> at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:456)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> We should validate that the start and end window properties are not accessed if the window is defined on row-counts.
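The proposed validation ("the start and end window properties are not accessed if the window is defined on row-counts") can be sketched in illustrative Python; the class and property names below are hypothetical and not the Table API's implementation:

```python
class WindowSketch:
    """Illustrative guard: start/end are only defined for time windows,
    since a row-count window has no time bounds to report."""

    def __init__(self, time_based, start=None, end=None):
        self.time_based = time_based
        self._start = start
        self._end = end

    @property
    def start(self):
        if not self.time_based:
            raise ValueError("Window start is not defined on a row-count window.")
        return self._start

    @property
    def end(self):
        if not self.time_based:
            raise ValueError("Window end is not defined on a row-count window.")
        return self._end
```

With such a guard, selecting `'w.start` on a count-based window would fail at validation time with a clear error instead of producing a NullPointerException at runtime.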
[GitHub] flink issue #4133: [FLINK-6930] [table] Forbid selecting window start/end on...
Github user hustfxj commented on the issue: https://github.com/apache/flink/pull/4133 +1
[jira] [Assigned] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE
[ https://issues.apache.org/jira/browse/FLINK-6930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu reassigned FLINK-6930: Assignee: Jark Wu
[jira] [Assigned] (FLINK-6925) Add CONCAT/CONCAT_WS supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng reassigned FLINK-6925: Assignee: sunjincheng

> Add CONCAT/CONCAT_WS supported in SQL
> ---
>
> Key: FLINK-6925
> URL: https://issues.apache.org/jira/browse/FLINK-6925
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> CONCAT(str1,str2,...) returns the string that results from concatenating the arguments. May have one or more arguments. If all arguments are nonbinary strings, the result is a nonbinary string. If the arguments include any binary strings, the result is a binary string. A numeric argument is converted to its equivalent nonbinary string form. CONCAT() returns NULL if any argument is NULL.
> * Syntax:
> CONCAT(str1,str2,...)
> * Arguments
> ** str1,str2,... -
> * Return Types
> string
> * Example:
> CONCAT('F', 'lin', 'k') -> 'Flink'
> CONCAT('M', NULL, 'L') -> NULL
> CONCAT(14.3) -> '14.3'
> * See more:
> ** [MySQL|https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat]
> CONCAT_WS() stands for Concatenate With Separator and is a special form of CONCAT(). The first argument is the separator for the rest of the arguments. The separator is added between the strings to be concatenated. The separator can be a string, as can the rest of the arguments. If the separator is NULL, the result is NULL.
> * Syntax:
> CONCAT_WS(separator,str1,str2,...)
> * Arguments
> ** separator -
> ** str1,str2,... -
> * Return Types
> string
> * Example:
> CONCAT_WS(',','First name','Second name','Last Name') -> 'First name,Second name,Last Name'
> * See more:
> ** [MySQL|https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_concat-ws]
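The NULL-handling rules described above can be sketched in Python (an illustrative model of the semantics, not the proposed Flink implementation; None stands in for SQL NULL). Note that per the referenced MySQL documentation, CONCAT_WS additionally skips NULL arguments after the separator, while CONCAT returns NULL if any argument is NULL:

```python
def concat(*args):
    """CONCAT: NULL (None) if any argument is NULL; numeric arguments
    are converted to their string form."""
    if any(a is None for a in args):
        return None
    return "".join(str(a) for a in args)

def concat_ws(sep, *args):
    """CONCAT_WS: NULL if the separator is NULL; NULL arguments after
    the separator are skipped (MySQL semantics)."""
    if sep is None:
        return None
    return sep.join(str(a) for a in args if a is not None)

# Examples from the issue description:
assert concat("F", "lin", "k") == "Flink"
assert concat("M", None, "L") is None
assert concat(14.3) == "14.3"
assert concat_ws(",", "First name", "Second name", "Last Name") == \
    "First name,Second name,Last Name"
```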
[jira] [Commented] (FLINK-6934) Consider moving LRUCache class
[ https://issues.apache.org/jira/browse/FLINK-6934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051346#comment-16051346 ] mingleizhang commented on FLINK-6934: I would suggest just removing it. Actually, it belongs to the util tools but has not been used for almost 3 years. And if we remove {{LRUCache}}, we should also remove the {{LRUCacheMap}} class. [~uce] [~Zentol] What do you think?

> Consider moving LRUCache class
> ---
>
> Key: FLINK-6934
> URL: https://issues.apache.org/jira/browse/FLINK-6934
> Project: Flink
> Issue Type: Improvement
> Components: Local Runtime
> Reporter: mingleizhang
> Assignee: mingleizhang
>
> The LRUCache class is not used any more, so I would suggest removing it.
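For reference, the access-ordered eviction behavior that an LRU cache class provides can be sketched with Python's OrderedDict; this is an illustrative sketch only, not the Flink {{LRUCache}} being discussed for removal:

```python
from collections import OrderedDict

class SimpleLRUCache:
    """Sketch of least-recently-used eviction: reads refresh an entry's
    recency, and inserts beyond capacity evict the stalest entry."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key):
        value = self._data[key]
        self._data.move_to_end(key)   # mark as most recently used
        return value

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used
```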
[jira] [Created] (FLINK-6934) Consider moving LRUCache class
mingleizhang created FLINK-6934:
Summary: Consider moving LRUCache class
Key: FLINK-6934
URL: https://issues.apache.org/jira/browse/FLINK-6934
Project: Flink
Issue Type: Improvement
Components: Local Runtime
Reporter: mingleizhang
Assignee: mingleizhang

The LRUCache class is not used any more, so I would suggest removing it.
[jira] [Commented] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE
[ https://issues.apache.org/jira/browse/FLINK-6930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051342#comment-16051342 ] ASF GitHub Bot commented on FLINK-6930: --- GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/4133 [FLINK-6930] [table] Forbid selecting window start/end on row-based Tumble/Slide windows Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/wuchong/flink forbid-rowbased-window-start Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4133.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4133 commit 4cf422acd25b18ebc285ace542d49ebc646707db Author: Jark Wu Date: 2017-06-16T03:49:31Z [FLINK-6930] [table] Forbid selecting window start/end on row-based Tumble/Slide windows > 
[GitHub] flink pull request #4133: [FLINK-6930] [table] Forbid selecting window start...
GitHub user wuchong opened a pull request: https://github.com/apache/flink/pull/4133 [FLINK-6930] [table] Forbid selecting window start/end on row-based Tumble/Slide windows
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/wuchong/flink forbid-rowbased-window-start
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4133.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4133
commit 4cf422acd25b18ebc285ace542d49ebc646707db Author: Jark Wu Date: 2017-06-16T03:49:31Z [FLINK-6930] [table] Forbid selecting window start/end on row-based Tumble/Slide windows
[jira] [Commented] (FLINK-6933) Refactor NFACompiler to reduce code duplication
[ https://issues.apache.org/jira/browse/FLINK-6933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051335#comment-16051335 ] ASF GitHub Bot commented on FLINK-6933: --- GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/4132 [FLINK-6933] [cep] Refactor NFACompiler to reduce code duplication
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dianfu/flink FLINK-6933
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4132.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4132
commit 97da09af4828661fcde22c9d09aee69cfa7d7ac5 Author: Dian Fu Date: 2017-06-16T03:43:04Z [FLINK-6933] [cep] Refactor NFACompiler to reduce code duplication

> Refactor NFACompiler to reduce code duplication
> ---
>
> Key: FLINK-6933
> URL: https://issues.apache.org/jira/browse/FLINK-6933
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Reporter: Dian Fu
> Assignee: Dian Fu
>
> I find that part of the code in NFACompiler is duplicated; this JIRA tries to eliminate the code duplication.
[jira] [Created] (FLINK-6933) Refactor NFACompiler to reduce code duplication
Dian Fu created FLINK-6933: -- Summary: Refactor NFACompiler to reduce code duplication Key: FLINK-6933 URL: https://issues.apache.org/jira/browse/FLINK-6933 Project: Flink Issue Type: Bug Components: CEP Reporter: Dian Fu Assignee: Dian Fu I find that part of the code in NFACompiler is duplicated; this JIRA tries to eliminate the code duplication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6891) Add LOG(X) supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6891: --- Description: LOG(X), LOG(B,X) If called with one parameter, this function returns the natural logarithm of X. If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported. The inverse of this function (when called with a single argument) is the EXP() function. If called with two parameters, this function returns the logarithm of X to the base B. If X is less than or equal to 0, or if B is less than or equal to 1, then NULL is returned. * Example: LOG(2) -> 0.69314718055995 LOG(-2) -> NULL LOG(2,65536) -> 16 LOG(10,100) -> 2 LOG(1,100) -> NULL * See more: ** [MySQL| https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log] --NOTE-- In this JIRA. NULL case will throw IllegalArgumentException. was: LOG(X), LOG(B,X) If called with one parameter, this function returns the natural logarithm of X. If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported. The inverse of this function (when called with a single argument) is the EXP() function. If called with two parameters, this function returns the logarithm of X to the base B. If X is less than or equal to 0, or if B is less than or equal to 1, then NULL is returned. * Example: LOG(2) -> 0.69314718055995 LOG(-2) -> NULL LOG(2,65536) -> 16 LOG(10,100) -> 2 LOG(1,100) -> NULL * See more: ** [MySQL| https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log] --NOTE-- In this JIRA. NULL case will throw IllegalArgumentException. 
> Add LOG(X) supported in SQL > --- > > Key: FLINK-6891 > URL: https://issues.apache.org/jira/browse/FLINK-6891 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > LOG(X), LOG(B,X) > If called with one parameter, this function returns the natural logarithm of > X. If X is less than or equal to 0.0E0, the function returns NULL and (as of > MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported. > The inverse of this function (when called with a single argument) is the > EXP() function. > If called with two parameters, this function returns the logarithm of X to > the base B. If X is less than or equal to 0, or if B is less than or equal to > 1, then NULL is returned. > * Example: > LOG(2) -> 0.69314718055995 > LOG(-2) -> NULL > LOG(2,65536) -> 16 > LOG(10,100) -> 2 > LOG(1,100) -> NULL > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log] > --NOTE-- In this JIRA. NULL case will throw IllegalArgumentException. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
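Since MySQL returns NULL for invalid logarithm arguments while this JIRA opts to throw IllegalArgumentException instead, the intended semantics can be sketched as a small helper. This is illustrative only — `LogFunctions` is a hypothetical class name, not Flink's actual implementation:

```java
public class LogFunctions {

    // LOG(X): natural logarithm of X. Per the JIRA, an invalid argument
    // (X <= 0) throws IllegalArgumentException instead of returning NULL
    // as MySQL does.
    public static double log(double x) {
        if (x <= 0.0) {
            throw new IllegalArgumentException("Invalid argument for logarithm: " + x);
        }
        return Math.log(x);
    }

    // LOG(B, X): logarithm of X to base B. Invalid input
    // (X <= 0 or B <= 1) throws as well.
    public static double log(double base, double x) {
        if (x <= 0.0 || base <= 1.0) {
            throw new IllegalArgumentException(
                "Invalid arguments for logarithm: base=" + base + ", x=" + x);
        }
        return Math.log(x) / Math.log(base);
    }
}
```

This matches the examples in the description: LOG(2) ≈ 0.69314718055995, LOG(2,65536) = 16, LOG(10,100) = 2, while LOG(-2) and LOG(1,100) raise the exception.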
[jira] [Updated] (FLINK-6891) Add LOG(X) supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng updated FLINK-6891: --- Description: LOG(X), LOG(B,X) If called with one parameter, this function returns the natural logarithm of X. If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported. The inverse of this function (when called with a single argument) is the EXP() function. If called with two parameters, this function returns the logarithm of X to the base B. If X is less than or equal to 0, or if B is less than or equal to 1, then NULL is returned. * Example: LOG(2) -> 0.69314718055995 LOG(-2) -> NULL LOG(2,65536) -> 16 LOG(10,100) -> 2 LOG(1,100) -> NULL * See more: ** [MySQL| https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log] --NOTE-- In this JIRA. NULL case will throw IllegalArgumentException. was: LOG(X), LOG(B,X) If called with one parameter, this function returns the natural logarithm of X. If X is less than or equal to 0.0E0, the function returns NULL and (as of MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported. The inverse of this function (when called with a single argument) is the EXP() function. If called with two parameters, this function returns the logarithm of X to the base B. If X is less than or equal to 0, or if B is less than or equal to 1, then NULL is returned. * Example: LOG(2) -> 0.69314718055995 LOG(-2) -> NULL LOG(2,65536) -> 16 LOG(10,100) -> 2 LOG(1,100) -> NULL * See more: ** [MySQL| https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log] --NOTE-- In this JIRA. we only implement LOG(X). 
> Add LOG(X) supported in SQL > --- > > Key: FLINK-6891 > URL: https://issues.apache.org/jira/browse/FLINK-6891 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: sunjincheng > > LOG(X), LOG(B,X) > If called with one parameter, this function returns the natural logarithm of > X. If X is less than or equal to 0.0E0, the function returns NULL and (as of > MySQL 5.7.4) a warning “Invalid argument for logarithm” is reported. > The inverse of this function (when called with a single argument) is the > EXP() function. > If called with two parameters, this function returns the logarithm of X to > the base B. If X is less than or equal to 0, or if B is less than or equal to > 1, then NULL is returned. > * Example: > LOG(2) -> 0.69314718055995 > LOG(-2) -> NULL > LOG(2,65536) -> 16 > LOG(10,100) -> 2 > LOG(1,100) -> NULL > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/mathematical-functions.html#function_log] > --NOTE-- In this JIRA. NULL case will throw IllegalArgumentException. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051320#comment-16051320 ] ASF GitHub Bot commented on FLINK-6896: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/4111 Travis reported a bug about composite type. Fixed it. Waiting for the new CI pass. > Creating a table from a POJO and use table sink to output fail > -- > > Key: FLINK-6896 > URL: https://issues.apache.org/jira/browse/FLINK-6896 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Mark You >Assignee: sunjincheng > Attachments: debug.png > > > Following example fails at sink, using debug mode to see the reason of > ArrayIndexOutOfBoundException is cause by the input type is Pojo type not Row? > Sample: > {code:title=TumblingWindow.java|borderStyle=solid} > public class TumblingWindow { > public static void main(String[] args) throws Exception { > List data = new ArrayList(); > data.add(new Content(1L, "Hi")); > data.add(new Content(2L, "Hallo")); > data.add(new Content(3L, "Hello")); > data.add(new Content(4L, "Hello")); > data.add(new Content(7L, "Hello")); > data.add(new Content(8L, "Hello world")); > data.add(new Content(16L, "Hello world")); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream stream = env.fromCollection(data); > DataStream stream2 = stream.assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) { > /** > * > */ > private static final long serialVersionUID = > 410512296011057717L; > @Override > public long extractTimestamp(Content element) { > return element.getRecordTime(); > } > }); > final StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > Table table = tableEnv.fromDataStream(stream2, > 
"urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime"); > Table windowTable = > table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, > urlKey") > > .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum > "); > //table.printSchema(); > TableSink windowSink = new > CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1, > WriteMode.OVERWRITE); > windowTable.writeToSink(windowSink); > // tableEnv.toDataStream(windowTable, Row.class).print(); > env.execute(); > } > public static class Content implements Serializable { > /** > * > */ > private static final long serialVersionUID = 1429246948772430441L; > private String urlKey; > private long recordTime; > // private String recordTimeStr; > private long httpGetMessageCount; > private long httpPostMessageCount; > private long uplink; > private long downlink; > private long statusCode; > private long statusCodeCount; > public Content() { > super(); > } > public Content(long recordTime, String urlKey) { > super(); > this.recordTime = recordTime; > this.urlKey = urlKey; > } > public String getUrlKey() { > return urlKey; > } > public void setUrlKey(String urlKey) { > this.urlKey = urlKey; > } > public long getRecordTime() { > return recordTime; > } > public void setRecordTime(long recordTime) { > this.recordTime = recordTime; > } > public long getHttpGetMessageCount() { > return httpGetMessageCount; > } > public void setHttpGetMessageCount(long httpGetMessageCount) { > this.httpGetMessageCount = httpGetMessageCount; > } > public long getHttpPostMessageCount() { > return httpPostMessageCount; > } > public void setHttpPostMessageCount(long httpPostMessageCount) { > this.httpPostMessageCount = httpPostMessageCount; > } > public long getUplink() { > return uplink; > } > public void setUplink(long uplink) { > this.uplink = uplink; > } >
[jira] [Commented] (FLINK-6932) Update the inaccessible Dataflow Model paper link
[ https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051269#comment-16051269 ] ASF GitHub Bot commented on FLINK-6932: --- Github user zhangminglei commented on a diff in the pull request: https://github.com/apache/flink/pull/4131#discussion_r122348284 --- Diff: docs/dev/event_time.md --- @@ -146,7 +146,7 @@ to use timestamp assignment and watermark generation in the Flink DataStream API *Note: Flink implements many techniques from the Dataflow Model. For a good introduction to event time and watermarks, have a look at the articles below.* - [Streaming 101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by Tyler Akidau - - The [Dataflow Model paper](https://static.googleusercontent.com/media/research.google.com/en/pubs/archive/43864.pdf) --- End diff -- Yes. Exactly. I will do a change to this. > Update the inaccessible Dataflow Model paper link > - > > Key: FLINK-6932 > URL: https://issues.apache.org/jira/browse/FLINK-6932 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: mingleizhang >Assignee: mingleizhang > Labels: None > > I tried to access the Dataflow Model paper link which under > [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html], > then it gives me an error [ 404 ] instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6310) LocalExecutor#endSession() uses wrong lock for synchronization
[ https://issues.apache.org/jira/browse/FLINK-6310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-6310: -- Description: Here is related code: {code} public void endSession(JobID jobID) throws Exception { synchronized (LocalExecutor.class) { LocalFlinkMiniCluster flink = this.flink; {code} In other places, lock field is used for synchronization: {code} public void start() throws Exception { synchronized (lock) { {code} was: Here is related code: {code} public void endSession(JobID jobID) throws Exception { synchronized (LocalExecutor.class) { LocalFlinkMiniCluster flink = this.flink; {code} In other places, lock field is used for synchronization: {code} public void start() throws Exception { synchronized (lock) { {code} > LocalExecutor#endSession() uses wrong lock for synchronization > -- > > Key: FLINK-6310 > URL: https://issues.apache.org/jira/browse/FLINK-6310 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Reporter: Ted Yu > > Here is related code: > {code} > public void endSession(JobID jobID) throws Exception { > synchronized (LocalExecutor.class) { > LocalFlinkMiniCluster flink = this.flink; > {code} > In other places, lock field is used for synchronization: > {code} > public void start() throws Exception { > synchronized (lock) { > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
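The fix the ticket implies — making {{endSession()}} synchronize on the same {{lock}} field as the other methods instead of on {{LocalExecutor.class}} — can be sketched as follows. Class and member names here are illustrative stand-ins, not the actual LocalExecutor code:

```java
public class LocalExecutorSketch {

    // A single dedicated lock object, used consistently by every method
    // that touches the shared state.
    private final Object lock = new Object();
    private boolean running = false;

    public void start() {
        synchronized (lock) {      // guards `running` with the lock field
            running = true;
        }
    }

    // Before the fix, endSession() synchronized on LocalExecutor.class,
    // so it did not mutually exclude with start(); the fix is to use the
    // same lock field here.
    public void endSession() {
        synchronized (lock) {
            running = false;
        }
    }

    public boolean isRunning() {
        synchronized (lock) {
            return running;
        }
    }
}
```

Synchronizing on the class object would also be shared across all instances, whereas the {{lock}} field is per-instance — another reason the two monitors must not be mixed.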
[jira] [Updated] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase
[ https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-6105: -- Description: When catching InterruptedException, we should throw InterruptedIOException instead of IOException. The following example is from HadoopInputFormatBase : {code} try { splits = this.mapreduceInputFormat.getSplits(jobContext); } catch (InterruptedException e) { throw new IOException("Could not get Splits.", e); } {code} There may be other places where IOE is thrown. was: When catching InterruptedException, we should throw InterruptedIOException instead of IOException. The following example is from HadoopInputFormatBase : {code} try { splits = this.mapreduceInputFormat.getSplits(jobContext); } catch (InterruptedException e) { throw new IOException("Could not get Splits.", e); } {code} There may be other places where IOE is thrown. > Properly handle InterruptedException in HadoopInputFormatBase > - > > Key: FLINK-6105 > URL: https://issues.apache.org/jira/browse/FLINK-6105 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu > > When catching InterruptedException, we should throw InterruptedIOException > instead of IOException. > The following example is from HadoopInputFormatBase : > {code} > try { > splits = this.mapreduceInputFormat.getSplits(jobContext); > } catch (InterruptedException e) { > throw new IOException("Could not get Splits.", e); > } > {code} > There may be other places where IOE is thrown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
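The suggested change can be sketched like this; {{SplitSource}} and {{fetchSplits}} are hypothetical stand-ins for the Hadoop input format call, but the exception translation mirrors the ticket's proposal. Note that {{InterruptedIOException}} has no {{(String, Throwable)}} constructor, so the cause is attached via {{initCause}}, and the thread's interrupt flag is restored:

```java
import java.io.InterruptedIOException;

public class InterruptedHandling {

    // Stand-in for mapreduceInputFormat.getSplits(jobContext).
    interface SplitSource {
        Object getSplits() throws InterruptedException;
    }

    // Before: `throw new IOException("Could not get Splits.", e)` hides the
    // interruption. After: an InterruptedIOException (an IOException
    // subclass, so callers still compile) is thrown and the interrupt
    // status is restored for upstream code.
    static Object fetchSplits(SplitSource source) throws InterruptedIOException {
        try {
            return source.getSplits();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore interrupt status
            InterruptedIOException ioe =
                new InterruptedIOException("Could not get Splits.");
            ioe.initCause(e);
            throw ioe;
        }
    }
}
```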
[jira] [Comment Edited] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050925#comment-16050925 ] Erik van Oosten edited comment on FLINK-6928 at 6/15/17 6:39 PM: - In my ideal world method {{getTargetTopic}} would be removed from {{*SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}. Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{*SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic. was (Author: erikvanoosten): In my ideal world method {{getTargetTopic}} would be removed from {{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}. Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 >Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. > In addition, it would be nice if we could signal that the defaultTopic is not > needed by passing {{null}}. 
Currently, a value for the defaultTopic is > required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
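The proposed API shape might look roughly like this. {{DestinationTopic}} and both constructor variants are the commenter's suggestion, not an existing Flink interface, and the class names here are illustrative:

```java
public class KafkaSinkSketch {

    // Proposed interface: resolves the destination topic per element.
    public interface DestinationTopic<T> {
        String topicFor(T element);
    }

    public static class FlinkKafkaProducerSketch<T> {
        private final DestinationTopic<T> destination;

        // Variant 1: a fixed topic. Internally wrapped in a
        // DestinationTopic that always returns the same topic, so the
        // rest of the producer only ever deals with DestinationTopic.
        public FlinkKafkaProducerSketch(String topic) {
            this(element -> topic);
        }

        // Variant 2: the topic is derived per element; no default topic
        // (and no partition metadata fetch for it) is needed up front.
        public FlinkKafkaProducerSketch(DestinationTopic<T> destination) {
            this.destination = destination;
        }

        public String resolveTopic(T element) {
            return destination.topicFor(element);
        }
    }
}
```

With this shape, the serialization schema no longer carries {{getTargetTopic}}, and a producer constructed with variant 2 never has to reference a default topic at all.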
[jira] [Commented] (FLINK-6928) Kafka sink: default topic should not need to exist
[ https://issues.apache.org/jira/browse/FLINK-6928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050925#comment-16050925 ] Erik van Oosten commented on FLINK-6928: In my ideal world method {{getTargetTopic}} would be removed from {{SerializationSchema}} and moved to a new interface, e.g. {{DestinationTopic}}. Then there are two constructor variants for {{FlinkKafkaProducer}}: one would take a topic ({{String}}), the other would take a {{DestinationTopic}}. Both would have the simplified {{SerializationSchema}} as argument. To make things simple internally, the first variant could wrap the topic in an implementation of {{DestinationTopic}} that always returns the same topic. > Kafka sink: default topic should not need to exist > -- > > Key: FLINK-6928 > URL: https://issues.apache.org/jira/browse/FLINK-6928 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.3.0, 1.2.1 >Reporter: Erik van Oosten > > When using a Kafka sink, the defaultTopic needs to exist even when it is > never used. It would be nice if fetching partition information for the > default topic would be delayed until the moment a topic is actually used. > Cause: {{FlinkKafkaProducerBase.open}} fetches partition information for the > default topic. > In addition, it would be nice if we could signal that the defaultTopic is not > needed by passing {{null}}. Currently, a value for the defaultTopic is > required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6932) Update the inaccessible Dataflow Model paper link
[ https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050830#comment-16050830 ] ASF GitHub Bot commented on FLINK-6932: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/4131#discussion_r122263994 --- Diff: docs/dev/event_time.md --- @@ -146,7 +146,7 @@ to use timestamp assignment and watermark generation in the Flink DataStream API *Note: Flink implements many techniques from the Dataflow Model. For a good introduction to event time and watermarks, have a look at the articles below.* - [Streaming 101](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101) by Tyler Akidau - - The [Dataflow Model paper](https://static.googleusercontent.com/media/research.google.com/en/pubs/archive/43864.pdf) --- End diff -- Looks like this should have been `https://research.google.com/pubs/archive/43864.pdf` before the redirect (linked from `https://research.google.com/pubs/pub43864.html`) > Update the inaccessible Dataflow Model paper link > - > > Key: FLINK-6932 > URL: https://issues.apache.org/jira/browse/FLINK-6932 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: mingleizhang >Assignee: mingleizhang > Labels: None > > I tried to access the Dataflow Model paper link which under > [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html], > then it gives me an error [ 404 ] instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6932) Update the inaccessible Dataflow Model paper link
[ https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050785#comment-16050785 ] ASF GitHub Bot commented on FLINK-6932: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/4131 [FLINK-6932] [doc] Update inaccessible Dataflow Model paper link Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-6932 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4131.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4131 commit eb4ad3ece3ca8196035e13e191b22e90510b182b Author: zhangminglei Date: 2017-06-15T17:04:06Z [FLINK-6932] [doc] Update inaccessible Dataflow Model paper link > Update the inaccessible Dataflow Model paper link > - > > 
Key: FLINK-6932 > URL: https://issues.apache.org/jira/browse/FLINK-6932 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: mingleizhang >Assignee: mingleizhang > Labels: None > > I tried to access the Dataflow Model paper link which under > [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html], > then it gives me an error [ 404 ] instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API
[ https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050782#comment-16050782 ] ASF GitHub Bot commented on FLINK-6693: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4078#discussion_r122255933 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -110,6 +110,12 @@ under the License. compile + + joda-time + joda-time + provided --- End diff -- Why is `joda-time` provided? Is it a transitive dependency? > Support DATE_FORMAT function in the Table / SQL API > --- > > Key: FLINK-6693 > URL: https://issues.apache.org/jira/browse/FLINK-6693 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > It would be quite handy to support the {{DATE_FORMAT}} function in Flink to > support various date / time related operations: > The specification of the {{DATE_FORMAT}} function can be found in > https://prestodb.io/docs/current/functions/datetime.html. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API
[ https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050783#comment-16050783 ] ASF GitHub Bot commented on FLINK-6693: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4078#discussion_r122240819 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime; + +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; + +/** + * Built-in scalar functions for date time related operations. + */ +public class DateTimeFunctions { --- End diff -- We try to keep the code base of the Table API consistently in Scala. There are a few classes in Java which are mostly copied from Calcite. Can you port this class? 
[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API
[ https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050784#comment-16050784 ] ASF GitHub Bot commented on FLINK-6693: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4078#discussion_r122254102 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DateTimeFunctionTest.scala --- @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import java.sql.Timestamp + +import org.apache.flink.api.common.typeinfo.{TypeInformation, Types} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.expressions.utils.ExpressionTestBase +import org.apache.flink.types.Row +import org.junit.Test + +class DateTimeFunctionTest extends ExpressionTestBase { + + @Test + def testDateFormat(): Unit = { +testAllApis( + DateFormat('f0, "%Y"), --- End diff -- I think it would be good to have a pattern that uses all features. 
[jira] [Commented] (FLINK-6693) Support DATE_FORMAT function in the Table / SQL API
[ https://issues.apache.org/jira/browse/FLINK-6693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050781#comment-16050781 ] ASF GitHub Bot commented on FLINK-6693: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4078#discussion_r122255414 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/DateTimeFunctions.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime; + +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.DateTimeFormatterBuilder; + +/** + * Built-in scalar functions for date time related operations. 
+ */
+public class DateTimeFunctions {
+	private static final int PIVOT_YEAR = 2020;
+
+	private static final ThreadLocalCache DATETIME_FORMATTER_CACHE =
+		new ThreadLocalCache(100) {
+			@Override
+			protected DateTimeFormatter getNewInstance(String format) {
+				return createDateTimeFormatter(format);
+			}
+		};
+
+	public static String dateFormat(long ts, String formatString) {
+		DateTimeFormatter formatter = DATETIME_FORMATTER_CACHE.get(formatString);
--- End diff -- Would it make sense to add a shortcut during code-gen if the pattern is a string literal (this should be the most common case) to avoid the lookup?
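The review comment above concerns caching a compiled formatter per thread and short-circuiting the lookup when the pattern is a string literal. As a rough illustration of the caching side only (this is not Flink's actual `ThreadLocalCache`; the class name, sizes, and factory are invented for the sketch), a minimal per-thread, size-bounded LRU cache keyed by the format string could look like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Minimal sketch of a per-thread, size-bounded cache for compiled format patterns. */
final class ThreadLocalFormatCache<V> {
    private final ThreadLocal<LinkedHashMap<String, V>> perThread;
    private final Function<String, V> factory;

    ThreadLocalFormatCache(int maxSize, Function<String, V> factory) {
        this.factory = factory;
        // An access-ordered LinkedHashMap gives simple LRU eviction per thread.
        this.perThread = ThreadLocal.withInitial(() ->
                new LinkedHashMap<String, V>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                        return size() > maxSize;
                    }
                });
    }

    V get(String format) {
        // Compile on first use in this thread, then reuse the cached instance.
        return perThread.get().computeIfAbsent(format, factory);
    }
}
```

The shortcut fhueske suggests would bypass even this `get` call: when the pattern is a literal, code generation can hoist one formatter instance into a member field, so the per-record cache lookup disappears entirely.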
[jira] [Created] (FLINK-6932) Update the inaccessible Dataflow Model paper link
mingleizhang created FLINK-6932: --- Summary: Update the inaccessible Dataflow Model paper link Key: FLINK-6932 URL: https://issues.apache.org/jira/browse/FLINK-6932 Project: Flink Issue Type: Bug Components: Documentation Reporter: mingleizhang Assignee: mingleizhang I tried to access the [#Dataflow Model paper] link under [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html], but it returns a 404 error instead.
[jira] [Updated] (FLINK-6932) Update the inaccessible Dataflow Model paper link
[ https://issues.apache.org/jira/browse/FLINK-6932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6932: Description: I tried to access the Dataflow Model paper link which under [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html], then it gives me an error [ 404 ] instead. (was: I tried to access the [#Dataflow Model paper] link which under [https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html], then it gives me an error [ 404 ] instead.)
[jira] [Commented] (FLINK-4641) Support branching CEP patterns
[ https://issues.apache.org/jira/browse/FLINK-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050551#comment-16050551 ] Alexander Chermenin commented on FLINK-4641: Hi [~dian.fu]! Yes of course, you're welcome! Unfortunately I don't have enough time to do it.

> Support branching CEP patterns
> ---
>
> Key: FLINK-4641
> URL: https://issues.apache.org/jira/browse/FLINK-4641
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Reporter: Till Rohrmann
> Assignee: Alexander Chermenin
>
> We should add support for branching CEP patterns to the Pattern API.
> {code}
>     |--> B --|
>     |        |
> A --|        |--> D
>     |        |
>     |--> C --|
> {code}
> This feature will require changes to the {{Pattern}} class and the
> {{NFACompiler}}.
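The branched pattern in the issue's diagram (A, then either B or C, then D) can be illustrated with a tiny sequence matcher. This is only a sketch of the intended matching semantics with followed-by behavior (irrelevant events in between are skipped), not the proposed {{Pattern}} API or the {{NFACompiler}}; all names here are invented:

```java
import java.util.List;
import java.util.Set;

/** Illustrative check that a sequence contains a match for A -> (B | C) -> D. */
final class BranchPattern {
    // Each stage accepts any one of several alternative event names (the branch).
    private final List<Set<String>> stages;

    BranchPattern(List<Set<String>> stages) {
        this.stages = stages;
    }

    boolean matches(List<String> events) {
        int stage = 0;
        for (String e : events) {
            if (stage < stages.size() && stages.get(stage).contains(e)) {
                stage++; // event satisfies the current stage, advance to the next
            }
        }
        return stage == stages.size(); // all stages were matched in order
    }
}
```

For example, `new BranchPattern(List.of(Set.of("A"), Set.of("B", "C"), Set.of("D")))` matches both `A B D` and `A C D`, which is the branching behavior the issue asks the Pattern API to express.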
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050537#comment-16050537 ] ASF GitHub Bot commented on FLINK-6896: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122212537 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- Yes, @wuchong please merge :-) > Creating a table from a POJO and use table sink to output fail > -- > > Key: FLINK-6896 > URL: https://issues.apache.org/jira/browse/FLINK-6896 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Mark You >Assignee: sunjincheng > Attachments: debug.png > > > Following example fails at sink, using debug mode to see the reason of > ArrayIndexOutOfBoundException is cause by the input type is Pojo type not Row? 
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List<Content> data = new ArrayList<Content>();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         DataStream<Content> stream = env.fromCollection(data);
>         DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
>                     private static final long serialVersionUID = 410512296011057717L;
>                     @Override
>                     public long extractTimestamp(Content element) {
>                         return element.getRecordTime();
>                     }
>                 });
>         final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>         Table table = tableEnv.fromDataStream(stream2,
>                 "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
>         Table windowTable = table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
>                 .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
>         //table.printSchema();
>         TableSink windowSink = new CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1, WriteMode.OVERWRITE);
>         windowTable.writeToSink(windowSink);
>         // tableEnv.toDataStream(windowTable, Row.class).print();
>         env.execute();
>     }
>
>     public static class Content implements Serializable {
>         private static final long serialVersionUID = 1429246948772430441L;
>         private String urlKey;
>         private long recordTime;
>         // private String recordTimeStr;
>         private long httpGetMessageCount;
>         private long httpPostMessageCount;
>         private long uplink;
>         private long downlink;
>         private long statusCode;
>         private long statusCodeCount;
>
>         public Content() {
>             super();
>         }
>
>         public Content(long recordTime, String urlKey) {
>             super();
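The fix discussed in this thread (PR #4111) distinguishes how field-access indexes are produced for POJO inputs versus Row-like inputs: for POJOs the mapping array itself drives iteration, while for other types the arity is walked and filtered through the mapping. This matters because POJO fields are not necessarily laid out in declaration order, so position-based filtering can reorder or miss fields. A simplified stand-alone sketch of that difference (this is not the actual code generator; names are invented):

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the index handling in the diff: POJO-style inputs resolve fields
 *  through the mapping array, Row-style inputs walk the arity and keep only
 *  the mapped positions. Simplified and illustrative only. */
final class FieldAccessSketch {
    static List<Integer> pojoAccessIndexes(int[] mapping) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < mapping.length; i++) {
            out.add(mapping[i]); // POJO fields are resolved via the mapping, in mapping order
        }
        return out;
    }

    static List<Integer> rowAccessIndexes(int arity, int[] mapping) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            for (int m : mapping) {
                if (m == i) {
                    out.add(i); // keep only positions present in the mapping
                    break;
                }
            }
        }
        return out;
    }
}
```

Note how a mapping like `[3, 1]` yields `[3, 1]` under the POJO path but `[1, 3]` under the row path; preserving the mapping order is exactly what the POJO branch in the diff is for.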
[jira] [Commented] (FLINK-6886) Fix Timestamp field can not be selected in event time case when toDataStream[T], `T` not a `Row` Type.
[ https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050535#comment-16050535 ] Fabian Hueske commented on FLINK-6886: -- Maybe there's another way to fix this problem. I played around a bit and found the following: The following Table API query is executed correctly:
{code}
val table = stream.toTable(tEnv, 'l, 'i, 'n, 'proctime.proctime)
val windowedTable = table
  .window(Tumble over 2.seconds on 'proctime as 'w)
  .groupBy('w, 'n)
  .select('n, 'i.count as 'cnt, 'w.start as 's, 'w.end as 'e)
val results = windowedTable.toAppendStream[MP](queryConfig)

// POJO
class MP(var s: Timestamp, var e: Timestamp, var cnt: Long, var n: String) {
  def this() { this(null, null, 0, null) }
  override def toString: String = s"$n,${s.toString},${e.toString},$cnt"
}
{code}
whereas the equivalent SQL query fails with the reported exception ("The field types of physical and logical row types do not match"):
{code}
val sqlTable = tEnv.sql(
  s"""SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS s,
     |       TUMBLE_END(proctime, INTERVAL '2' SECOND) AS e,
     |       n,
     |       COUNT(i) as cnt
     |FROM $table
     |GROUP BY n, TUMBLE(proctime, INTERVAL '2' SECOND)
     |""".stripMargin)
val results = sqlTable.toAppendStream[MP](queryConfig)
{code}
The plans of both queries look similar, but the SQL plan seems to lack the correct final projection:
{code}
// Table API plan
== Abstract Syntax Tree ==
LogicalProject(n=[$0], cnt=[AS($1, 'cnt')], s=[AS($2, 's')], e=[AS($3, 'e')])
  LogicalWindowAggregate(group=[{0}], TMP_0=[COUNT($1)])
    LogicalProject(n=[$2], i=[$1], proctime=[$3])
      LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[n, TMP_0 AS cnt, TMP_1 AS s, TMP_2 AS e])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w, 'proctime, 2000.millis)], select=[n, COUNT(i) AS TMP_0, start('w) AS TMP_1, end('w) AS TMP_2])
    DataStreamCalc(select=[n, i, proctime])
      DataStreamScan(table=[[_DataStreamTable_0]])

// SQL plan
== Abstract Syntax Tree ==
LogicalProject(s=[TUMBLE_START($1)], e=[TUMBLE_END($1)], n=[$0], cnt=[$2])
  LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
    LogicalProject(n=[$2], $f1=[TUMBLE($3, 2000)], i=[$1])
      LogicalTableScan(table=[[UnnamedTable$3]])

== Optimized Logical Plan ==
DataStreamCalc(select=[w$start, w$end, n, cnt])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w$, 'proctime, 2000.millis)], select=[n, COUNT(i) AS cnt, start('w$) AS w$start, end('w$) AS w$end])
    DataStreamCalc(select=[n, proctime, i])
      DataStreamScan(table=[[_DataStreamTable_0]])
{code}
So this doesn't seem to be a principled issue with the time attributes or window properties but rather an issue of the SQL optimization. What do you think [~sunjincheng121] and [~jark]?

> Fix Timestamp field can not be selected in event time case when
> toDataStream[T], `T` not a `Row` Type.
> ---
>
> Key: FLINK-6886
> URL: https://issues.apache.org/jira/browse/FLINK-6886
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently for event-time window(group/over), When contain `Timestamp` type
> field in `SELECT Clause`, And toDataStream[T], `T` not a `Row` Type, Such
> `PojoType`, will throw a exception. In this JIRA. will fix this bug. For
> example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as
> winStart, TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY
> name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Throw Exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and
> logical row types do not match.This is a bug and should not happen. Please
> file an issue.
>   at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>   at org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
>   at org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
>   at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, when we solve this exception, subsequent other exceptions will be
> thrown. The real reason is {{TableEnvironment#generateRowConverterFunction}}
> method bug. So in this JIRA. will fix it.
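One way to picture the "physical vs. logical row types do not match" failure is that the physical result fields (here `w$start`, `w$end`, `n`, `cnt`) arrive in a different order and under different names than the target type expects, and the missing final projection is what would have reconciled them. A generic sketch of reconciling a physical row with a logical schema by field *name* rather than by position (illustrative only; this is not the converter generated by {{TableEnvironment#generateRowConverterFunction}}, and the field names are made up):

```java
import java.util.List;

/** Sketch: reorder a physical row's values to match a logical field-name order. */
final class NameBasedConverter {
    static Object[] reorder(List<String> physicalNames, Object[] physicalValues,
                            List<String> logicalNames) {
        Object[] out = new Object[logicalNames.size()];
        for (int i = 0; i < logicalNames.size(); i++) {
            int idx = physicalNames.indexOf(logicalNames.get(i)); // match by name
            if (idx < 0) {
                throw new IllegalArgumentException("missing field: " + logicalNames.get(i));
            }
            out[i] = physicalValues[idx];
        }
        return out;
    }
}
```

Matching by position instead, as a generated converter effectively does when the projection is absent, would pair `w$start` with the POJO's first field regardless of what that field means, which is the type mismatch the exception reports.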
[GitHub] flink issue #4130: [FLINK-6773] [checkpoint] Introduce compression (snappy) ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/4130 CC @tillrohrmann --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6773) Use compression (e.g. snappy) for full check/savepoints
[ https://issues.apache.org/jira/browse/FLINK-6773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050508#comment-16050508 ] ASF GitHub Bot commented on FLINK-6773: --- GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/4130 [FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed st… This PR introduce optional snappy compression for the keyed state in full checkpoints and savepoints. This feature can be activated through a flag in {{ExecutionConfig}}. For the future, we can also support user-defined compression schemes, which will also require a upgrade and compatibility feature, as described in FLINK-6931. You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink compressedKeyGroups Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4130.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4130 > Use compression (e.g. snappy) for full check/savepoints > --- > > Key: FLINK-6773 > URL: https://issues.apache.org/jira/browse/FLINK-6773 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Stefan Richter >Assignee: Stefan Richter > > We could use compression (e.g. snappy stream compression) to decrease the > size of our full checkpoints and savepoints. From some initial experiments, I > think there is great potential to achieve compression rates around 30-50%. > Given those numbers, I think this is very low hanging fruit to implement. > One point to consider in the implementation is that compression blocks should > respect key-groups, i.e. typically it should make sense to compress per > key-group.
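The JIRA notes that compression blocks should respect key-groups, so that a single key-group can be read back without decompressing the whole snapshot. A rough sketch of that idea, using the JDK's Deflater/Inflater as a stand-in for snappy (the actual PR's stream format and APIs differ; this is illustrative only):

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Sketch of per-key-group compression: each group is compressed as its own
 *  independent block, so any one group can be restored without the others. */
final class KeyGroupCompression {
    static List<byte[]> compressGroups(List<byte[]> groups) {
        List<byte[]> out = new ArrayList<>();
        for (byte[] g : groups) {
            Deflater d = new Deflater();
            d.setInput(g);
            d.finish();
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[256];
            while (!d.finished()) {
                bos.write(buf, 0, d.deflate(buf));
            }
            d.end();
            out.add(bos.toByteArray()); // one self-contained block per key-group
        }
        return out;
    }

    static byte[] decompressGroup(byte[] block) {
        try {
            Inflater inf = new Inflater();
            inf.setInput(block);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[256];
            while (!inf.finished()) {
                bos.write(buf, 0, inf.inflate(buf));
            }
            inf.end();
            return bos.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt key-group block", e);
        }
    }
}
```

Because each block is independent, a restore that is only assigned some key-groups (e.g. after rescaling) can decompress exactly those blocks, which is the design point the issue calls out.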
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050502#comment-16050502 ] ASF GitHub Bot commented on FLINK-6896: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122205799 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- Thanks @wuchong I check it in my side. nice. +1 Best, SunJincheng
[jira] [Created] (FLINK-6931) Support custom compression formats for checkpoints (+Upgrade/Compatibility)
Stefan Richter created FLINK-6931: - Summary: Support custom compression formats for checkpoints (+Upgrade/Compatibility) Key: FLINK-6931 URL: https://issues.apache.org/jira/browse/FLINK-6931 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Stefan Richter With FLINK-6773, we introduced optional snappy compression for keyed state in full checkpoints and savepoints. We should offer users a way to register their own compression formats with the {{ExecutionConfig}}. For this, we should also have a compatibility story, very similar to what {{TypeSerializerConfigSnapshot}} does for type serializers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
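A user-facing registry for custom compression formats, with a persisted format name serving the compatibility story the ticket mentions, might look roughly like the sketch below. All names here ({{StreamCompressionFormat}}, {{forSnapshot}}, …) are hypothetical; the real design would be worked out in the ticket. The idea is that the snapshot metadata records which format wrote it, so older savepoints can always be decompressed on restore.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry mapping a stable format name to (de)compression functions. */
public class CompressionRegistry {

    public interface StreamCompressionFormat {
        String name();                   // stable identifier persisted with the snapshot
        byte[] compress(byte[] raw);
        byte[] decompress(byte[] compressed);
    }

    private final Map<String, StreamCompressionFormat> formats = new HashMap<>();

    /** Called by the user, conceptually via the ExecutionConfig. */
    public void register(StreamCompressionFormat format) {
        formats.put(format.name(), format);
    }

    /** On restore, look the format up by the name stored in the snapshot metadata. */
    public StreamCompressionFormat forSnapshot(String persistedName) {
        StreamCompressionFormat f = formats.get(persistedName);
        if (f == null) {
            throw new IllegalStateException(
                "No compression format registered for: " + persistedName);
        }
        return f;
    }
}
```

Failing loudly on an unknown format name mirrors how {{TypeSerializerConfigSnapshot}} surfaces incompatible serializers instead of silently misreading state.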
[jira] [Commented] (FLINK-6836) Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure
[ https://issues.apache.org/jira/browse/FLINK-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050495#comment-16050495 ] ASF GitHub Bot commented on FLINK-6836: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4120 +1 to merge. > Failing YARNSessionCapacitySchedulerITCase.testTaskManagerFailure > - > > Key: FLINK-6836 > URL: https://issues.apache.org/jira/browse/FLINK-6836 > Project: Flink > Issue Type: Bug > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: test-stability > > The master is currently unstable. The > {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} fails with > Hadoop version {{2.6.5}}, {{2.7.3}} and {{2.8.0}}. > See this build [1] for example. > [1] https://travis-ci.org/apache/flink/builds/238720589 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050474#comment-16050474 ] ASF GitHub Bot commented on FLINK-6896: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122201344 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- @fhueske thanks for your review. So if Travis passes, I will merge the code? > Creating a table from a POJO and use table sink to output fail > -- > > Key: FLINK-6896 > URL: https://issues.apache.org/jira/browse/FLINK-6896 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Mark You >Assignee: sunjincheng > Attachments: debug.png > > > The following example fails at the sink. Debugging shows that the > ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample: > {code:title=TumblingWindow.java|borderStyle=solid} > public class TumblingWindow { > public static void main(String[] args) throws Exception { > List data = new ArrayList(); > data.add(new Content(1L, "Hi")); > data.add(new Content(2L, "Hallo")); > data.add(new Content(3L, "Hello")); > data.add(new Content(4L, "Hello")); > data.add(new Content(7L, "Hello")); > data.add(new Content(8L, "Hello world")); > data.add(new Content(16L, "Hello world")); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream stream = env.fromCollection(data); > DataStream stream2 = stream.assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) { > /** > * > */ > private static final long serialVersionUID = > 410512296011057717L; > @Override > public long extractTimestamp(Content element) { > return element.getRecordTime(); > } > }); > final StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > Table table = tableEnv.fromDataStream(stream2, > "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime"); > Table windowTable = > table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, > urlKey") > > .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum > "); > //table.printSchema(); > TableSink windowSink = new > CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1, > WriteMode.OVERWRITE); > windowTable.writeToSink(windowSink); > // tableEnv.toDataStream(windowTable, Row.class).print(); > env.execute(); > } > public static class Content implements Serializable { > /** > * > */ > private static final long serialVersionUID = 1429246948772430441L; > private String urlKey; > private long recordTime; > // private String recordTimeStr; > private long httpGetMessageCount; > private long 
httpPostMessageCount; > private long uplink; > private long downlink; > private long statusCode; > private long statusCodeCount; > public Content() { > super(); > } > public Content(lo
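The crux of the diff discussed above: for tuple- or row-like inputs the code generator walks the physical indices `0 until arity` and keeps those present in the field mapping, while for `PojoTypeInfo` inputs it must iterate over the mapping positions themselves, because POJO fields are resolved by name through the mapping rather than by physical offset. A plain-Java sketch of the two iteration strategies (index values are illustrative, not Flink code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of the two iteration strategies from the CodeGenerator fix. */
public class InputAccessSketch {

    /** Tuple/Row-like inputs: walk physical indices 0..arity, keep those in the mapping. */
    static List<Integer> accessOrderByArity(int arity, int[] mapping) {
        Set<Integer> wanted = new HashSet<>();
        for (int m : mapping) wanted.add(m);
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            if (wanted.contains(i)) order.add(i); // filter by membership, physical order
        }
        return order;
    }

    /** POJO inputs: walk the logical positions of the mapping itself, as the fix does. */
    static List<Integer> accessOrderByMapping(int[] mapping) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < mapping.length; i++) {
            order.add(i); // logical position; the mapping resolves the actual field
        }
        return order;
    }
}
```

With a mapping like `{3, 1}`, the arity-based loop yields physical indices `[1, 3]` (reordered and bound by arity), while the mapping-based loop yields logical positions `[0, 1]` — which is why the original arity-filtered loop produced wrong accesses, including the ArrayIndexOutOfBoundsException, for POJO inputs.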
[jira] [Created] (FLINK-6930) Selecting window start / end on row-based Tumble/Slide window causes NPE
Fabian Hueske created FLINK-6930: Summary: Selecting window start / end on row-based Tumble/Slide window causes NPE Key: FLINK-6930 URL: https://issues.apache.org/jira/browse/FLINK-6930 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.0, 1.4.0 Reporter: Fabian Hueske Selecting the start and end properties of a row-based window causes a NullPointerException. The following program: {code} val windowedTable = table .window(Tumble over 2.rows on 'proctime as 'w) .groupBy('w, 'string) .select('string as 'n, 'int.count as 'cnt, 'w.start as 's, 'w.end as 'e) {code} causes {code} Caused by: java.lang.NullPointerException at org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1556) at org.apache.calcite.runtime.SqlFunctions.toLong(SqlFunctions.java:1551) at DataStreamCalcRule$40.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67) at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:890) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:868) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75) at 
org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:37) at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:599) at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:456) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:745) {code} We should validate that the start and end window properties are not accessed if the window is defined on row-counts. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
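The fix proposed at the end of the report is a validation-time check rather than a runtime NPE. A minimal sketch, assuming a flag that distinguishes row-count windows from time windows (all names are illustrative, not the Table API's validator):

```java
/** Sketch: reject w.start / w.end on row-count windows before code generation. */
public class WindowPropertyValidator {

    public static void validate(boolean isRowCountWindow, boolean selectsStartOrEnd) {
        if (isRowCountWindow && selectsStartOrEnd) {
            // Row-count windows have no time bounds, so start/end are undefined.
            throw new IllegalArgumentException(
                "w.start and w.end are not available for row-count windows; "
                + "they are only defined for time windows.");
        }
    }
}
```

Surfacing the error during plan validation gives the user a message naming the actual restriction, instead of a NullPointerException deep inside `SqlFunctions.toLong`.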
[jira] [Commented] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads
[ https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050456#comment-16050456 ] Nico Kruber commented on FLINK-5908: let's solve this during the re-write of the BLOB store with FLIP-19 > Blob Cache can (rarely) get corrupted on failed blob downloads > -- > > Key: FLINK-5908 > URL: https://issues.apache.org/jira/browse/FLINK-5908 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.2.0, 1.3.0, 1.4.0 >Reporter: Stephan Ewen > > The Blob Cache downloads files directly to the target file location. > While it tries to clean up failed attempts, there is a chance that this > cleanup does not complete. > In that case, we have a corrupt file at the target location. The blob cache > then assumes that it already has the file cached and future requests > do not attempt to re-download the file. > The fix would be to download to a temp file name, validate the integrity, and > rename to the target file path when the validation succeeds. > The validation for "content addressable" could even include validating the > checksum hash. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
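The fix outlined in the issue — download to a temp name, validate integrity, rename on success — can be sketched with `java.nio.file`. SHA-1 stands in for the content-addressable checksum; this is an illustration of the scheme, not the actual BlobCache code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch of the proposed fix: stage the download, verify, then atomically publish. */
public class SafeBlobDownload {

    static String sha1Hex(byte[] data) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-1").digest(data);
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-1 is available on every JVM
        }
    }

    /** Writes to a temp file first; only a verified download reaches targetPath. */
    static void store(byte[] downloaded, String expectedSha1, Path targetPath) {
        try {
            Path tmp = Files.createTempFile(targetPath.getParent(), "blob-", ".tmp");
            try {
                Files.write(tmp, downloaded);
                if (!sha1Hex(downloaded).equals(expectedSha1)) {
                    throw new IOException("Checksum mismatch; refusing to cache blob");
                }
                // Atomic rename: readers never observe a half-written target file.
                Files.move(tmp, targetPath, StandardCopyOption.ATOMIC_MOVE);
            } finally {
                Files.deleteIfExists(tmp); // cleans up failed attempts; no-op after the move
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] read(Path p) {
        try { return Files.readAllBytes(p); }
        catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

The atomic move is the essential ingredient: a failed or interrupted download can only ever leave a stray temp file behind, never a corrupt file at the cache's target path.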
[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads
[ https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-5908: --- Component/s: Network > Blob Cache can (rarely) get corrupted on failed blob downloads > -- > > Key: FLINK-5908 > URL: https://issues.apache.org/jira/browse/FLINK-5908 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.2.0, 1.3.0, 1.4.0 >Reporter: Stephan Ewen > > The Blob Cache downloads files directly to the target file location. > While it tries to clean up failed attempts, there is a chance that this > cleanup does not complete. > In that case, we have a corrupt file at the target location. The blob cache > then assumes that it already has the file cached and future requests > do not attempt to re-download the file. > The fix would be to download to a temp file name, validate the integrity, and > rename to the target file path when the validation succeeds. > The validation for "content addressable" could even include validating the > checksum hash. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads
[ https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-5908: --- Affects Version/s: 1.4.0 1.3.0 > Blob Cache can (rarely) get corrupted on failed blob downloads > -- > > Key: FLINK-5908 > URL: https://issues.apache.org/jira/browse/FLINK-5908 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, Network >Affects Versions: 1.2.0, 1.3.0, 1.4.0 >Reporter: Stephan Ewen > > The Blob Cache downloads files directly to the target file location. > While it tries to clean up failed attempts, there is a chance that this > cleanup does not complete. > In that case, we have a corrupt file at the target location. The blob cache > then assumes that it already has the file cached and future requests > do not attempt to re-download the file. > The fix would be to download to a temp file name, validate the integrity, and > rename to the target file path when the validation succeeds. > The validation for "content addressable" could even include validating the > checksum hash. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-5908) Blob Cache can (rarely) get corrupted on failed blob downloads
[ https://issues.apache.org/jira/browse/FLINK-5908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-5908: --- Fix Version/s: (was: 1.4.0) (was: 1.2.2) > Blob Cache can (rarely) get corrupted on failed blob downloads > -- > > Key: FLINK-5908 > URL: https://issues.apache.org/jira/browse/FLINK-5908 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Stephan Ewen > > The Blob Cache downloads files directly to the target file location. > While it tries to clean up failed attempts, there is a chance that this > cleanup does not complete. > In that case, we have a corrupt file at the target location. The blob cache > then assumes that it already has the file cached and future requests > do not attempt to re-download the file. > The fix would be to download to a temp file name, validate the integrity, and > rename to the target file path when the validation succeeds. > The validation for "content addressable" could even include validating the > checksum hash. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6046) Add support for oversized messages during deployment
[ https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050450#comment-16050450 ] Nico Kruber commented on FLINK-6046: since the broken cleanup for the current blob store is a deal breaker for offloading oversized messages, we should re-vamp this issue after finishing FLIP-19 > Add support for oversized messages during deployment > > > Key: FLINK-6046 > URL: https://issues.apache.org/jira/browse/FLINK-6046 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Network >Reporter: Nico Kruber >Assignee: Nico Kruber > > This is the non-FLIP6 version of FLINK-4346, restricted to deployment > messages: > Currently, messages larger than the maximum Akka Framesize cause an error > when being transported. We should add a way to pass messages that are larger > than {{akka.framesize}} as may happen for task deployments via the > {{TaskDeploymentDescriptor}}. > We should use the {{BlobServer}} to offload big data items (if possible) and > make use of any potential distributed file system behind. This way, not only > do we avoid the akka framesize restriction, but may also be able to speed up > deployment. > I suggest the following changes: > - the sender, i.e. the {{Execution}} class, tries to store the serialized > job information and serialized task information (if oversized) from the > {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single > {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send > the whole tdd as usual via akka) > - if stored in a blob, these data items are removed from the tdd > - the receiver, i.e. 
the {{TaskManager}} class, tries to retrieve any > offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it > re-assembles the original tdd > - the stored blob may be deleted after re-assembly of the tdd > Further (future) changes may include: > - separating the serialized job information and serialized task information > into two files and re-use the first one for all tasks > - not re-deploying these two during job recovery (if possible) > - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may > be removed when the job enters a final state instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
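The offloading decision described above — ship the serialized {{TaskDeploymentDescriptor}} inline via akka when it fits the frame size, otherwise stage it as a blob and send only a reference — can be sketched like this. A local temp file stands in for the {{BlobServer}}, and all names are illustrative, not Flink's:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch of the offloading decision for oversized deployment payloads. */
public class OffloadSketch {

    static final int FRAME_SIZE = 1024; // stand-in for akka.framesize

    /** Carries either the inline payload or a reference to the offloaded blob. */
    static class Envelope {
        final byte[] inline;  // null when offloaded
        final Path blobRef;   // null when inline
        Envelope(byte[] inline, Path blobRef) { this.inline = inline; this.blobRef = blobRef; }
    }

    /** Sender side (conceptually the Execution class). */
    static Envelope send(byte[] serializedTdd) {
        try {
            if (serializedTdd.length <= FRAME_SIZE) {
                return new Envelope(serializedTdd, null); // small enough: ship via akka
            }
            Path blob = Files.createTempFile("tdd-", ".blob"); // stand-in for the BlobServer
            Files.write(blob, serializedTdd);
            return new Envelope(null, blob); // only the reference travels over akka
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Receiver side (conceptually the TaskManager): re-assemble the original tdd. */
    static byte[] receive(Envelope e) {
        try {
            return e.inline != null ? e.inline : Files.readAllBytes(e.blobRef);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Keeping the envelope mutually exclusive (inline XOR reference) mirrors the ticket's plan of stripping the offloaded items out of the tdd before it goes over akka.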
[jira] [Updated] (FLINK-6046) Add support for oversized messages during deployment
[ https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-6046: --- Component/s: Network > Add support for oversized messages during deployment > > > Key: FLINK-6046 > URL: https://issues.apache.org/jira/browse/FLINK-6046 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, Network >Reporter: Nico Kruber >Assignee: Nico Kruber > > This is the non-FLIP6 version of FLINK-4346, restricted to deployment > messages: > Currently, messages larger than the maximum Akka Framesize cause an error > when being transported. We should add a way to pass messages that are larger > than {{akka.framesize}} as may happen for task deployments via the > {{TaskDeploymentDescriptor}}. > We should use the {{BlobServer}} to offload big data items (if possible) and > make use of any potential distributed file system behind. This way, not only > do we avoid the akka framesize restriction, but may also be able to speed up > deployment. > I suggest the following changes: > - the sender, i.e. the {{Execution}} class, tries to store the serialized > job information and serialized task information (if oversized) from the > {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single > {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send > the whole tdd as usual via akka) > - if stored in a blob, these data items are removed from the tdd > - the receiver, i.e. 
the {{TaskManager}} class, tries to retrieve any > offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it > re-assembles the original tdd > - the stored blob may be deleted after re-assembly of the tdd > Further (future) changes may include: > - separating the serialized job information and serialized task information > into two files and re-use the first one for all tasks > - not re-deploying these two during job recovery (if possible) > - then, as all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may > be removed when the job enters a final state instead -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050423#comment-16050423 ] ASF GitHub Bot commented on FLINK-6896: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122194058 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- I checked the usages of `generateFieldAccess()` and the changes seem to be OK. > Creating a table from a POJO and use table sink to output fail > -- > > Key: FLINK-6896 > URL: https://issues.apache.org/jira/browse/FLINK-6896 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Mark You >Assignee: sunjincheng > Attachments: debug.png > > > The following example fails at the sink. Debugging shows that the > ArrayIndexOutOfBoundsException is caused by the input type being a POJO type, not Row.
> Sample: > {code:title=TumblingWindow.java|borderStyle=solid} > public class TumblingWindow { > public static void main(String[] args) throws Exception { > List data = new ArrayList(); > data.add(new Content(1L, "Hi")); > data.add(new Content(2L, "Hallo")); > data.add(new Content(3L, "Hello")); > data.add(new Content(4L, "Hello")); > data.add(new Content(7L, "Hello")); > data.add(new Content(8L, "Hello world")); > data.add(new Content(16L, "Hello world")); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream stream = env.fromCollection(data); > DataStream stream2 = stream.assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) { > /** > * > */ > private static final long serialVersionUID = > 410512296011057717L; > @Override > public long extractTimestamp(Content element) { > return element.getRecordTime(); > } > }); > final StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > Table table = tableEnv.fromDataStream(stream2, > "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime"); > Table windowTable = > table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, > urlKey") > > .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum > "); > //table.printSchema(); > TableSink windowSink = new > CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1, > WriteMode.OVERWRITE); > windowTable.writeToSink(windowSink); > // tableEnv.toDataStream(windowTable, Row.class).print(); > env.execute(); > } > public static class Content implements Serializable { > /** > * > */ > private static final long serialVersionUID = 1429246948772430441L; > private String urlKey; > private long recordTime; > // private String recordTimeStr; > private long httpGetMessageCount; > private long 
httpPostMessageCount; > private long uplink; > private long downlink; > private long statusCode; > private long statusCodeCount; > public Content() { > super(); > } > public Content(long recor
[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-5991: --- Labels: api-breaking api-deprecation (was: ) > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265; it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define the list state > type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-5991: --- Labels: (was: api-breaking api-deprecation)
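The key distinction in the proposal above is how the two kinds of operator list state are redistributed on restore: partitionable state is split across the new subtasks, while broadcast state is replicated to every subtask. A minimal, hypothetical sketch of these semantics (illustrative names only, not Flink's actual API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BroadcastStateSketch {

    // Partitionable list state: checkpointed elements are split
    // round-robin across the new subtasks on restore/rescale.
    static <T> List<List<T>> redistributePartitionable(List<T> checkpointed, int parallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int j = 0; j < checkpointed.size(); j++) {
            result.get(j % parallelism).add(checkpointed.get(j));
        }
        return result;
    }

    // Broadcast list state: every new subtask receives the full list.
    static <T> List<List<T>> redistributeBroadcast(List<T> checkpointed, int parallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>(checkpointed));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> state = Arrays.asList("a", "b", "c");
        System.out.println(redistributePartitionable(state, 2)); // [[a, c], [b]]
        System.out.println(redistributeBroadcast(state, 2));     // [[a, b, c], [a, b, c]]
    }
}
```

This is why broadcast state matters for the Kinesis/Kafka discovery use case: every consumer subtask needs the complete shard/partition assignment after rescaling, not a slice of it.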
[jira] [Commented] (FLINK-6682) Improve error message in case parallelism exceeds maxParallelism
[ https://issues.apache.org/jira/browse/FLINK-6682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050370#comment-16050370 ] ASF GitHub Bot commented on FLINK-6682: --- Github user zhangminglei commented on the issue: https://github.com/apache/flink/pull/4125 Hi @zentol. Please help review when you are free. Should I add some extra information? Thanks. > Improve error message in case parallelism exceeds maxParallelism > > > Key: FLINK-6682 > URL: https://issues.apache.org/jira/browse/FLINK-6682 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: mingleizhang > > When restoring a job with a parallelism that exceeds the maxParallelism we're > not providing a useful error message, as all you get is an > IllegalArgumentException: > {code} > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed > at > org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:343) > at > org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467) > ...
22 more > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.createKeyGroupPartitions(StateAssignmentOperation.java:449) > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignAttemptState(StateAssignmentOperation.java:117) > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:102) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1038) > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1101) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) > at > scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
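The bare {{IllegalArgumentException}} in the stack trace above comes from a precondition check with no message. The gist of the requested improvement is to report both values in the exception. A hypothetical sketch (method name and wording are illustrative, not the exact patch in the PR):

```java
// Illustrative sketch of a more descriptive precondition check for
// restoring a job whose parallelism exceeds the operator's maxParallelism.
public class ParallelismCheck {

    static void checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(String.format(
                "The state of an operator was created with maxParallelism %d, "
                    + "but the job was restored with parallelism %d. "
                    + "The parallelism must not exceed the maxParallelism.",
                maxParallelism, parallelism));
        }
    }

    public static void main(String[] args) {
        try {
            checkParallelism(128, 32);
        } catch (IllegalArgumentException e) {
            // The user now sees both values instead of a bare exception.
            System.out.println(e.getMessage());
        }
    }
}
```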
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050358#comment-16050358 ] ASF GitHub Bot commented on FLINK-6896: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122180812 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- I think your branch looks good @wuchong. I tried a similar thing but did not change the `generateFieldAccess()` methods because I was afraid of side effects if we use the mapping for any kind of type (instead of just POJOs).
> Creating a table from a POJO and use table sink to output fail > -- > > Key: FLINK-6896 > URL: https://issues.apache.org/jira/browse/FLINK-6896 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Mark You >Assignee: sunjincheng > Attachments: debug.png > > > Following example fails at sink, using debug mode to see the reason of > ArrayIndexOutOfBoundException is cause by the input type is Pojo type not Row? > Sample: > {code:title=TumblingWindow.java|borderStyle=solid} > public class TumblingWindow { > public static void main(String[] args) throws Exception { > List data = new ArrayList(); > data.add(new Content(1L, "Hi")); > data.add(new Content(2L, "Hallo")); > data.add(new Content(3L, "Hello")); > data.add(new Content(4L, "Hello")); > data.add(new Content(7L, "Hello")); > data.add(new Content(8L, "Hello world")); > data.add(new Content(16L, "Hello world")); > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream stream = env.fromCollection(data); > DataStream stream2 = stream.assignTimestampsAndWatermarks( > new > BoundedOutOfOrdernessTimestampExtractor(Time.milliseconds(1)) { > /** > * > */ > private static final long serialVersionUID = > 410512296011057717L; > @Override > public long extractTimestamp(Content element) { > return element.getRecordTime(); > } > }); > final StreamTableEnvironment tableEnv = > TableEnvironment.getTableEnvironment(env); > Table table = tableEnv.fromDataStream(stream2, > "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime"); > Table windowTable = > table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, > urlKey") > > .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum > "); > //table.printSchema(); > TableSink windowSink = new > 
CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1, > WriteMode.OVERWRITE); > windowTable.writeToSink(windowSink); > // tableEnv.toDataStream(windowTable, Row.class).print(); > env.execute(); > } > public static class Content implements Serializable { > /** > * > */ > private static final long serialVersionUID = 1429246948772430441L; > private String urlKey; > private long recordTime; > // private String recordTimeStr; > private long httpGetMessageCount; > private long httpPostMessageCount; > private long uplink; > private long downlink; > private long sta
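The diff under review in this thread changes how the code generator enumerates input fields: for a {{PojoTypeInfo}} input it must iterate over the field *mapping* (POJO fields have no stable positional order), while for Row-like types it iterates over physical positions filtered by the mapping. A hypothetical Java sketch of just that selection logic (names are illustrative, not the actual {{CodeGenerator}} code):

```java
import java.util.ArrayList;
import java.util.List;

public class AccessExprSketch {

    // Returns the indices for which input-access expressions would be generated.
    static List<Integer> accessIndices(int arity, int[] mapping, boolean isPojo) {
        List<Integer> indices = new ArrayList<>();
        if (isPojo) {
            // POJO input: iterate logical positions 0..mapping.length-1;
            // each position is resolved to a POJO field through the mapping.
            for (int i = 0; i < mapping.length; i++) {
                indices.add(i);
            }
        } else {
            // Row-like input: iterate physical positions, keeping only
            // those that appear in the mapping.
            for (int i = 0; i < arity; i++) {
                for (int m : mapping) {
                    if (m == i) {
                        indices.add(i);
                        break;
                    }
                }
            }
        }
        return indices;
    }
}
```

For a POJO of arity 3 with mapping {2, 0}, the old positional filter would have yielded physical indices {0, 2} (wrong order for the logical schema), whereas iterating the mapping yields logical positions {0, 1}, each resolved via the mapping. This mismatch is the source of the ArrayIndexOutOfBoundsException reported in the issue.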
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050341#comment-16050341 ] ASF GitHub Bot commented on FLINK-6896: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122178152 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- Hmm, but it works well on my local machine. I pushed it to my branch, can you check it out? https://github.com/wuchong/flink/commit/ad97e84ca561ea32d2ce5e0779f4aff6429b5523
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050332#comment-16050332 ] ASF GitHub Bot commented on FLINK-6896: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122176847 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- @wuchong I think the current PR works well, so I suggest you open a new PR to improve it. What do you think? @fhueske @wuchong
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050326#comment-16050326 ] ASF GitHub Bot commented on FLINK-6896: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122176275 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { + for (i <- 0 until input1Mapping.length) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} else { + for (i <- 0 until input1.getArity if input1Mapping.contains(i)) +yield generateInputAccess(input1, input1Term, i, input1Mapping) +} val input2AccessExprs = input2 match { - case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i)) -yield generateInputAccess(ti, input2Term, i, input2Mapping) + case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- @fhueske @wuchong thanks for trying to improve this PR. @wuchong With your solution I also got two errors when running `TimeAttributesITCase` on my side. Have you run all the tests locally?
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050272#comment-16050272 ] ASF GitHub Bot commented on FLINK-6896: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122168658 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -871,12 +871,24 @@ class CodeGenerator( returnType: TypeInformation[_ <: Any], resultFieldNames: Seq[String]) : GeneratedExpression = { -val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i)) - yield generateInputAccess(input1, input1Term, i, input1Mapping) + +val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) { --- End diff -- I think I tried that but it caused a couple of tests to fail.
[jira] [Updated] (FLINK-6917) Introduce test base for end-to-end testing serializer config snapshotting, restoring, and compatibility check roundtrips
[ https://issues.apache.org/jira/browse/FLINK-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6917: --- Labels: ta (was: ) > Introduce test base for end-to-end testing serializer config snapshotting, > restoring, and compatibility check roundtrips > - > > Key: FLINK-6917 > URL: https://issues.apache.org/jira/browse/FLINK-6917 > Project: Flink > Issue Type: Test > Components: State Backends, Checkpointing, Tests, Type Serialization > System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Labels: ta > > Currently, we only have end-to-end tests of the serializer snapshotting and > restore roundtrip for the {{PojoSerializer}}, {{KryoSerializer}}, and Scala > type serializers. > They are all written differently with varying coverage of behavioural tests, > and scattered in several different test classes. > This JIRA tracks introducing a common test base for the serialization > roundtrip for all serializers in Flink, and also activating it for all > serializers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
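As a rough illustration of what such a common test base could look like, here is a simplified Java sketch. All names and the structure are hypothetical and use plain byte-array serialization, not Flink's TypeSerializer or config-snapshot API: the shared roundtrip logic lives in the base class, and each serializer under test only supplies its subject and sample values.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Illustrative sketch only -- names are hypothetical, not Flink's actual
// test base. One abstract class drives the serialize -> restore -> compare
// roundtrip; concrete tests plug in a serializer and sample data.
abstract class SerializerRoundtripTestBase<T> {

    // Each concrete test supplies these three pieces.
    protected abstract Function<T, byte[]> serializer();
    protected abstract Function<byte[], T> deserializer();
    protected abstract List<T> testData();

    // The shared roundtrip: every value must survive serialize/deserialize.
    public final void runRoundtripTests() {
        for (T value : testData()) {
            byte[] bytes = serializer().apply(value);
            T restored = deserializer().apply(bytes);
            if (!value.equals(restored)) {
                throw new AssertionError("roundtrip changed " + value + " to " + restored);
            }
        }
    }
}

// A concrete instance for strings, standing in for a real serializer test.
class StringSerializerTest extends SerializerRoundtripTestBase<String> {
    protected Function<String, byte[]> serializer() {
        return s -> s.getBytes(StandardCharsets.UTF_8);
    }
    protected Function<byte[], String> deserializer() {
        return b -> new String(b, StandardCharsets.UTF_8);
    }
    protected List<String> testData() {
        return Arrays.asList("Hi", "Hallo", "Hello world");
    }
}
```

Calling `new StringSerializerTest().runRoundtripTests()` passes silently; a serializer whose restore path disagrees with its write path fails the shared assertion. The actual test base described in this issue additionally covers config snapshotting and compatibility checks, which this sketch omits.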
[jira] [Commented] (FLINK-6896) Creating a table from a POJO and use table sink to output fail
[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050271#comment-16050271 ] ASF GitHub Bot commented on FLINK-6896: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122168412

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -871,12 +871,24 @@ class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-      yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+    val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+      for (i <- 0 until input1Mapping.length)
+        yield generateInputAccess(input1, input1Term, i, input1Mapping)
+    } else {
+      for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+        yield generateInputAccess(input1, input1Term, i, input1Mapping)
+    }
     val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-        yield generateInputAccess(ti, input2Term, i, input2Mapping)
+      case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Right, I noticed that as well.

> Creating a table from a POJO and use table sink to output fail
> --
>
> Key: FLINK-6896
> URL: https://issues.apache.org/jira/browse/FLINK-6896
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Mark You
> Assignee: sunjincheng
> Attachments: debug.png
>
> The following example fails at the sink. Debugging shows that the ArrayIndexOutOfBoundsException is caused by the input type being a POJO type rather than Row.
[GitHub] flink pull request #4111: [FLINK-6896][table] Fix generate PojoType input re...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4111#discussion_r122168412

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -871,12 +871,24 @@ class CodeGenerator(
       returnType: TypeInformation[_ <: Any],
       resultFieldNames: Seq[String])
     : GeneratedExpression = {
-    val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
-      yield generateInputAccess(input1, input1Term, i, input1Mapping)
+
+    val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
+      for (i <- 0 until input1Mapping.length)
+        yield generateInputAccess(input1, input1Term, i, input1Mapping)
+    } else {
+      for (i <- 0 until input1.getArity if input1Mapping.contains(i))
+        yield generateInputAccess(input1, input1Term, i, input1Mapping)
+    }
     val input2AccessExprs = input2 match {
-      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
-        yield generateInputAccess(ti, input2Term, i, input2Mapping)
+      case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
--- End diff --

Right, I noticed that as well.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
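The distinction this diff draws can be illustrated outside of Flink. The field names and mapping below are hypothetical, not taken from Flink's code generator: a POJO's fields sit in their own physical order, and a field mapping translates each logical schema position to a physical field index. Iterating over the mapping's length (the POJO branch of the fix) visits each selected field once, in logical order; filtering physical positions by membership in the mapping (the Row-style loop) follows the physical layout instead.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class MappingDemo {
    public static void main(String[] args) {
        // Hypothetical POJO field names in physical (e.g. alphabetical) order.
        String[] physicalFields = {"recordTime", "uplink", "urlKey"};
        // Logical schema order: urlKey, recordTime, uplink.
        int[] mapping = {2, 0, 1};

        // Row-style access (the old loop): iterate physical positions and keep
        // those mentioned in the mapping -- the result follows physical order.
        String[] rowStyle = IntStream.range(0, physicalFields.length)
            .filter(i -> Arrays.stream(mapping).anyMatch(m -> m == i))
            .mapToObj(i -> physicalFields[i])
            .toArray(String[]::new);

        // POJO-style access (the fix): iterate the mapping itself, so each
        // logical position resolves to the right physical field.
        String[] pojoStyle = IntStream.range(0, mapping.length)
            .mapToObj(i -> physicalFields[mapping[i]])
            .toArray(String[]::new);

        System.out.println(Arrays.toString(rowStyle));  // [recordTime, uplink, urlKey]
        System.out.println(Arrays.toString(pojoStyle)); // [urlKey, recordTime, uplink]
    }
}
```

The two loops produce the same set of fields but in different orders, which is exactly the mismatch between the physical POJO layout and the logical row type that the PR addresses.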
[jira] [Updated] (FLINK-6917) Introduce test base for end-to-end testing serializer config snapshotting, restoring, and compatibility check roundtrips
[ https://issues.apache.org/jira/browse/FLINK-6917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6917: --- Labels: (was: ta) > Introduce test base for end-to-end testing serializer config snapshotting, > restoring, and compatibility check roundtrips > - > > Key: FLINK-6917 > URL: https://issues.apache.org/jira/browse/FLINK-6917 > Project: Flink > Issue Type: Test > Components: State Backends, Checkpointing, Tests, Type Serialization > System >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > > Currently, we only have end-to-end tests of the serializer snapshotting and > restore roundtrip for the {{PojoSerializer}}, {{KryoSerializer}}, and Scala > type serializers. > They are all written differently with varying coverage of behavioural tests, > and scattered in several different test classes. > This JIRA tracks introducing a common test base for the serialization > roundtrip for all serializers in Flink, and also activating it for all > serializers. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-6929) Add documentation for Table API OVER windows
Fabian Hueske created FLINK-6929: Summary: Add documentation for Table API OVER windows Key: FLINK-6929 URL: https://issues.apache.org/jira/browse/FLINK-6929 Project: Flink Issue Type: Improvement Components: Documentation, Table API & SQL Affects Versions: 1.3.1, 1.4.0 Reporter: Fabian Hueske The Table API documentation is currently lacking a description of OVER windows. The page has a placeholder section with a TODO: {{./docs/dev/table/tableApi.md}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5354) Split up Table API documentation into multiple pages
[ https://issues.apache.org/jira/browse/FLINK-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-5354. Resolution: Done Fix Version/s: 1.4.0 1.3.1 Done for 1.3.1 with 398012aa165d5d6cee9982475ea9cded60ae6ae3 Done for 1.4.0 with a5d93a56cb37e691ec9bb06d17c76151e7619267 > Split up Table API documentation into multiple pages > - > > Key: FLINK-5354 > URL: https://issues.apache.org/jira/browse/FLINK-5354 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table API & SQL >Reporter: Timo Walther >Assignee: Fabian Hueske > Fix For: 1.3.1, 1.4.0 > > > The Table API documentation page is quite large at the moment. We should > split it up into multiple pages: > Here is my suggestion: > - Overview (Datatypes, Config, Registering Tables, Examples) > - TableSources and Sinks > - Table API > - SQL > - Functions -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6751) Table API / SQL Docs: UDFs Page
[ https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6751: - Description: Update and extend the documentation of UDFs in the Table API / SQL: {{./docs/dev/table/udfs.md}} Missing sections: - Registration of UDFs - UDAGGs was:Update and refine {{./docs/dev/table/udfs.md}} in feature branch https://github.com/apache/flink/tree/tableDocs > Table API / SQL Docs: UDFs Page > --- > > Key: FLINK-6751 > URL: https://issues.apache.org/jira/browse/FLINK-6751 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > > Update and extend the documentation of UDFs in the Table API / SQL: > {{./docs/dev/table/udfs.md}} > Missing sections: > - Registration of UDFs > - UDAGGs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6750: - Description: Update and refine the documentation about TableSources and TableSinks. There are a few TODOs left in {{./docs/dev/table/sourceSinks.md}} was:Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch https://github.com/apache/flink/tree/tableDocs > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.3.1, 1.4.0 > > > Update and refine the documentation about TableSources and TableSinks. > There are a few TODOs left in {{./docs/dev/table/sourceSinks.md}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6751) Table API / SQL Docs: UDFs Page
[ https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6751: - Issue Type: Task (was: Sub-task) Parent: (was: FLINK-5354) > Table API / SQL Docs: UDFs Page > --- > > Key: FLINK-6751 > URL: https://issues.apache.org/jira/browse/FLINK-6751 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Shaoxuan Wang > > Update and refine {{./docs/dev/table/udfs.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6750: - Issue Type: Task (was: Sub-task) Parent: (was: FLINK-5354) > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.3.1, 1.4.0 > > > Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6747) Table API / SQL Docs: Streaming Page
[ https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6747: - Description: Extend {{./docs/dev/table/streaming.md}} page. Missing are sections about - Dynamic Tables - QueryConfiguration (state retention time) was:Update and refine {{./docs/dev/table/streaming.md}} in feature branch https://github.com/apache/flink/tree/tableDocs > Table API / SQL Docs: Streaming Page > > > Key: FLINK-6747 > URL: https://issues.apache.org/jira/browse/FLINK-6747 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Extend {{./docs/dev/table/streaming.md}} page. > Missing are sections about > - Dynamic Tables > - QueryConfiguration (state retention time) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-6747) Table API / SQL Docs: Streaming Page
[ https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6747: - Issue Type: Task (was: Sub-task) Parent: (was: FLINK-5354) > Table API / SQL Docs: Streaming Page > > > Key: FLINK-6747 > URL: https://issues.apache.org/jira/browse/FLINK-6747 > Project: Flink > Issue Type: Task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Update and refine {{./docs/dev/table/streaming.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-6747) Table API / SQL Docs: Streaming Page
[ https://issues.apache.org/jira/browse/FLINK-6747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-6747: Assignee: Fabian Hueske > Table API / SQL Docs: Streaming Page > > > Key: FLINK-6747 > URL: https://issues.apache.org/jira/browse/FLINK-6747 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Update and refine {{./docs/dev/table/streaming.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6750: - Comment: was deleted (was: Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54 Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5) > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.3.1, 1.4.0 > > > Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6748) Table API / SQL Docs: Table API Page
[ https://issues.apache.org/jira/browse/FLINK-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6748. Resolution: Done Fix Version/s: 1.4.0 1.3.1 Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54 Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5 > Table API / SQL Docs: Table API Page > > > Key: FLINK-6748 > URL: https://issues.apache.org/jira/browse/FLINK-6748 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.3.1, 1.4.0 > > > Update and refine {{./docs/dev/table/tableApi.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reopened FLINK-6750: -- > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.3.1, 1.4.0 > > > Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske resolved FLINK-6750. -- Resolution: Fixed Fix Version/s: 1.4.0 1.3.1 Done for 1.3.1 with 89850f21dc596b2b845ce98433263496e512ef54 Done for 1.4.0 with 232481572bb48e82880afdb2f7237af08a8404b5 > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > Fix For: 1.3.1, 1.4.0 > > > Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6748) Table API / SQL Docs: Table API Page
[ https://issues.apache.org/jira/browse/FLINK-6748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050243#comment-16050243 ] ASF GitHub Bot commented on FLINK-6748: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4093 > Table API / SQL Docs: Table API Page > > > Key: FLINK-6748 > URL: https://issues.apache.org/jira/browse/FLINK-6748 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > > Update and refine {{./docs/dev/table/tableApi.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
issues@flink.apache.org
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4094
[jira] [Commented] (FLINK-6750) Table API / SQL Docs: Table Sources & Sinks Page
[ https://issues.apache.org/jira/browse/FLINK-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050244#comment-16050244 ] ASF GitHub Bot commented on FLINK-6750: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4094 > Table API / SQL Docs: Table Sources & Sinks Page > > > Key: FLINK-6750 > URL: https://issues.apache.org/jira/browse/FLINK-6750 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Timo Walther > > Update and refine {{./docs/dev/table/sourceSinks.md}} in feature branch > https://github.com/apache/flink/tree/tableDocs -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4093: [FLINK-6748] [table] [docs] Reworked Table API Pag...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4093