[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263908#comment-16263908 ]

Fabian Hueske commented on FLINK-6101:
--------------------------------------

In the example you gave, I'd think that {{'adId}} in {{select()}} refers to the named expression in the {{groupBy()}} call, i.e., {{'adId + 1}}, because the {{as}} overrides the previous definition of {{'adId}}. If a user wants the original meaning of {{'adId}}, they should not reuse {{'adId}} in the {{as}} expression. I think this is a corner case, but I can see how it might cause confusion. So I'm fine with keeping {{as}} out of {{groupBy()}} for now.

> GroupBy fields with arithmetic expression (include UDF) can not be selected
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-6101
>                 URL: https://issues.apache.org/jira/browse/FLINK-6101
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> Currently the Table API does not support selecting groupBy fields that contain an expression, either by the original field name or by repeating the expression:
> {code}
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
>   .groupBy('e, 'b % 3)
>   .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> causes
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> (BTW, this syntax is invalid in an RDBMS; SQL Server, for instance, reports that the selected column is invalid because it is contained in neither an aggregate function nor the GROUP BY clause.)
> Selecting the expression itself:
> {code}
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
>   .groupBy('e, 'b % 3)
>   .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will also cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> Adding an alias in the groupBy clause, "groupBy('e, 'b % 3 as 'b)", does not help either, and applying a UDF does not work as well:
> {code}
> table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 'd.count, 'e.avg)
>
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, TMP_1, TMP_2].
> {code}
> The only way to make this work is:
> {code}
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
>   .select('a, 'b % 3 as 'b, 'c, 'd, 'e)
>   .groupBy('e, 'b)
>   .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> One way to solve this is to support aliases in the groupBy clause (it seems a bit odd compared to SQL, though the Table API has a different groupBy grammar). I would prefer to support selecting the original expressions and UDFs of the groupBy clause (consistent with SQL), like this:
> {code}
> // use expression
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
>   .groupBy('e, 'b % 3)
>   .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
>
> // use UDF
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
>   .groupBy('e, Mod('b, 3))
>   .select(Mod('b, 3), 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> After a look into the code, I found a problem in the groupBy implementation: validation does not consider the expressions in the groupBy clause. It should be noted that a table has actually changed after a groupBy operation (it is a new Table), and the groupBy keys replace the original field references in essence.
>
> What do you think?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
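The point above — that after {{groupBy}} the grouping keys replace the original field references — can be illustrated outside Flink with plain Java streams (a hypothetical stand-in, not Table API code; class and method names are made up): once rows are grouped by the expression {{b % 3}}, downstream code only sees the derived key and the aggregates, not the original column {{b}}.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByKeyReplacement {

    // Group values of column b by the derived key b % 3 and count per group.
    // After this step only the key expression and the aggregate exist; the
    // original column b is gone -- mirroring why the Table API reports
    // "Cannot resolve [b]" when 'b is selected after groupBy('b % 3).
    public static Map<Integer, Long> countsByBMod3(List<Integer> bValues) {
        return bValues.stream()
                .collect(Collectors.groupingBy(b -> b % 3, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countsByBMod3(List.of(1, 2, 3, 4)));
    }
}
```

In these terms, the proposal in the issue amounts to letting a later select on {{'b % 3}} resolve against the stored key expression rather than against the no-longer-available field {{'b}}.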
[jira] [Closed] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske closed FLINK-2170.
--------------------------------
    Resolution: Implemented
 Fix Version/s: 1.5.0
                1.4.0

Implemented for 1.5.0 with 35517f1291293f73f4466a4fdbed4296b2dd80a5
Implemented for 1.4.0 with 200612ee0eaa42fdba141be138de172f86798f54

> Add OrcTableSource
> ------------------
>
>                 Key: FLINK-2170
>                 URL: https://issues.apache.org/jira/browse/FLINK-2170
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>    Affects Versions: 0.9
>            Reporter: Fabian Hueske
>            Assignee: Usman Younas
>            Priority: Minor
>              Labels: starter
>             Fix For: 1.4.0, 1.5.0
>
> Add an {{OrcTableSource}} to read data from an ORC file. The {{OrcTableSource}} should implement the {{ProjectableTableSource}} (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263755#comment-16263755 ]

lincoln.lee commented on FLINK-6101:
------------------------------------

[~fhueske] Agree with you that SQL is not perfect. But in this case I think it is reasonable that SQL does not support aliases in the groupBy clause. Think about a query similar to the one Timo mentioned above:
{code}
.window(Tumble over 1.minute on 'showTime as 'w)
.groupBy('adId + 1 as 'adId, 'w)
.select('adId + 1, 'w.rowtime as 'wtime, 'clickEarn.sum as 'revenue)
{code}
What does {{'adId + 1}} represent in the select clause? Adding alias support in groupBy is handy but may lead to confusing cases. What do you think?

BTW, we are having another discussion about the limitation of column expressions in the select clause after a groupBy (this had been partly discussed with Julian, see https://issues.apache.org/jira/browse/CALCITE-1710). We found that MySQL and SQL Server have different implementations here; extending the select expressions may be more meaningful. I'll sync here once we have a preliminary conclusion.
[jira] [Commented] (FLINK-7967) Deprecate Hadoop specific Flink configuration options
[ https://issues.apache.org/jira/browse/FLINK-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263669#comment-16263669 ]

ASF GitHub Bot commented on FLINK-7967:
---------------------------------------

Github user zhangminglei commented on the issue:

    https://github.com/apache/flink/pull/4946

    Thanks, @greghogan. I will make a first refinement based on your suggestions.

> Deprecate Hadoop specific Flink configuration options
> -----------------------------------------------------
>
>                 Key: FLINK-7967
>                 URL: https://issues.apache.org/jira/browse/FLINK-7967
>             Project: Flink
>          Issue Type: Improvement
>          Components: Configuration
>            Reporter: Till Rohrmann
>            Priority: Trivial
>
> I think we should deprecate the Hadoop-specific configuration options in Flink and encourage people to use the environment variable {{HADOOP_CONF_DIR}} instead to configure the Hadoop configuration directory. This includes:
> {code}
> fs.hdfs.hdfsdefault
> fs.hdfs.hdfssite
> fs.hdfs.hadoopconf
> {code}
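The migration path suggested in the issue — preferring {{HADOOP_CONF_DIR}} over the deprecated keys — could be sketched as follows (a hypothetical resolver, not Flink's actual implementation; the key names come from the issue description, while the class and method names are made up):

```java
import java.util.Map;
import java.util.Optional;

public class HadoopConfResolver {

    // Resolve the Hadoop configuration directory: the HADOOP_CONF_DIR
    // environment variable wins; the deprecated Flink option is only a
    // backwards-compatibility fallback.
    public static Optional<String> resolveHadoopConfDir(
            Map<String, String> env, Map<String, String> flinkConf) {
        String fromEnv = env.get("HADOOP_CONF_DIR");
        if (fromEnv != null) {
            return Optional.of(fromEnv);
        }
        // deprecated key from the issue description
        return Optional.ofNullable(flinkConf.get("fs.hdfs.hadoopconf"));
    }
}
```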
[jira] [Commented] (FLINK-8104) Fix Row value constructor
[ https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263600#comment-16263600 ]

ASF GitHub Bot commented on FLINK-8104:
---------------------------------------

Github user walterddr commented on the issue:

    https://github.com/apache/flink/pull/5040

    Thanks for the prompt review @twalthr. Yeah, I think generalizing the codegen part would be great. Also, do you think I can put Table API support into this PR as well? I will update the documents and also add more ITCases :-)

> Fix Row value constructor
> -------------------------
>
>                 Key: FLINK-8104
>                 URL: https://issues.apache.org/jira/browse/FLINK-8104
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Rong Rong
>            Assignee: Rong Rong
>
> Support the Row value constructor, which is currently broken. See:
> {code:java}
> // flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
> @Test
> def testValueConstructorFunctions(): Unit = {
>   // TODO we need a special code path that flattens ROW types
>   // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
>   // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
>   // ...
> }
> {code}
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263443#comment-16263443 ]

ASF GitHub Bot commented on FLINK-2170:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5043
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263442#comment-16263442 ]

ASF GitHub Bot commented on FLINK-2170:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4670
[jira] [Commented] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263215#comment-16263215 ]

ASF GitHub Bot commented on FLINK-8126:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5044

    Setting the version in IntelliJ to 8.4 seems to work: Settings -> Other Settings -> Checkstyle -> Checkstyle version drop-down menu.

> Update and fix checkstyle
> -------------------------
>
>                 Key: FLINK-8126
>                 URL: https://issues.apache.org/jira/browse/FLINK-8126
>             Project: Flink
>          Issue Type: Bug
>          Components: Build System
>    Affects Versions: 1.5.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>            Priority: Trivial
>             Fix For: 1.5.0
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing some ImportOrder and variable naming errors which are detected in 1) IntelliJ using the same checkstyle version and 2) with the maven-checkstyle-plugin with an up-to-date checkstyle version (8.4).
[jira] [Commented] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263209#comment-16263209 ]

ASF GitHub Bot commented on FLINK-8126:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5044

    Which checkstyle version is configured in IntelliJ, @StephanEwen? It could be that we have to upgrade it there as well. IIRC we recommend setting it to 6.X in the IDE setup guide because of some parsing errors with 8.X, which may now be resolved.
[jira] [Commented] (FLINK-7718) Port JobVertixMetricsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16263050#comment-16263050 ]

ASF GitHub Bot commented on FLINK-7718:
---------------------------------------

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5055

    [FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

    ## Brief change log
    - *Migrate logic in `org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler` to new handler and add new handler to DispatcherRestEndpoint.*
    - *Add common classes for remaining implementations of `org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler`, which require migration as well*

    ## Verifying this change
    This change added tests and can be verified as follows:
    - *Added tests for all new classes and old classes except for `DispatcherRestEndpoint`*
    - *Manually deployed a job locally and verified with `curl` that JobVertexMetrics can be queried in FLIP-6.*

    ## Does this pull request potentially affect one of the following parts:
    - Dependencies (does it add or upgrade a dependency): (yes / **no**)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
    - The serializers: (yes / **no** / don't know)
    - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
    - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation
    - Does this pull request introduce a new feature? (yes / **no**)
    - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

    CC: @tillrohrmann

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-7718

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5055.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5055

commit b888c876131fa3744baff7393bb75025ccb4b61f
Author: gyao
Date:   2017-11-22T17:58:03Z

    [FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

    Migrate logic in org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler to new handler and add new handler to DispatcherRestEndpoint. Add common classes for remaining implementations of org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler, which require migration as well.

> Port JobVertixMetricsHandler to new REST endpoint
> -------------------------------------------------
>
>                 Key: FLINK-7718
>                 URL: https://issues.apache.org/jira/browse/FLINK-7718
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, REST, Webfrontend
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
> Port {{JobVertexMetricsHandler}} to new REST endpoint.
[jira] [Commented] (FLINK-8126) Update and fix checkstyle
[ https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262995#comment-16262995 ]

ASF GitHub Bot commented on FLINK-8126:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5044

    I am now seeing a problem in IntelliJ with the checkstyle plugin (it seems to not be able to parse the checkstyle config). Anyone else seeing this issue? I am on an older IntelliJ version, so it might be that (I am reluctant to update because switching Scala versions works less well in newer IntelliJ versions).
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262976#comment-16262976 ]

ASF GitHub Bot commented on FLINK-2170:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5043

    Thanks for the review @twalthr! I'll address your comments and will merge this PR.
[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5043#discussion_r152634076

    --- Diff: flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.orc;
    +
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.table.api.Table;
    +import org.apache.flink.table.api.TableEnvironment;
    +import org.apache.flink.table.api.java.BatchTableEnvironment;
    +import org.apache.flink.test.util.MultipleProgramsTestBase;
    +import org.apache.flink.types.Row;
    +
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for {@link OrcTableSource}.
    + */
    +public class OrcTableSourceITCase extends MultipleProgramsTestBase {
    --- End diff --

    `TableProgramsTestBase` doesn't really give any advantage over `MultipleProgramsTestBase` since I'm testing the table source and not the null-behavior of the query execution.

---
[jira] [Resolved] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther resolved FLINK-8136.
---------------------------------
    Resolution: Fixed
 Fix Version/s: 1.5.0
                1.4.0

Fixed in 1.5: 5017c679c06de92bec6b094268d61ce5f3109b9e
Fixed in 1.4: 28157962196cecb94a59720e92cbf3682418e821

> Cast exception error on Flink SQL when using DATE_FORMAT
> --------------------------------------------------------
>
>                 Key: FLINK-8136
>                 URL: https://issues.apache.org/jira/browse/FLINK-8136
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>        Environment: Any environment
>            Reporter: David Marcos
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.4.0, 1.5.0
>
> Due to the shading of Joda-Time, an exception occurs when the CodeGenerator tries to cast org.joda.time.format.DateTimeFormatter to org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter. This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
>
> Affected Scala file:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
>
> Affected method: addReusableDateFormatter
>
> Affected code:
> {code}
> val field =
>   s"""
>     |final org.joda.time.format.DateTimeFormatter $fieldTerm;
>     |""".stripMargin
> {code}
>
> Fastest solution:
> {code}
> val field =
>   s"""
>     |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter $fieldTerm;
>     |""".stripMargin
> {code}
[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16262943#comment-16262943 ]

ASF GitHub Bot commented on FLINK-8136:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5054
[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner
[ https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262932#comment-16262932 ]

Aljoscha Krettek commented on FLINK-7913:
-----------------------------------------

What does the default Kafka partitioner do differently from the default Flink Kafka partitioner?

> Add support for Kafka default partitioner
> -----------------------------------------
>
>                 Key: FLINK-7913
>                 URL: https://issues.apache.org/jira/browse/FLINK-7913
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Konstantin Lalafaryan
>            Assignee: Konstantin Lalafaryan
>             Fix For: 1.5.0
>
> Currently, Apache Flink offers only *FlinkKafkaPartitioner* with a single implementation, *FlinkFixedPartitioner*. In order to use Kafka's default partitioner you have to create a new implementation of *FlinkKafkaPartitioner* and fork the code from Kafka. It would be really good to be able to select that partitioner without implementing a new class.
> Thanks.
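One answer to that question, sketched as a toy model (plain Java, not Flink or Kafka code; real Kafka hashes the serialized key with murmur2 where this sketch uses String.hashCode()): a FlinkFixedPartitioner-style partitioner ignores the record key and pins each parallel sink subtask to a single Kafka partition, whereas a Kafka-default-style partitioner spreads keyed records by key hash and round-robins keyless ones.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy comparison of the two partitioning strategies under discussion.
public class PartitionerSketch {

    // FlinkFixedPartitioner-style: subtask i always writes to partition i % n,
    // regardless of the record key.
    static int flinkFixed(int parallelInstanceId, int numPartitions) {
        return parallelInstanceId % numPartitions;
    }

    private static final AtomicInteger ROUND_ROBIN = new AtomicInteger();

    // Kafka-default-style: hash the key; round-robin when the key is null.
    static int kafkaDefault(String key, int numPartitions) {
        if (key == null) {
            return Math.floorMod(ROUND_ROBIN.getAndIncrement(), numPartitions);
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // A single Flink subtask (id 0) sends every record to partition 0,
        // no matter the key; the Kafka-style partitioner distributes by key.
        System.out.println(flinkFixed(0, 4));
        System.out.println(kafkaDefault("user-1", 4));
        System.out.println(kafkaDefault("user-1", 4)); // same key -> same partition
    }
}
```

The practical consequence is that with the fixed partitioner, key-based co-partitioning with other Kafka consumers is lost, which is presumably why the reporter wants to opt into Kafka's own behavior without forking code.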
[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262930#comment-16262930 ]

ASF GitHub Bot commented on FLINK-8136:
---------------------------------------

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5054

    Merging...
[jira] [Reopened] (FLINK-8050) RestServer#shutdown() ignores exceptions thrown when shutting down netty.
[ https://issues.apache.org/jira/browse/FLINK-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas reopened FLINK-8050:
-----------------------------------

> RestServer#shutdown() ignores exceptions thrown when shutting down netty.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8050
>                 URL: https://issues.apache.org/jira/browse/FLINK-8050
>             Project: Flink
>          Issue Type: Bug
>          Components: REST
>    Affects Versions: 1.4.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission
[ https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joshua Griffith updated FLINK-8137:
-----------------------------------
    Description:

When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the JobManager's REST API appears to hang until the job is submitted. The submission time may be large enough to cause timeouts if the {{getStatistics}} and {{createInputSplits}} methods of a job's input formats perform time-intensive tasks like running external queries. This is exacerbated when a job contains many such input formats, since they appear to be initialized sequentially. For a particular job with over 100 inputs, it's typical for the API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the {{Created}} state to provide greater visibility and indicate that the JobManager is still healthy?
> Flink JobManager API non-responsive during job submission
> ---------------------------------------------------------
>
>                 Key: FLINK-8137
>                 URL: https://issues.apache.org/jira/browse/FLINK-8137
>             Project: Flink
>          Issue Type: Bug
>          Components: Client, Job-Submission, JobManager, REST, Webfrontend
>    Affects Versions: 1.3.2
>         Environment: Flink 1.3.2 running a batch job in Kubernetes.
>            Reporter: Joshua Griffith
>            Priority: Minor
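The sequential initialization described above can be sketched as a toy model (plain Java, no Flink APIs; the sleep stands in for a format's getStatistics()/createInputSplits() work). The same sketch shows why, hypothetically, fanning that work out to a thread pool would bound the stall at roughly the slowest single format instead of the sum over all of them:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model: each "input format" pays a setup cost when its splits are created.
public class SplitCreationSketch {

    // Stand-in for one format's getStatistics()/createInputSplits() call.
    static int createSplits(int costMillis) throws InterruptedException {
        Thread.sleep(costMillis);
        return costMillis;
    }

    // What the reporter observes: formats initialized one after another,
    // so the stall is the SUM of the per-format costs.
    static long runSequential(List<Integer> costs) throws InterruptedException {
        long t0 = System.nanoTime();
        for (int c : costs) {
            createSplits(c);
        }
        return (System.nanoTime() - t0) / 1_000_000;
    }

    // Hypothetical mitigation: run the per-format work on a pool, bounding
    // the stall at roughly the MAX single-format cost.
    static long runConcurrent(List<Integer> costs) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int c : costs) {
            final int cost = c;
            tasks.add(() -> createSplits(cost));
        }
        ExecutorService pool = Executors.newFixedThreadPool(costs.size());
        long t0 = System.nanoTime();
        for (Future<Integer> f : pool.invokeAll(tasks)) {
            f.get(); // propagate any failure from a task
        }
        long elapsed = (System.nanoTime() - t0) / 1_000_000;
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> costs = Arrays.asList(100, 100, 100, 100);
        System.out.println(runSequential(costs) + " ms sequential");
        System.out.println(runConcurrent(costs) + " ms concurrent");
    }
}
```

With 100 formats each costing ~0.5 s, the sequential sum matches the 45–60 s stall the reporter sees; the proposed {{Configuring}} state would at least make that phase visible even if it stayed sequential.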
[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission
[ https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joshua Griffith updated FLINK-8137:
-----------------------------------
    Environment: Flink 1.3.2 running a batch job in Kubernetes.
[jira] [Created] (FLINK-8137) Flink JobManager API non-responsive during job submission
Joshua Griffith created FLINK-8137:
--------------------------------------

             Summary: Flink JobManager API non-responsive during job submission
                 Key: FLINK-8137
                 URL: https://issues.apache.org/jira/browse/FLINK-8137
             Project: Flink
          Issue Type: Bug
          Components: Client, Job-Submission, JobManager, REST, Webfrontend
    Affects Versions: 1.3.2
            Reporter: Joshua Griffith
            Priority: Minor
[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262911#comment-16262911 ]

ASF GitHub Bot commented on FLINK-8136:
---------------------------------------

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5054

    +1
[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262908#comment-16262908 ]

ASF GitHub Bot commented on FLINK-8136:
---------------------------------------

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5054

    [FLINK-8136] [table] Fix code generation with JodaTime shading

    ## What is the purpose of the change
    Fix `DATE_FORMAT` with shaded JodaTime.

    ## Brief change log
    Do not use a hard-coded class name in code generation.

    ## Verifying this change
    This change is already covered by existing tests.

    ## Does this pull request potentially affect one of the following parts:
    no

    ## Documentation
    - Does this pull request introduce a new feature? no
    - If yes, how is the feature documented? not applicable

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/twalthr/flink FLINK-8136

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/5054.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #5054

    commit f50ae50c8e20ac025088c02aac56452f7013d77b
    Author: twalthr
    Date: 2017-11-22T16:43:08Z

        [FLINK-8136] [table] Fix code generation with JodaTime shading
[jira] [Commented] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery
[ https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262893#comment-16262893 ]

ASF GitHub Bot commented on FLINK-8132:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5053#discussion_r152618042

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.kafka;
    +
    +/**
    + * Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}.
    + */
    +public class FlinkKafka011Exception extends Exception {
    --- End diff --

    Should extend `FlinkException`

> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> ----------------------------------------------------------------------
>
>                 Key: FLINK-8132
>                 URL: https://issues.apache.org/jira/browse/FLINK-8132
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
> Faulty scenario with a producer pool of 2:
> 1. started transaction 1 with producerA, written record 42
> 2. checkpoint 1 triggered, pre-committing txn1, started txn2 with producerB, written record 43
> 3. checkpoint 1 completed, committing txn1, returning producerA to the pool
> 4. checkpoint 2 triggered, committing txn2, started txn3 with producerA, written record 44
> 5. crash
> 6. recover to checkpoint 1, txn1 from producerA found in "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
> 7. unfortunately txn1 and txn3 from the same producer are identical from the KafkaBroker's perspective and thus txn3 is being committed
>
> The result is that both records 42 and 44 are committed.
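The seven-step scenario above is easier to see in a toy model (plain Java, not the connector code): the mock broker identifies an open transaction only by the producer that opened it, mirroring the claim that txn1 and txn3 are indistinguishable to Kafka once producerA is reused from the pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the faulty recovery path described in the issue.
public class TxnPoolSketch {

    static final class MockBroker {
        final List<Integer> committed = new ArrayList<>();
        private final Map<String, List<Integer>> openByProducer = new HashMap<>();

        void write(String producer, int record) {
            openByProducer.computeIfAbsent(producer, p -> new ArrayList<>()).add(record);
        }

        /** Commits whatever this producer currently has open -- the collision point. */
        void commit(String producer) {
            List<Integer> records = openByProducer.remove(producer);
            if (records != null) {
                committed.addAll(records);
            }
        }
    }

    static List<Integer> runFaultyScenario() {
        MockBroker broker = new MockBroker();
        Deque<String> pool = new ArrayDeque<>(Arrays.asList("producerA", "producerB"));

        String a = pool.poll();
        broker.write(a, 42);           // 1. txn1 with producerA, record 42
        String b = pool.poll();
        broker.write(b, 43);           // 2. checkpoint 1 triggered: txn2 with producerB, record 43
        broker.commit(a);              // 3. checkpoint 1 completed: commit txn1
        pool.add(a);                   //    producerA returned to the pool
        broker.commit(b);              // 4. checkpoint 2 triggered: commit txn2...
        broker.write(pool.poll(), 44); //    ...and txn3 reuses producerA, record 44
        // 5. crash; 6. recovery to checkpoint 1 finds txn1 pending, identified
        //    only by its producer, so it effectively calls recoverAndCommit("producerA")...
        broker.commit("producerA");    // 7. ...which actually commits txn3's record 44
        return broker.committed;
    }

    public static void main(String[] args) {
        // Record 44 leaks in even though it was written after the recovered checkpoint.
        System.out.println(runFaultyScenario());
    }
}
```

The sketch makes the fix direction intuitive: a recovered pre-commit must carry enough identity (beyond the pooled producer) to distinguish the transaction it belongs to from any later transaction opened by the same producer.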
[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262885#comment-16262885 ]

Timo Walther commented on FLINK-8136:
-------------------------------------

I will look into the issue.
[jira] [Assigned] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther reassigned FLINK-8136:
-----------------------------------

    Assignee: Timo Walther
[jira] [Created] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT
David Marcos created FLINK-8136:
-----------------------------------

Summary: Cast exception error on Flink SQL when using DATE_FORMAT
Key: FLINK-8136
URL: https://issues.apache.org/jira/browse/FLINK-8136
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Affects Versions: 1.4.0
Environment: Any environment
Reporter: David Marcos
Priority: Blocker
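The root cause of FLINK-8136 is that the shaded and unshaded copies of `DateTimeFormatter` are distinct classes to the JVM, even though they share a simple name. A minimal stand-alone illustration of that failure mode (the class names below are stand-ins, not the real joda classes):

```java
// Minimal illustration: the code generator emits a field typed as the
// *unshaded* class, but the object created at runtime is an instance of
// the *shaded* (relocated) copy. The two are unrelated types to the JVM,
// so the cast in the generated code fails.
class Unshaded {}  // stands in for org.joda.time.format.DateTimeFormatter
class Shaded {}    // stands in for the relocated ...table.shaded... copy

public class Main {
    public static void main(String[] args) {
        Object runtimeInstance = new Shaded();  // what the runtime provides
        try {
            // what the generated code declares
            Unshaded field = (Unshaded) runtimeInstance;
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: shaded and unshaded copies are distinct classes");
        }
    }
}
```

This is why the "fastest solution" in the report simply changes the class name string emitted by the code generator to the shaded one.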
[jira] [Commented] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery
[ https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262878#comment-16262878 ]

ASF GitHub Bot commented on FLINK-8132:
---------------------------------------

GitHub user pnowojski opened a pull request:
https://github.com/apache/flink/pull/5053

[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

Previously faulty scenario with a producer pool of 2:

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, wrote record 43
3. checkpoint 1 completed, committing txn1, returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA found in "pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the Kafka broker's perspective, and thus txn3 is committed

The result is that both records 42 and 44 are committed. With this fix, after re-initialization txn3 will have different producerId/epoch counters compared to txn1.

## Verifying this change

This PR adds a test that was previously failing to cover for future regressions.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f8132

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5053.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5053

commit 6f0f56ca25a12da513253d738d14d8516a808bd8
Author: Piotr Nowojski
Date: 2017-11-22T10:37:48Z

    [hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead of generic Exception

commit b79a372d589df54f505857ed78e120d69d0ad50a
Author: Piotr Nowojski
Date: 2017-11-22T14:53:08Z

    [FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each checkpoint

commit a5938795c5ae0dd7022d07ee90620fb6a8ce14a9
Author: Piotr Nowojski
Date: 2017-11-22T14:55:20Z

    [hotfix][kafka] Remove unused method in kafka tests

> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> ----------------------------------------------------------------------
>
> Key: FLINK-8132
> URL: https://issues.apache.org/jira/browse/FLINK-8132
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0
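The commit confusion described in the scenario above can be reduced to a toy model: if the broker tracks ongoing transactions only by the producer's identity, a replayed commit for txn1 cannot be distinguished from a commit of txn3. This is plain Java, not Kafka's API, and producerB/record 43 is omitted for brevity:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the FLINK-8132 bug: the "broker" keys ongoing transactions
// by producer id only, so two transactions opened by the same pooled
// producer are indistinguishable on recovery.
public class Main {
    static Map<String, List<Integer>> ongoing = new HashMap<>(); // producerId -> buffered records
    static List<Integer> committed = new ArrayList<>();

    static void begin(String producer) { ongoing.put(producer, new ArrayList<>()); }
    static void write(String producer, int record) { ongoing.get(producer).add(record); }
    static void commit(String producer) {
        List<Integer> records = ongoing.remove(producer);
        if (records != null) committed.addAll(records);
    }

    public static void main(String[] args) {
        begin("producerA"); write("producerA", 42);  // txn1
        commit("producerA");                         // checkpoint 1 completes, producerA returns to pool
        begin("producerA"); write("producerA", 44);  // txn3 reuses producerA
        // crash here; txn3 should be aborted on recovery to checkpoint 1

        // recovery replays recoverAndCommit(txn1) by producer identity; the
        // broker cannot tell txn1 from txn3, so record 44 is committed too
        commit("producerA");
        System.out.println(committed);
    }
}
```

With the fix, the producer is re-initialized per checkpoint, so txn3 carries a different producerId/epoch and the replayed commit of txn1 no longer matches it.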
[GitHub] flink pull request #5040: [FLINK-8104][Table API] fixing ROW type value cons...
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5040#discussion_r152588446

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -835,6 +836,43 @@ object ScalarOperators {
     generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand)
   }

+  def generateRow(
+      codeGenerator: CodeGenerator,
+      resultType: TypeInformation[_],
+      elements: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val rowTerm = codeGenerator.addReusableRow(resultType.getArity, elements.size)
+
+    val boxedElements: Seq[GeneratedExpression] = resultType match {
+      case ct: CompositeType[_] =>
+        elements.zipWithIndex.map {
+          case (e, idx) =>
+            val boxedTypeTerm = boxedTypeTermForTypeInfo(ct.getTypeAt(idx))

--- End diff --

Can we put this into a private helper method? I think it is the same code for `generateArray`, `generateMap`, and `generateRow`.

---
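The shape of the refactoring the reviewer asks for can be sketched in plain Java (illustrative names; the real code is the Scala `ScalarOperators`): the element-boxing loop shared by the three generators moves into one private helper.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of "extract the shared boxing loop": generateRow,
// generateArray, and generateMap all pair each element expression with the
// field type at its index, so that loop lives in one helper.
public class Main {
    static List<String> boxElements(List<String> elements, List<String> fieldTypes) {
        List<String> boxed = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            // "box" each element to the boxed term of its field type
            boxed.add("(" + fieldTypes.get(i) + ") " + elements.get(i));
        }
        return boxed;
    }

    static List<String> generateRow(List<String> elements, List<String> types) {
        return boxElements(elements, types); // row-specific codegen would follow
    }

    static List<String> generateArray(List<String> elements, List<String> types) {
        return boxElements(elements, types); // array-specific codegen would follow
    }

    public static void main(String[] args) {
        System.out.println(generateRow(List.of("\"hello\"", "12"), List.of("String", "Integer")));
    }
}
```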
[jira] [Commented] (FLINK-8104) Fix Row value constructor
[ https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262870#comment-16262870 ]

ASF GitHub Bot commented on FLINK-8104:
---------------------------------------

Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5040#discussion_r152588168

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -835,6 +836,43 @@ object ScalarOperators {
     generateUnaryArithmeticOperator(operator, nullCheck, operand.resultType, operand)
   }

+  def generateRow(
+      codeGenerator: CodeGenerator,
+      resultType: TypeInformation[_],
+      elements: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val rowTerm = codeGenerator.addReusableRow(resultType.getArity, elements.size)
+
+    val boxedElements: Seq[GeneratedExpression] = resultType match {
+      case ct: CompositeType[_] =>

--- End diff --

This should always be `RowTypeInfo`. I think we can simply cast it.

> Fix Row value constructor
> -------------------------
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Rong Rong
> Assignee: Rong Rong
>
> Support the Row value constructor, which is currently broken.
> See:
> {code:java}
> // flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
> @Test
> def testValueConstructorFunctions(): Unit = {
>   // TODO we need a special code path that flattens ROW types
>   // testSqlApi("ROW('hello world', 12)", "hello world") // test base only returns field 0
>   // testSqlApi("('hello world', 12)", "hello world") // test base only returns field 0
>   // ...
> }
> {code}
[GitHub] flink pull request #5040: [FLINK-8104][Table API] fixing ROW type value cons...
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5040#discussion_r152614003

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
@@ -417,6 +417,10 @@ object FlinkTypeFactory {
       val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
       compositeRelDataType.compositeType

+    case ROW if relDataType.isInstanceOf[RelRecordType] =>
+      val relRecordType = relDataType.asInstanceOf[RelRecordType]
+      new RowSchema(relRecordType).typeInfo
+
     // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
     case ROW | CURSOR => new NothingTypeInfo

--- End diff --

Maybe we can remove the `ROW` here if it has no impact on the existing tests.

---
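For illustration, a hypothetical mini `Row` shows what `ROW('hello world', 12)` should evaluate to and why the commented-out tests compare only field 0 until ROW results are flattened. This is an illustration only, not Flink's `org.apache.flink.types.Row`:

```java
import java.util.Arrays;

// Hypothetical mini Row: a positional container of untyped fields, the
// shape a SQL ROW constructor produces.
public class Main {
    static final class Row {
        private final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
        Object getField(int i) { return fields[i]; }
        @Override public String toString() { return Arrays.toString(fields); }
    }

    public static void main(String[] args) {
        // What ROW('hello world', 12) should evaluate to.
        Row row = new Row("hello world", 12);
        // Until ROW results are flattened, the test base checks only field 0.
        System.out.println(row.getField(0));
        System.out.println(row);
    }
}
```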
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/5045#discussion_r152608576

--- Diff: docs/concepts/programming-model.md ---
@@ -33,53 +33,52 @@ Flink offers different levels of abstraction to develop streaming/batch applicat

- - The lowest level abstraction simply offers **stateful streaming**. It is embedded into the [DataStream API](../dev/datastream_api.html)
-via the [Process Function](../dev/stream/operators/process_function.html). It allows users freely process events from one or more streams,
-and use consistent fault tolerant *state*. In addition, users can register event time and processing time callbacks,
+ - The lowest level abstraction offers **stateful streaming** and is embedded into the [DataStream API](../dev/datastream_api.html)
+via the [Process Function](../dev/stream/operators/process_function.html). It allows users to process events from one or more streams,
+and use consistent fault tolerant *state*. Users can register event time and processing time callbacks,
 allowing programs to realize sophisticated computations.

- - In practice, most applications would not need the above described low level abstraction, but would instead program against the
+ - In practice, most applications would not need the low level abstraction describe above, but would instead program against the
 **Core APIs** like the [DataStream API](../dev/datastream_api.html) (bounded/unbounded streams) and the [DataSet API](../dev/batch/index.html)
-(bounded data sets). These fluent APIs offer the common building blocks for data processing, like various forms of user-specified
+(bounded data sets). These fluent APIs offer the common building blocks for data processing, like forms of user-specified
 transformations, joins, aggregations, windows, state, etc. Data types processed in these APIs are represented as classes
-in the respective programming languages.
+in respective programming languages.

-The low level *Process Function* integrates with the *DataStream API*, making it possible to go the lower level abstraction
-for certain operations only. The *DataSet API* offers additional primitives on bounded data sets, like loops/iterations.
+The low level *Process Function* integrates with the *DataStream API*, making it possible to use the lower level abstraction
+for certain operations. The *DataSet API* offers additional primitives on bounded data sets, like loops or iterations.

 - The **Table API** is a declarative DSL centered around *tables*, which may be dynamically changing tables (when representing streams).
-The [Table API](../dev/table_api.html) follows the (extended) relational model: Tables have a schema attached (similar to tables in relational databases)
+The [Table API](../dev/table_api.html) follows the (extended) relational model. Tables have a schema attached (similar to tables in relational databases)
 and the API offers comparable operations, such as select, project, join, group-by, aggregate, etc.
-Table API programs declaratively define *what logical operation should be done* rather than specifying exactly
- *how the code for the operation looks*. Though the Table API is extensible by various types of user-defined
+Table API programs declaratively define *what logical operation should to perform* rather than specifying
+ *how the code for the operation looks*. The Table API is extensible by various types of user-defined
 functions, it is less expressive than the *Core APIs*, but more concise to use (less code to write).
-In addition, Table API programs also go through an optimizer that applies optimization rules before execution.
+Table API programs also go through an optimizer that applies optimization rules before execution.

-One can seamlessly convert between tables and *DataStream*/*DataSet*, allowing programs to mix *Table API* and with the *DataStream*
+You can seamlessly convert between tables and *DataStream*/*DataSet*, allowing programs to mix *Table API* and with the *DataStream*
 and *DataSet* APIs.

 - The highest level abstraction offered by Flink is **SQL**. This abstraction is similar to the *Table API* both in semantics and
 expressiveness, but represents programs as SQL query expressions.
-The [SQL](../dev/table_api.html#sql) abstraction closely interacts with the Table API, and SQL queries can be executed over tables defined in the *Table API*.
+The [SQL](../dev/table_api.html#sql) abstraction closely interacts with the Table API, and you can execute SQL queries over tables defined in the *Table API*.

 ## Programs and Dataflows

-The
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/5045#discussion_r152608405

--- Diff: docs/concepts/programming-model.md ---
@@ -132,14 +131,13 @@ One typically distinguishes different types of windows, such as *tumbling window

-More window examples can be found in this [blog post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).
-More details are in the [window docs](../dev/stream/operators/windows.html).
+You can find more window examples in this [blog post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html) and in the [window documentation](../dev/stream/operators/windows.html).

--- End diff --

Line break before "and".

---
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/5045#discussion_r152607252

--- Diff: docs/concepts/programming-model.md ---
@@ -33,53 +33,52 @@ Flink offers different levels of abstraction to develop streaming/batch applicat

+Table API programs declaratively define *what logical operation should to perform* rather than specifying
+ *how the code for the operation looks*. The Table API is extensible by various types of user-defined
+functions, it is less expressive than the *Core APIs*, but more concise to use (less code to write).

--- End diff --

"functions, it" -> "functions. It"

---
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/5045#discussion_r152612059

--- Diff: docs/concepts/runtime.md ---
@@ -107,21 +107,20 @@ With hyper-threading, each slot then takes 2 or more hardware thread contexts.

 ## State Backends

-The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state/state_backends.html). One state backend
+The exact data structures in which the key/values indexes are stored depends on the [state backend](../ops/state/state_backends.html) you choose. One state backend
 stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store.
-In addition to defining the data structure that holds the state, the state backends also implement the logic to
-take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.
+The state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint.

 {% top %}

 ## Savepoints

-Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state.
+Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state.

-[Savepoints](../ops/state/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed.
+[Savepoints](../ops/state/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. The last completed checkpoint is only needed for recovery and you can safely discard older checkpoints as soon as a new one is completed.

--- End diff --

Should we leave the original but change "can be safely" to "are" or "are by default"? The paragraph starts discussing savepoints but then switches to commenting on checkpoints with no distinction about externalized checkpoints.

---
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
Github user greghogan commented on a diff in the pull request:
https://github.com/apache/flink/pull/5045#discussion_r152605878

--- Diff: docs/concepts/programming-model.md ---
@@ -33,53 +33,52 @@ Flink offers different levels of abstraction to develop streaming/batch applicat

+ - In practice, most applications would not need the low level abstraction describe above, but would instead program against the

--- End diff --

"describe" -> "described"

---
[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5045#discussion_r152609451 --- Diff: docs/concepts/runtime.md --- @@ -74,10 +74,10 @@ To control how many tasks a worker accepts, a worker has so called **task slots* Each *task slot* represents a fixed subset of resources of the TaskManager. A TaskManager with three slots, for example, will dedicate 1/3 of its managed memory to each slot. Slotting the resources means that a subtask will not compete with subtasks from other jobs for managed memory, but instead has a certain amount of reserved -managed memory. Note that no CPU isolation happens here; currently slots only separate the managed memory of tasks. +managed memory. No CPU isolation happens, slots only separate the managed memory of tasks. --- End diff -- "," -> ";"? ---
[jira] [Updated] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery
[ https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski updated FLINK-8132: -- Description: Faulty scenario with producer pool of 2. 1. started transaction 1 with producerA, written record 42 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, written record 43 3. checkpoint 1 completed, committing txn1, returning producerA to the pool 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, written record 44 5. crash 6. recover to checkpoint 1, txn1 from producerA found to "pendingCommitTransactions", attempting to recoverAndCommit(txn1) 7. unfortunately txn1 and txn3 from the same producers are identical from KafkaBroker perspective and thus txn3 is being committed result is that both records 42 and 44 are committed. was: Faulty scenario with producer pool of 2. 1. started transaction 1 with producerA, written record 42 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, written record 43 3. checkpoint 1 completed, committing txn1, returning producerA to the pool 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, written record 44 5. crash 6. recover to checkpoint 1, txn1 from producerA found to "pendingCommitTransactions", attempting to recoverAndCommit(txn1) 7. unfortunately txn1 and txn3 from the same producers are identical from KafkaBroker perspective and thus txn3 is being committed result is that both records 42 and 44 are committed. Proposed solution is to postpone returning producers to the pool until we are sure that previous checkpoint (for which given producer was used) will not be used for recovery (at least one more checkpoint was completed). 
> FlinkKafkaProducer011 can commit incorrect transaction during recovery > -- > > Key: FLINK-8132 > URL: https://issues.apache.org/jira/browse/FLINK-8132 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > Faulty scenario with producer pool of 2. > 1. started transaction 1 with producerA, written record 42 > 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, > written record 43 > 3. checkpoint 1 completed, committing txn1, returning producerA to the pool > 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, > written record 44 > 5. crash > 6. recover to checkpoint 1, txn1 from producerA found to > "pendingCommitTransactions", attempting to recoverAndCommit(txn1) > 7. unfortunately txn1 and txn3 from the same producers are identical from > KafkaBroker perspective and thus txn3 is being committed > result is that both records 42 and 44 are committed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
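The seven-step scenario can be reduced to a toy simulation. The following is a minimal sketch, assuming a broker that identifies an open transaction solely by the producer that owns it; all names here (`Broker`, `replayScenario`) are illustrative and are not Flink or Kafka APIs:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of the faulty scenario: the "broker" tracks at most one open
 * transaction per producer, so a transaction handle is effectively just
 * the producer's identity. Illustrative only, not FlinkKafkaProducer011 code.
 */
public class TxnRecycleSketch {

    static class Broker {
        // open transaction per producer id -> buffered records
        final Map<String, List<Integer>> open = new HashMap<>();
        final List<Integer> committed = new ArrayList<>();

        void begin(String producer) { open.put(producer, new ArrayList<>()); }
        void write(String producer, int record) { open.get(producer).add(record); }
        // committing "a transaction" really commits whatever the producer has open
        void commit(String producer) { committed.addAll(open.remove(producer)); }
    }

    static List<Integer> replayScenario() {
        Broker broker = new Broker();
        Deque<String> pool = new ArrayDeque<>(List.of("producerA", "producerB"));

        String txn1Producer = pool.poll();     // 1. txn1 with producerA, record 42
        broker.begin(txn1Producer);
        broker.write(txn1Producer, 42);

        String txn2Producer = pool.poll();     // 2. checkpoint 1: pre-commit txn1,
        broker.begin(txn2Producer);            //    txn2 with producerB, record 43
        broker.write(txn2Producer, 43);

        broker.commit(txn1Producer);           // 3. checkpoint 1 complete: commit txn1,
        pool.add(txn1Producer);                //    producerA returned to the pool too early

        broker.commit(txn2Producer);           // 4. checkpoint 2: commit txn2,
        String txn3Producer = pool.poll();     //    txn3 reuses producerA, record 44
        broker.begin(txn3Producer);
        broker.write(txn3Producer, 44);

        // 5.-7. crash, recover to checkpoint 1: recoverAndCommit(txn1) is
        // indistinguishable from committing txn3, so record 44 leaks out
        broker.commit("producerA");
        return broker.committed;
    }

    public static void main(String[] args) {
        System.out.println(replayScenario());  // prints [42, 43, 44]
    }
}
```

Under the proposed fix, `pool.add(txn1Producer)` at step 3 would be postponed until one more checkpoint completed, so producerA would not be available at step 4 and txn3 would use a different producer.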
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262841#comment-16262841 ] ASF GitHub Bot commented on FLINK-4822: --- Github user taizilongxu commented on the issue: https://github.com/apache/flink/pull/5050 the prefix is the same as the kafka high level api and contains the uuid string, so it could be unique > Ensure that the Kafka 0.8 connector is compatible with > kafka-consumer-groups.sh > --- > > Key: FLINK-4822 > URL: https://issues.apache.org/jira/browse/FLINK-4822 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger > > The Kafka 0.8 connector is not properly creating all datastructures in > Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool. > A user reported the issue here: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498 > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5050: [FLINK-4822] Ensure that the Kafka 0.8 connector is compa...
Github user taizilongxu commented on the issue: https://github.com/apache/flink/pull/5050 the prefix is the same as the kafka high level api and contains the uuid string, so it could be unique ---
[GitHub] flink issue #5047: Code refine of WordWithCount
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5047 This change doesn't work since `WordWithCount` needs to be a POJO, which requires a default (no-args) constructor and non-final public attributes or getters/setters. Since the type is not a POJO, the `TypeExtractor` falls back to `GenericType`, which cannot be used as a key type, as noted in the error message: Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.949 sec <<< FAILURE! - in org.apache.flink.streaming.test.socket.SocketWindowWordCountITCase testJavaProgram(org.apache.flink.streaming.test.socket.SocketWindowWordCountITCase) Time elapsed: 0.045 sec <<< ERROR! org.apache.flink.api.common.InvalidProgramException: This type (GenericType) cannot be used as key. at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330) at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:295) at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:79) at org.apache.flink.streaming.test.socket.SocketWindowWordCountITCase.testJavaProgram(SocketWindowWordCountITCase.java:65) ---
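For reference, a version of the type that does satisfy the POJO rules might look like the following sketch (the fields `word` and `count` are assumed from the word-count example; this is not the exact code from the PR):

```java
/**
 * Sketch of a WordWithCount that satisfies Flink's POJO rules:
 * public class, public no-args constructor, and public non-final
 * fields (or private fields with getters/setters).
 */
public class WordWithCount {

    public String word;   // public and non-final so the TypeExtractor can access it
    public long count;

    public WordWithCount() {}   // required no-args constructor

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
```

With this shape the TypeExtractor produces a `PojoTypeInfo` instead of falling back to `GenericType`, so the field can be used in `keyBy`.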
[jira] [Commented] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting
[ https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262785#comment-16262785 ] ASF GitHub Bot commented on FLINK-8081: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5049 +1 > Annotate MetricRegistry#getReporters() with @VisibleForTesting > -- > > Key: FLINK-8081 > URL: https://issues.apache.org/jira/browse/FLINK-8081 > Project: Flink > Issue Type: Improvement > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > {{MetricRegistry#getReporters()}} is only used for testing purposes to > provide access to instantiated reporters. We should annotate this method with > {{@VisibleForTesting}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5049: [FLINK-8081][metrics] Annotate 'MetricRegistry#getReporte...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5049 +1 ---
[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string
[ https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262775#comment-16262775 ] ASF GitHub Bot commented on FLINK-8070: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5012 Ah, thanks @zentol for the clarification. Very nice to have this! > YarnTestBase should print prohibited string > --- > > Key: FLINK-8070 > URL: https://issues.apache.org/jira/browse/FLINK-8070 > Project: Flink > Issue Type: Improvement > Components: Tests, YARN >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.4.0 > > > The yarn tests check the log files for a set of prohibited strings. If found, > the entire log file is logged as WARN, the offending line is logged as ERROR, > and the test fails with this unhelpful message: > {code} > java.lang.AssertionError(Found a file > /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1510164935122_0002/container_1510164935122_0002_01_01/jobmanager.log > with a prohibited string: [Exception, Started > SelectChannelConnector@0.0.0.0:8081]) > {code} > If you don't have log access on travis you have thus no knowledge what > actually went wrong. > I propose to also print smaller excerpts around the found error (like 10 > lines or smth) in the Assert.fail message. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5012 Ah, thanks @zentol for the clarification. Very nice to have this! ---
[jira] [Closed] (FLINK-7841) Add docs for Flink's S3 support
[ https://issues.apache.org/jira/browse/FLINK-7841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7841. --- Resolution: Fixed > Add docs for Flink's S3 support > --- > > Key: FLINK-7841 > URL: https://issues.apache.org/jira/browse/FLINK-7841 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Stephan Ewen >Assignee: Nico Kruber > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/5006#discussion_r152591791 --- Diff: docs/dev/stream/state/queryable_state.md --- @@ -162,14 +161,19 @@ So far, you have set up your cluster to run with queryable state and you have de queryable. Now it is time to see how to query this state. For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client` -jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below: +jar which must explicitly included as a dependency in the `pom.xml` of your project along with `flink-core`, as shown below: --- End diff -- "must" -> "must be" ---
[jira] [Commented] (FLINK-7574) flink-clients
[ https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262746#comment-16262746 ] ASF GitHub Bot commented on FLINK-7574: --- Github user yew1eb closed the pull request at: https://github.com/apache/flink/pull/4712 > flink-clients > - > > Key: FLINK-7574 > URL: https://issues.apache.org/jira/browse/FLINK-7574 > Project: Flink > Issue Type: Sub-task > Components: Build System >Affects Versions: 1.3.2 > Environment: Apache Maven 3.3.9, Java version: 1.8.0_144 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > > [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ > flink-clients_2.11 --- > [WARNING] Used undeclared dependencies found: > [WARNING]org.scala-lang:scala-library:jar:2.11.11:compile > [WARNING]com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile > [WARNING] Unused declared dependencies found: > [WARNING]org.hamcrest:hamcrest-all:jar:1.3:test > [WARNING]org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile > [WARNING]org.powermock:powermock-module-junit4:jar:1.6.5:test > [WARNING]com.google.code.findbugs:jsr305:jar:1.3.9:compile > [WARNING]log4j:log4j:jar:1.2.17:test > [WARNING]org.powermock:powermock-api-mockito:jar:1.6.5:test > [WARNING]org.slf4j:slf4j-log4j12:jar:1.7.7:test -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4716) Add trigger full recovery button to web UI
[ https://issues.apache.org/jira/browse/FLINK-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262747#comment-16262747 ] Aljoscha Krettek commented on FLINK-4716: - What's the state of this? And also: what's the purpose of this? > Add trigger full recovery button to web UI > -- > > Key: FLINK-4716 > URL: https://issues.apache.org/jira/browse/FLINK-4716 > Project: Flink > Issue Type: Improvement > Components: Webfrontend >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Priority: Minor > > Add a trigger full recovery button to the web UI. The full recovery button > will take the latest completed checkpoint and resume from there. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4712: [FLINK-7574][build] POM Cleanup flink-clients
Github user yew1eb closed the pull request at: https://github.com/apache/flink/pull/4712 ---
[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh
[ https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262720#comment-16262720 ] ASF GitHub Bot commented on FLINK-4822: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/5050 Don't you need a unique ID for this to work properly? (The task name is not unique) > Ensure that the Kafka 0.8 connector is compatible with > kafka-consumer-groups.sh > --- > > Key: FLINK-4822 > URL: https://issues.apache.org/jira/browse/FLINK-4822 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Robert Metzger > > The Kafka 0.8 connector is not properly creating all datastructures in > Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool. > A user reported the issue here: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498 > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5050: [FLINK-4822] Ensure that the Kafka 0.8 connector is compa...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5050 Don't you need a unique ID for this to work properly? (The task name is not unique) ---
[GitHub] flink pull request #5051: [hotfix] [javadocs] Fixed typo in Trigger doc
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5051 ---
[GitHub] flink issue #5051: [hotfix] [javadocs] Fixed typo in Trigger doc
Github user zentol commented on the issue: https://github.com/apache/flink/pull/5051 +1, merging. ---
[GitHub] flink issue #5052: [FLINK-8133][REST][docs] Generate REST API documentation
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5052 Thanks @zentol I will have a look. ---
[jira] [Commented] (FLINK-8133) Generate documentation for new REST API
[ https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262716#comment-16262716 ] ASF GitHub Bot commented on FLINK-8133: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5052 Thanks @zentol I will have a look. > Generate documentation for new REST API > --- > > Key: FLINK-8133 > URL: https://issues.apache.org/jira/browse/FLINK-8133 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8133) Generate documentation for new REST API
[ https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262704#comment-16262704 ] ASF GitHub Bot commented on FLINK-8133: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5052 [FLINK-8133][REST][docs] Generate REST API documentation ## What is the purpose of the change This PR adds a generator for the REST API documentation. The generator has to be run manually (automatic integration is a bit tricky, in particular detecting changes & failing the build). The generator can be called by specifying either `-Pgenerate-rest-docs` or `-Dgenerate-rest-docs` when building `flink-docs`. The generated content was added to the REST API documentation (`/monitoring/rest_api.html`). Here's a preview of the result: ![snippet](https://user-images.githubusercontent.com/5725237/33132910-522eb2d2-cf9b-11e7-844c-613fdcd818c8.PNG) ## Brief change log * modify URL comparator in RestServerEndpoint to be public * add a new module `flink-docs` (to not pollute other modules with dependencies) * implement generator * generated dispatcher REST documentation * integrated generated content into rest_api.md ## Verifying this change Needs manual verification. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8133 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5052.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5052 > Generate documentation for new REST API > --- > > Key: FLINK-8133 > URL: https://issues.apache.org/jira/browse/FLINK-8133 > Project: Flink > Issue Type: Improvement > Components: Documentation, REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5052: [FLINK-8133][REST][docs] Generate REST API documen...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/5052 [FLINK-8133][REST][docs] Generate REST API documentation ## What is the purpose of the change This PR adds a generator for the REST API documentation. The generator has to be run manually (automatic integration is a bit tricky, in particular detecting changes & failing the build). The generator can be called by specifying either `-Pgenerate-rest-docs` or `-Dgenerate-rest-docs` when building `flink-docs`. The generated content was added to the REST API documentation (`/monitoring/rest_api.html`). Here's a preview of the result: ![snippet](https://user-images.githubusercontent.com/5725237/33132910-522eb2d2-cf9b-11e7-844c-613fdcd818c8.PNG) ## Brief change log * modify URL comparator in RestServerEndpoint to be public * add a new module `flink-docs` (to not pollute other modules with dependencies) * implement generator * generated dispatcher REST documentation * integrated generated content into rest_api.md ## Verifying this change Needs manual verification. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8133 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5052.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5052 ---
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262597#comment-16262597 ] ASF GitHub Bot commented on FLINK-2170: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152580041 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java --- @@ -0,0 +1,1511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import org.apache.orc.TypeDescription; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.TimeZone; +import java.util.function.DoubleFunction; +import java.util.function.IntFunction; +import java.util.function.LongFunction; + +/** + * A class that provides utility methods for orc file reading. + */ +class OrcUtils { + + private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** +* Converts an ORC schema to a Flink TypeInformation. 
+* +* @param schema The ORC schema. +* @return The TypeInformation that corresponds to the ORC schema. +*/ + static TypeInformation schemaToTypeInfo(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return BasicTypeInfo.BOOLEAN_TYPE_INFO; + case BYTE: + return BasicTypeInfo.BYTE_TYPE_INFO; + case SHORT: + return BasicTypeInfo.SHORT_TYPE_INFO; + case INT: + return BasicTypeInfo.INT_TYPE_INFO; + case LONG: + return BasicTypeInfo.LONG_TYPE_INFO; + case FLOAT: + return BasicTypeInfo.FLOAT_TYPE_INFO; + case DOUBLE: + return BasicTypeInfo.DOUBLE_TYPE_INFO; + case DECIMAL: + return BasicTypeInfo.BIG_DEC_TYPE_INFO; + case STRING: + case CHAR: + case VARCHAR: + return BasicTypeInfo.STRING_TYPE_INFO; + case DATE: + return SqlTimeTypeInfo.DATE; + case TIMESTAMP: +
[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152580041 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java --- @@ -0,0 +1,1511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; + +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; + +import org.apache.orc.TypeDescription; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.TimeZone; +import java.util.function.DoubleFunction; +import java.util.function.IntFunction; +import java.util.function.LongFunction; + +/** + * A class that provides utility methods for orc file reading. + */ +class OrcUtils { + + private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 + private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); + + /** +* Converts an ORC schema to a Flink TypeInformation. 
+* +* @param schema The ORC schema. +* @return The TypeInformation that corresponds to the ORC schema. +*/ + static TypeInformation schemaToTypeInfo(TypeDescription schema) { + switch (schema.getCategory()) { + case BOOLEAN: + return BasicTypeInfo.BOOLEAN_TYPE_INFO; + case BYTE: + return BasicTypeInfo.BYTE_TYPE_INFO; + case SHORT: + return BasicTypeInfo.SHORT_TYPE_INFO; + case INT: + return BasicTypeInfo.INT_TYPE_INFO; + case LONG: + return BasicTypeInfo.LONG_TYPE_INFO; + case FLOAT: + return BasicTypeInfo.FLOAT_TYPE_INFO; + case DOUBLE: + return BasicTypeInfo.DOUBLE_TYPE_INFO; + case DECIMAL: + return BasicTypeInfo.BIG_DEC_TYPE_INFO; + case STRING: + case CHAR: + case VARCHAR: + return BasicTypeInfo.STRING_TYPE_INFO; + case DATE: + return SqlTimeTypeInfo.DATE; + case TIMESTAMP: + return SqlTimeTypeInfo.TIMESTAMP; + case BINARY: + return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; + case STRUCT: +
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262593#comment-16262593 ] ASF GitHub Bot commented on FLINK-2170: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152579106 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java --- @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.orc.OrcUtils.fillRows; + +/** + * InputFormat to read ORC files. 
+ */ +public class OrcRowInputFormat extends FileInputFormat implements ResultTypeQueryable { + + private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class); + // the number of rows read in a batch + private static final int DEFAULT_BATCH_SIZE = 1000; + + // the number of fields rows to read in a batch + private int batchSize; + // the configuration to read with + private Configuration conf; + // the schema of the ORC files to read + private TypeDescription schema; + + // the fields of the ORC schema that the returned Rows are composed of. + private int[] selectedFields; + // the type information of the Rows returned by this InputFormat. + private transient RowTypeInfo rowType; + + // the ORC reader + private transient RecordReader orcRowsReader; + // the vectorized row data to be read in a batch + private transient VectorizedRowBatch rowBatch; + // the vector of rows that is read in a batch + private transient Row[] rows; + + // the number of rows in the current batch + private transient int rowsInBatch; + // the index of the next row to return + private transient int nextRow; + + private ArrayList conjunctPredicates = new ArrayList<>(); + + /** +* Creates an OrcRowInputFormat. +* +* @param path The path to read ORC files from. +* @param schemaString The schema of the ORC files as String. +* @param orcConfig The configuration to read the ORC files with. +*/ + public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) { + this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE);
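The `rowsInBatch` / `nextRow` fields in the diff above implement a batch-then-serve pattern: a whole vectorized batch is decoded at once and records are then handed out one at a time. The following is a stand-alone sketch of that pattern in plain Java, with no ORC dependency; all names (`BatchedReaderSketch`, `nextRecord`, `reachedEnd`) are illustrative, not the actual OrcRowInputFormat API:

```java
import java.util.Iterator;
import java.util.List;

/**
 * Illustrative sketch of the batch-then-serve pattern used by batched
 * input formats: decode a whole batch, then serve its rows one by one.
 */
public class BatchedReaderSketch {

    private final Iterator<int[]> batches;   // stand-in for the ORC record reader
    private int[] currentBatch = new int[0];
    private int rowsInBatch = 0;             // number of rows in the current batch
    private int nextRow = 0;                 // index of the next row to return

    public BatchedReaderSketch(List<int[]> batches) {
        this.batches = batches.iterator();
    }

    /** True when the current batch is exhausted and no further batch exists. */
    public boolean reachedEnd() {
        return nextRow == rowsInBatch && !batches.hasNext();
    }

    /** Returns the next record, pulling in a new batch when needed; null at end. */
    public Integer nextRecord() {
        while (nextRow == rowsInBatch) {     // current batch fully served
            if (!batches.hasNext()) {
                return null;                 // all batches consumed
            }
            currentBatch = batches.next();   // "decode" the next batch in one go
            rowsInBatch = currentBatch.length;
            nextRow = 0;
        }
        return currentBatch[nextRow++];
    }

    public static void main(String[] args) {
        BatchedReaderSketch reader =
                new BatchedReaderSketch(List.of(new int[]{1, 2}, new int[]{3}));
        for (Integer r = reader.nextRecord(); r != null; r = reader.nextRecord()) {
            System.out.println(r);           // prints 1, 2, 3 on separate lines
        }
    }
}
```

Amortizing the per-batch decoding cost over many `nextRecord()` calls is the point of the design: the expensive columnar-to-row conversion happens once per batch (1000 rows by default in the diff), not once per record.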
[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152579106 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java --- @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.orc.OrcUtils.fillRows; + +/** + * InputFormat to read ORC files. 
+ */ +public class OrcRowInputFormat extends FileInputFormat implements ResultTypeQueryable { + + private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class); + // the number of rows read in a batch + private static final int DEFAULT_BATCH_SIZE = 1000; + + // the number of fields rows to read in a batch + private int batchSize; + // the configuration to read with + private Configuration conf; + // the schema of the ORC files to read + private TypeDescription schema; + + // the fields of the ORC schema that the returned Rows are composed of. + private int[] selectedFields; + // the type information of the Rows returned by this InputFormat. + private transient RowTypeInfo rowType; + + // the ORC reader + private transient RecordReader orcRowsReader; + // the vectorized row data to be read in a batch + private transient VectorizedRowBatch rowBatch; + // the vector of rows that is read in a batch + private transient Row[] rows; + + // the number of rows in the current batch + private transient int rowsInBatch; + // the index of the next row to return + private transient int nextRow; + + private ArrayList conjunctPredicates = new ArrayList<>(); + + /** +* Creates an OrcRowInputFormat. +* +* @param path The path to read ORC files from. +* @param schemaString The schema of the ORC files as String. +* @param orcConfig The configuration to read the ORC files with. +*/ + public OrcRowInputFormat(String path, String schemaString, Configuration orcConfig) { + this(path, TypeDescription.fromString(schemaString), orcConfig, DEFAULT_BATCH_SIZE); + } + + /** +* Creates an OrcRowInputFormat. +* +* @param path The path to read ORC files from. +* @param schemaString The schema of the ORC files as String. +* @param orcConfig The
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262591#comment-16262591 ] ASF GitHub Bot commented on FLINK-2170: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152578874 --- Diff: flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java --- @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.orc; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; + +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.orc.OrcUtils.fillRows; + +/** + * InputFormat to read ORC files. 
+ */ +public class OrcRowInputFormat extends FileInputFormat implements ResultTypeQueryable { + + private static final Logger LOG = LoggerFactory.getLogger(OrcRowInputFormat.class); + // the number of rows read in a batch + private static final int DEFAULT_BATCH_SIZE = 1000; + + // the number of fields rows to read in a batch + private int batchSize; + // the configuration to read with + private Configuration conf; --- End diff -- Ok, sorry, I didn't check for custom serialization. > Add OrcTableSource > -- > > Key: FLINK-2170 > URL: https://issues.apache.org/jira/browse/FLINK-2170 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Usman Younas >Priority: Minor > Labels: starter > > Add a {{OrcTableSource}} to read data from an ORC file. The > {{OrcTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
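[Editor's note] The fields quoted in the OrcRowInputFormat diff above (batchSize, rowsInBatch, nextRow) suggest the usual vectorized-read pattern: fill a batch of rows, hand them out one by one, and refill when the batch is exhausted. The following is a simplified, self-contained sketch of that pattern only; BatchSource is a hypothetical stand-in for the ORC RecordReader, and this is not the actual Flink code:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchedReaderSketch {
    // Hypothetical stand-in for an ORC RecordReader that fills row batches.
    interface BatchSource {
        int nextBatch(int[] buffer); // returns number of rows filled, 0 at end of input
    }

    private final BatchSource source;
    private final int[] batch;       // the rows read in a batch
    private int rowsInBatch = 0;     // number of rows in the current batch
    private int nextRow = 0;         // index of the next row to return

    BatchedReaderSketch(BatchSource source, int batchSize) {
        this.source = source;
        this.batch = new int[batchSize];
    }

    /** Returns true when no more rows are available, refilling the batch if needed. */
    boolean reachedEnd() {
        if (nextRow >= rowsInBatch) {
            rowsInBatch = source.nextBatch(batch);
            nextRow = 0;
        }
        return rowsInBatch == 0;
    }

    int nextRecord() {
        return batch[nextRow++];
    }

    /** Drains the source completely, batch by batch. */
    static List<Integer> readAll(BatchSource src, int batchSize) {
        BatchedReaderSketch reader = new BatchedReaderSketch(src, batchSize);
        List<Integer> out = new ArrayList<>();
        while (!reader.reachedEnd()) {
            out.add(reader.nextRecord());
        }
        return out;
    }
}
```

The point of the pattern is that the per-record path only touches two counters; the expensive decoding work happens once per batch.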
[jira] [Comment Edited] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()
[ https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262584#comment-16262584 ] Andrey edited comment on FLINK-5465 at 11/22/17 2:19 PM: - We were able to reproduce this issue on Flink 1.3.2. This is a very critical bug, because a failing job could bring the whole Flink cluster down. A failing job will amplify the chances of a RocksDB crash through constant start/cancel cycles. was (Author: dernasherbrezon): We were able to reproduce this issue on Flink 1.3.2. This is very critical bug, because failing job could bring whole flink cluster down. Failing job amplifying chances for rocksdb crash by constant start/cancel. > RocksDB fails with segfault while calling AbstractRocksDBState.clear() > -- > > Key: FLINK-5465 > URL: https://issues.apache.org/jira/browse/FLINK-5465 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Robert Metzger > Attachments: hs-err-pid26662.log > > > I'm using Flink 699f4b0. > {code} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576 > # > # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build > 1.7.0_67-b01) > # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode > linux-amd64 compressed oops) > # Problematic frame: > # C [librocksdbjni-linux64.so+0x1aeb78] > rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8 > # > # Failed to write core dump. Core dumps have been disabled. 
To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # > /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log > Compiled method (nm) 1869778 903 n org.rocksdb.RocksDB::remove > (native) > total in heap [0x7f91b40b9dd0,0x7f91b40ba150] = 896 > relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88 > main code [0x7f91b40b9f60,0x7f91b40ba150] = 496 > # > # If you would like to submit a bug report, please visit: > # http://bugreport.sun.com/bugreport/crash.jsp > # The crash happened outside the Java Virtual Machine in native code. > # See problematic frame for where to report the bug. > # > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262583#comment-16262583 ] ASF GitHub Bot commented on FLINK-2170: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152577385 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala --- @@ -62,7 +62,19 @@ class FlinkLogicalTableSourceScan( override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { val rowCnt = metadata.getRowCount(this) -planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * estimateRowSize(getRowType)) + +val adjustedCnt: Double = tableSource match { + case f: FilterableTableSource[_] if f.isFilterPushedDown => +// ensure we prefer FilterableTableSources with pushed-down filters. +rowCnt - 1.0 --- End diff -- Doesn't really make a difference IMO. It's all about relative costs. We only need to make sure that a `FilterableTableSource` with pushed down predicates appears to be less expensive than the same `TableSource` without pushed predicates. The problem has not occurred before, because the `OrcTableSource` does not guarantee that all emitted rows match the pushed predicates. Therefore, all predicates are also applied by a following `Calc` operator such that the cost of that operator is not decreased. > Add OrcTableSource > -- > > Key: FLINK-2170 > URL: https://issues.apache.org/jira/browse/FLINK-2170 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Usman Younas >Priority: Minor > Labels: starter > > Add a {{OrcTableSource}} to read data from an ORC file. The > {{OrcTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
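[Editor's note] The point about relative costs can be illustrated with a toy cost function: subtracting any strictly positive constant from the row count is enough for a cost-based planner to prefer the scan with pushed-down filters, because only the ordering of the two costs matters. A minimal sketch in plain Java (the real computeSelfCost works on Calcite RelOptCost objects, not doubles):

```java
public class ScanCostSketch {
    /** Toy per-scan cost: the row count, reduced slightly when filters were pushed down. */
    static double scanCost(double rowCount, boolean filterPushedDown) {
        // Any strictly positive discount works: the planner only compares costs,
        // so the pushed-down variant merely has to look cheaper than the plain scan.
        return filterPushedDown ? rowCount - 1.0 : rowCount;
    }

    /** Mimics the planner's choice between the two alternative scans. */
    static boolean preferPushedDown(double rowCount) {
        return scanCost(rowCount, true) < scanCost(rowCount, false);
    }
}
```

This also shows why correctness is preserved for sources that do not guarantee complete filtering: the following Calc operator still applies all predicates, so its own cost is unchanged.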
[jira] [Commented] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery
[ https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262565#comment-16262565 ] Kostas Kloudas commented on FLINK-8132: --- This is a bug or a blocker? > FlinkKafkaProducer011 can commit incorrect transaction during recovery > -- > > Key: FLINK-8132 > URL: https://issues.apache.org/jira/browse/FLINK-8132 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > Faulty scenario with producer pool of 2. > 1. started transaction 1 with producerA, written record 42 > 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, > written record 43 > 3. checkpoint 1 completed, committing txn1, returning producerA to the pool > 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, > written record 44 > 5. crash > 6. recover to checkpoint 1, txn1 from producerA found to > "pendingCommitTransactions", attempting to recoverAndCommit(txn1) > 7. unfortunately txn1 and txn3 from the same producers are identical from > KafkaBroker perspective and thus txn3 is being committed > result is that both records 42 and 44 are committed. > Proposed solution is to postpone returning producers to the pool until we are > sure that previous checkpoint (for which given producer was used) will not be > used for recovery (at least one more checkpoint was completed). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
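[Editor's note] The seven-step faulty scenario above can be made concrete with a toy simulation: if the broker effectively keys an open transaction by producer only, then recovering checkpoint 1 and committing "producerA's transaction" commits whatever producerA currently has open, i.e. txn3's record 44. A hypothetical sketch (plain Java, not the connector code; the broker model is deliberately minimal):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TxnReuseSketch {
    // Broker-side view: the open transaction of each producer, keyed by producer id
    // only -- which is exactly why txn1 and txn3 are indistinguishable here.
    final Map<String, List<Integer>> openTxn = new HashMap<>();
    final List<Integer> committed = new ArrayList<>();

    void begin(String producer)          { openTxn.put(producer, new ArrayList<>()); }
    void write(String producer, int rec) { openTxn.get(producer).add(rec); }
    void commit(String producer)         { committed.addAll(openTxn.remove(producer)); }

    /** Replays the faulty scenario from the issue description. */
    static List<Integer> replayScenario() {
        TxnReuseSketch broker = new TxnReuseSketch();
        broker.begin("producerA"); broker.write("producerA", 42);  // step 1: txn1
        // step 2: checkpoint 1 pre-commits txn1; producerB starts txn2
        broker.begin("producerB"); broker.write("producerB", 43);
        broker.commit("producerA");                                 // step 3: txn1 committed
        broker.commit("producerB");                                 // step 4: txn2 committed...
        broker.begin("producerA"); broker.write("producerA", 44);  // ...and txn3 reuses producerA
        // steps 5-7: crash, recover to checkpoint 1; recoverAndCommit(txn1) is keyed
        // by producerA, so the broker commits producerA's *current* open txn -- txn3.
        broker.commit("producerA");
        return broker.committed;
    }
}
```

The proposed fix avoids this by not returning a producer to the pool until a later checkpoint guarantees the earlier transaction can no longer be recovered.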
[jira] [Commented] (FLINK-3924) Remove protobuf shading from Kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262564#comment-16262564 ] Robert Metzger commented on FLINK-3924: --- I think this will remain a problem until Kinesis/Amazon solves it. But I haven't checked recently whether they have changed something. > Remove protobuf shading from Kinesis connector > -- > > Key: FLINK-3924 > URL: https://issues.apache.org/jira/browse/FLINK-3924 > Project: Flink > Issue Type: Task > Components: Kinesis Connector, Streaming Connectors >Reporter: Robert Metzger > > The Kinesis connector is currently creating a fat jar with a custom protobuf > version (2.6.1), relocated into a different package. > We need to build the fat jar to change the protobuf calls from the original > protobuf to the relocated one. > Because Kinesis is licensed under the Amazon Software License (which is not > entirely compatible with the ASL 2.0), I don't want to deploy kinesis connector binaries to > maven central with the releases. These binaries would contain code from > Amazon. It would be more than just linking to (optional) dependencies. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources
[ https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262563#comment-16262563 ] Xingcan Cui commented on FLINK-8118: Hi [~fhueske], thanks for the suggestions! Since the existing configurations in {{KafkaTableSource.Builder}} are all about the {{TableSource}} itself while the starting offsets are set for the inner {{FlinkKafkaConsumerBase}}, the code may be a little verbose. Anyway, I'll create a PR soon and let's discuss that later. Thanks, Xingcan > Allow to specify the offsets of KafkaTableSources > - > > Key: FLINK-8118 > URL: https://issues.apache.org/jira/browse/FLINK-8118 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Xingcan Cui > > Right now the Kafka TableSources can only read from the current group offset. > We should expose the possibilities of the Kafka Consumer: > https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration -- This message was sent by Atlassian JIRA (v6.4.14#64029)
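[Editor's note] What "exposing the consumer's start position" through a builder could look like, as a hypothetical sketch only: the names below are invented for illustration, and the actual KafkaTableSource.Builder API is defined in the Kafka connector module, not here.

```java
public class OffsetsBuilderSketch {
    /** Start-position choices mirrored from the Kafka consumer documentation. */
    enum StartupMode { GROUP_OFFSETS, EARLIEST, LATEST }

    // Reading from the current group offset is today's behavior, so it stays the default.
    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;

    /** Configures the inner consumer to start from the earliest offset. */
    OffsetsBuilderSketch fromEarliest() {
        this.startupMode = StartupMode.EARLIEST;
        return this; // returning this keeps the builder chainable
    }

    /** Configures the inner consumer to start from the latest offset. */
    OffsetsBuilderSketch fromLatest() {
        this.startupMode = StartupMode.LATEST;
        return this;
    }

    StartupMode build() {
        return startupMode;
    }
}
```

This matches the concern raised in the comment: the start position configures the inner consumer rather than the TableSource itself, so the builder merely forwards the choice.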
[jira] [Issue Comment Deleted] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery
[ https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-8132: -- Comment: was deleted (was: This is a bug or a blocker?) > FlinkKafkaProducer011 can commit incorrect transaction during recovery > -- > > Key: FLINK-8132 > URL: https://issues.apache.org/jira/browse/FLINK-8132 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > Faulty scenario with producer pool of 2. > 1. started transaction 1 with producerA, written record 42 > 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, > written record 43 > 3. checkpoint 1 completed, committing txn1, returning producerA to the pool > 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, > written record 44 > 5. crash > 6. recover to checkpoint 1, txn1 from producerA found to > "pendingCommitTransactions", attempting to recoverAndCommit(txn1) > 7. unfortunately txn1 and txn3 from the same producers are identical from > KafkaBroker perspective and thus txn3 is being committed > result is that both records 42 and 44 are committed. > Proposed solution is to postpone returning producers to the pool until we are > sure that previous checkpoint (for which given producer was used) will not be > used for recovery (at least one more checkpoint was completed). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-2170) Add OrcTableSource
[ https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262546#comment-16262546 ] ASF GitHub Bot commented on FLINK-2170: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5043#discussion_r152571674 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java --- @@ -355,4 +355,21 @@ public void addComparatorField(int fieldId, TypeComparator comparator) { comparatorOrders); } } + + /** +* Creates a {@link RowTypeInfo} with projected fields. +* +* @param rowType The original RowTypeInfo whose fields are projected +* @param fieldMapping The field mapping of the projection +* @return A RowTypeInfo with projected fields. +*/ + public static RowTypeInfo projectFields(RowTypeInfo rowType, int[] fieldMapping) { --- End diff -- I think it is cleaner to have this as a static method than an instance method. A static method makes it explicit that this creates a new (immutable) `RowTypeInfo`. > Add OrcTableSource > -- > > Key: FLINK-2170 > URL: https://issues.apache.org/jira/browse/FLINK-2170 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 0.9 >Reporter: Fabian Hueske >Assignee: Usman Younas >Priority: Minor > Labels: starter > > Add a {{OrcTableSource}} to read data from an ORC file. The > {{OrcTableSource}} should implement the {{ProjectableTableSource}} > (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
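[Editor's note] The effect of such a static projection helper can be sketched without Flink: given the field names of a row type and a mapping, build a new description instead of mutating the original, which is the immutability point made in the review comment. (This is a hypothetical simplification; the real RowTypeInfo.projectFields also carries TypeInformation for each field.)

```java
public class ProjectionSketch {
    final String[] fieldNames;

    ProjectionSketch(String[] fieldNames) {
        this.fieldNames = fieldNames.clone(); // defensive copy keeps instances immutable
    }

    /** Creates a new row type containing only the fields selected by fieldMapping. */
    static ProjectionSketch projectFields(ProjectionSketch rowType, int[] fieldMapping) {
        String[] projected = new String[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            projected[i] = rowType.fieldNames[fieldMapping[i]];
        }
        return new ProjectionSketch(projected); // the original rowType is left untouched
    }
}
```

The static form makes it syntactically obvious at the call site that a new object is created rather than the receiver being modified.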
[jira] [Created] (FLINK-8135) Add description to MessageParameter
Chesnay Schepler created FLINK-8135: --- Summary: Add description to MessageParameter Key: FLINK-8135 URL: https://issues.apache.org/jira/browse/FLINK-8135 Project: Flink Issue Type: Improvement Components: Documentation, REST Reporter: Chesnay Schepler Fix For: 1.5.0 For documentation purposes we should add an {{getDescription()}} method to the {{MessageParameter}} class, describing what this particular parameter is used for and which values are accepted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
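[Editor's note] A minimal sketch of what such an addition could look like; the shape is hypothetical, and the actual MessageParameter class lives in Flink's REST runtime package:

```java
public abstract class DescribedParameterSketch {
    /** The key under which the parameter appears in the request, e.g. "jobid". */
    public abstract String getKey();

    /**
     * Human-readable description used when generating REST documentation:
     * what the parameter is used for and which values are accepted.
     */
    public abstract String getDescription();
}
```

A concrete parameter would then return a fixed description string, which a documentation generator can collect alongside the key.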
[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected
[ https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262540#comment-16262540 ] Fabian Hueske commented on FLINK-6101: -- You are right that SQL doesn't support `AS`. But SQL is not a perfect language either and has its shortcomings. IMO, the Table API should follow SQL semantics and, where applicable, SQL syntax as well. Having optional support for `AS` in `groupBy()` would not be a problem in that regard because it does not change the semantics and is a nice shortcut. > GroupBy fields with arithmetic expression (include UDF) can not be selected > --- > > Key: FLINK-6101 > URL: https://issues.apache.org/jira/browse/FLINK-6101 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Minor > > Currently, the Table API does not support selecting GroupBy fields that contain expressions, neither via the original field name nor via the expression itself: > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > causes > {code} > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. > {code} > (BTW, this syntax is invalid in an RDBMS, which will indicate that the selected column in the select list is invalid because it is contained in neither an aggregate function nor the GROUP BY clause, e.g. in SQL Server.) > and > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b%3, 'c.min, 'e, 'a.avg, 'd.count) > {code} > also causes > {code} > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [e, ('b % 3), TMP_0, TMP_1, TMP_2]. > {code} > Adding an alias in the groupBy clause, "groupBy('e, 'b%3 as 'b)", is of no avail, > and applying a UDF doesn’t work either: > {code} >table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, > 'd.count, 'e.avg) > org.apache.flink.table.api.ValidationException: Cannot resolve [b] given > input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, > TMP_1, TMP_2]. > {code} > The only way to make this work is: > {code} > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .select('a, 'b%3 as 'b, 'c, 'd, 'e) > .groupBy('e, 'b) > .select('b, 'c.min, 'e, 'a.avg, 'd.count) > {code} > One way to solve this is to add support for aliases in the groupBy clause (it seems a > bit odd compared to SQL, though the Table API has a different groupBy grammar), > but I prefer to support selecting the original expressions and UDFs from the groupBy > clause (consistent with SQL), > like this: > {code} > // use an expression > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, 'b % 3) > .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count) > // use a UDF > val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, > 'd, 'e) > .groupBy('e, Mod('b,3)) > .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count) > {code} > After a look into the code, I found a problem in the groupBy > implementation: validation doesn't consider the expressions in the groupBy > clause. It should be noted that a table is actually changed by a > groupBy operation (a new Table) and that the groupBy keys in essence replace the original > field references. > > What do you think? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
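[Editor's note] The resolution failure described in FLINK-6101 can be reduced to a name-matching problem: after groupBy, the inputs available to select() are the grouping expressions plus the aggregate results, so a plain 'b no longer resolves. A toy resolver illustrating this (plain Java, not the Table API validator; note that today even 'b % 3 fails, because the validator resolves the inner 'b reference first, whereas matching the whole grouping expression is what the proposed fix would allow):

```java
import java.util.Arrays;
import java.util.List;

public class GroupByResolutionSketch {
    /**
     * After groupBy('e, 'b % 3), the selectable inputs are the grouping
     * expressions plus aggregate results -- modeled here as plain strings,
     * matching the input list printed in the ValidationException.
     */
    static final List<String> AVAILABLE =
            Arrays.asList("e", "('b % 3)", "TMP_0", "TMP_1", "TMP_2");

    /** Mimics resolution: an expression is valid only if it matches an available input. */
    static boolean resolves(String expression) {
        return AVAILABLE.contains(expression);
    }
}
```

Under this model, "Cannot resolve [b]" is exactly the expected outcome: 'b was replaced by the grouping key, so only the key expression (or an aggregate) can be selected.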
[jira] [Created] (FLINK-8134) Add description to MessageHeaders
Chesnay Schepler created FLINK-8134: --- Summary: Add description to MessageHeaders Key: FLINK-8134 URL: https://issues.apache.org/jira/browse/FLINK-8134 Project: Flink Issue Type: Improvement Components: Documentation, REST Reporter: Chesnay Schepler Fix For: 1.5.0 For documentation purposes we should add an {{getDescription()}} method to the {{MessageHeaders}} interface, describing what particular action this REST call executes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)