[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected

2017-11-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263908#comment-16263908
 ] 

Fabian Hueske commented on FLINK-6101:
--

In the example you gave, I'd think that {{'adId}} in {{select()}} refers to the 
named expression in the {{groupBy()}} call, i.e., {{'adId + 1}}, because the 
{{as}} overrides the previous definition of {{'adId}}. If a user wants the 
original meaning of {{'adId}}, they should not reuse {{'adId}} in the 
{{as}} expression. 
I think this is a corner case, but I can see how it might cause confusion.
So I'm fine with keeping {{as}} out of {{groupBy()}} for now.
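A minimal sketch of the aliasing semantics described above, assuming {{as}} were 
allowed in {{groupBy()}} (hypothetical syntax; {{table}} is some input Table):
{code}
// hypothetical: with 'adId + 1 aliased to 'adId in groupBy, 'adId in select would
// refer to the grouping expression ('adId + 1), not to the original 'adId column
table
  .groupBy('adId + 1 as 'adId)
  .select('adId, 'clickEarn.sum as 'revenue)
{code}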

> GroupBy fields with arithmetic expression (include UDF) can not be selected
> ---
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently the Table API does not support selecting GroupBy fields that contain 
> expressions, either by the original field name or by the expression itself:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> caused
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> (BTW, this syntax is also invalid in an RDBMS; SQL Server, for example, reports 
> that the selected column is invalid in the select list because it is not contained 
> in either an aggregate function or the GROUP BY clause.)
> and 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will also cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> Adding an alias in the groupBy clause, "group(e, 'b%3 as 'b)", is of no avail, 
> and applying a UDF doesn't work either:
> {code}
>table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 
> 'd.count, 'e.avg)
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, 
> TMP_1, TMP_2].
> {code}
> The only way to make this work is:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> One way to solve this is to add alias support in the groupBy clause (it seems a 
> bit odd compared to SQL, though the Table API has a different groupBy grammar), 
> but I prefer to support selecting the original expressions and UDFs used in the 
> groupBy clause (consistent with SQL), as follows:
> {code}
> // use expression
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
> // use UDF
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, Mod('b,3))
> .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count)
> {code}

> After having a look into the code, I found a problem in the groupBy 
> implementation: the validation did not consider the expressions in the groupBy 
> clause. It should be noted that the table is actually changed after a 
> groupBy operation (a new Table is created) and the groupBy keys essentially 
> replace the original field references.
>  
> What do you think?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-2170.

   Resolution: Implemented
Fix Version/s: 1.5.0
   1.4.0

Implemented for 1.4.0 with 35517f1291293f73f4466a4fdbed4296b2dd80a5
Implemented for 1.4.0 with 200612ee0eaa42fdba141be138de172f86798f54
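
For reference, a rough usage sketch (hypothetical; the exact construction of the 
{{OrcTableSource}} is defined by the commits above, and the field names are made up):
{code}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.orc.OrcTableSource
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
val orcSource: OrcTableSource = ??? // built from an ORC file path and schema, see the PRs
tEnv.registerTableSource("OrcTable", orcSource)
val result: Table = tEnv.scan("OrcTable").select('field1, 'field2)
{code}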

> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
> Fix For: 1.4.0, 1.5.0
>
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected

2017-11-22 Thread lincoln.lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263755#comment-16263755
 ] 

lincoln.lee commented on FLINK-6101:


[~fhueske]   Agree with you that SQL is not perfect. But in this case I think 
it's reasonable that SQL does not support aliases in the groupBy clause.
Think about a query similar to the one Timo mentioned above:
{code}
.window(Tumble over 1.minute on 'showTime as 'w)
.groupBy('adId + 1 as 'adId, 'w)
.select('adId + 1, 'w.rowtime as 'wtime, 'clickEarn.sum as 'revenue)
{code}
What does `'adId + 1` represent in the select clause? Adding alias support in 
groupBy is handy but may lead to confusing cases. What do you think?

BTW, we're having another discussion about the limitations of column expressions 
in the select clause after a groupBy (this issue was partly discussed with 
Julian, see https://issues.apache.org/jira/browse/CALCITE-1710). We found that 
MySQL and SQL Server have different implementations; extending the allowed select 
expressions may be more meaningful. I'll sync here once we have a preliminary 
conclusion.
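
A rough sketch of what an extended select expression over a grouping key could look 
like (hypothetical, pending the outcome of that discussion; {{table}} is some input Table):
{code}
// an expression built on top of the groupBy key 'b % 3, resolved against the grouping expression
table
  .groupBy('e, 'b % 3)
  .select(('b % 3) + 1, 'a.avg, 'e)
{code}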

> GroupBy fields with arithmetic expression (include UDF) can not be selected
> ---
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> Currently the Table API does not support selecting GroupBy fields that contain 
> expressions, either by the original field name or by the expression itself:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> caused
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> (BTW, this syntax is also invalid in an RDBMS; SQL Server, for example, reports 
> that the selected column is invalid in the select list because it is not contained 
> in either an aggregate function or the GROUP BY clause.)
> and 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will also cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> Adding an alias in the groupBy clause, "group(e, 'b%3 as 'b)", is of no avail, 
> and applying a UDF doesn't work either:
> {code}
>table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 
> 'd.count, 'e.avg)
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, 
> TMP_1, TMP_2].
> {code}
> The only way to make this work is:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> One way to solve this is to add alias support in the groupBy clause (it seems a 
> bit odd compared to SQL, though the Table API has a different groupBy grammar), 
> but I prefer to support selecting the original expressions and UDFs used in the 
> groupBy clause (consistent with SQL), as follows:
> {code}
> // use expression
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
> // use UDF
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, Mod('b,3))
> .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count)
> {code}

> After having a look into the code, I found a problem in the groupBy 
> implementation: the validation did not consider the expressions in the groupBy 
> clause. It should be noted that the table is actually changed after a 
> groupBy operation (a new Table is created) and the groupBy keys essentially 
> replace the original field references.
>  
> What do you think?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7967) Deprecate Hadoop specific Flink configuration options

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263669#comment-16263669
 ] 

ASF GitHub Bot commented on FLINK-7967:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4946
  
Thanks, @greghogan . I will make a first refinement based on your 
suggestions.


> Deprecate Hadoop specific Flink configuration options
> -
>
> Key: FLINK-7967
> URL: https://issues.apache.org/jira/browse/FLINK-7967
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Till Rohrmann
>Priority: Trivial
>
> I think we should deprecate the Hadoop-specific configuration options in 
> Flink and encourage people to use the environment variable 
> {{HADOOP_CONF_DIR}} instead to configure the Hadoop configuration directory. This 
> includes:
> {code}
> fs.hdfs.hdfsdefault
> fs.hdfs.hdfssite
> fs.hdfs.hadoopconf
> {code}
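
A minimal sketch of the contrast (assuming a typical Hadoop configuration directory path):
{code}
// deprecated per this issue: pointing Flink at the Hadoop config via Flink options
val conf = new org.apache.flink.configuration.Configuration()
conf.setString("fs.hdfs.hadoopconf", "/etc/hadoop/conf")

// encouraged instead: export HADOOP_CONF_DIR=/etc/hadoop/conf before starting Flink
{code}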



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4946: [FLINK-7967] [config] Deprecate Hadoop specific Flink con...

2017-11-22 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/4946
  
Thanks, @greghogan . I will make a first refinement based on your 
suggestions.


---


[jira] [Commented] (FLINK-8104) Fix Row value constructor

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263600#comment-16263600
 ] 

ASF GitHub Bot commented on FLINK-8104:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5040
  
Thanks for the prompt review @twalthr. Yeah, I think generalizing the 
codegen part would be great. 
Also, do you think I can add Table API support in this PR as well? I will 
update the documentation and also add more ITCases :-)
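
A rough sketch of what Table API support could look like (hypothetical; a {{row(...)}} 
expression is not part of this PR):
{code}
// hypothetical 'row' expression mirroring SQL's ROW('hello world', 12)
table.select(row('name, 'age) as 'person)
{code}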


> Fix Row value constructor
> -
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Support Row value constructor which is currently broken. 
> See 
> {code:java}
> // 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
>   @Test
>   def testValueConstructorFunctions(): Unit = {
> // TODO we need a special code path that flattens ROW types
> // testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
> returns field 0
> // testSqlApi("('hello world', 12)", "hello world") // test base only 
> returns field 0
> // ...
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5040: [FLINK-8104][Table API] fixing ROW type value constructor...

2017-11-22 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5040
  
Thanks for the prompt review @twalthr. Yeah, I think generalizing the 
codegen part would be great. 
Also, do you think I can add Table API support in this PR as well? I will 
update the documentation and also add more ITCases :-)


---


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263443#comment-16263443
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5043


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263442#comment-16263442
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4670


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4670: [FLINK-2170] [connectors] Add ORC connector for Ta...

2017-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4670


---


[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5043


---


[jira] [Commented] (FLINK-8126) Update and fix checkstyle

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263215#comment-16263215
 ] 

ASF GitHub Bot commented on FLINK-8126:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5044
  
Setting the version in IntelliJ to 8.4 seems to work.

Settings -> Other Settings -> Checkstyle -> Checkstyle version drop-down 
menu


> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5044: [FLINK-8126] [build] Fix and update checkstyle

2017-11-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5044
  
Setting the version in IntelliJ to 8.4 seems to work.

Settings -> Other Settings -> Checkstyle -> Checkstyle version drop-down 
menu


---


[jira] [Commented] (FLINK-8126) Update and fix checkstyle

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263209#comment-16263209
 ] 

ASF GitHub Bot commented on FLINK-8126:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5044
  
Which checkstyle version is configured in IntelliJ, @StephanEwen? Could be 
that we have to upgrade it there as well. IIRC we recommend setting it to 
6.X in the IDE setup guide because of some parsing errors in 8.X, which may now 
be resolved.


> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5044: [FLINK-8126] [build] Fix and update checkstyle

2017-11-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5044
  
Which checkstyle version is configured in IntelliJ, @StephanEwen? Could be 
that we have to upgrade it there as well. IIRC we recommend setting it to 
6.X in the IDE setup guide because of some parsing errors in 8.X, which may now 
be resolved.


---


[jira] [Commented] (FLINK-7718) Port JobVertixMetricsHandler to new REST endpoint

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16263050#comment-16263050
 ] 

ASF GitHub Bot commented on FLINK-7718:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5055

[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

## Brief change log
  - *Migrate logic in 
`org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler` 
to new handler and add new handler to DispatcherRestEndpoint.*
  - *Add common classes for remaining implementations of

`org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler`,
which require migration as well*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for all new classes and old classes except for 
`DispatcherRestEndpoint`*
  - *Manually deployed a job locally and verified with `curl` that 
JobVertexMetrics can be queried in FLIP-6.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7718

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5055.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5055


commit b888c876131fa3744baff7393bb75025ccb4b61f
Author: gyao 
Date:   2017-11-22T17:58:03Z

[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

Migrate logic in

org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler to
new handler and add new handler to DispatcherRestEndpoint. Add common 
classes
for remaining implementations of
org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler,
which require migration as well.




> Port JobVertixMetricsHandler to new REST endpoint
> -
>
> Key: FLINK-7718
> URL: https://issues.apache.org/jira/browse/FLINK-7718
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JobVertexMetricsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5055: [FLINK-7718] [flip6] Add JobVertexMetricsHandler t...

2017-11-22 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5055

[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

## Brief change log
  - *Migrate logic in 
`org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler` 
to new handler and add new handler to DispatcherRestEndpoint.*
  - *Add common classes for remaining implementations of

`org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler`,
which require migration as well*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for all new classes and old classes except for 
`DispatcherRestEndpoint`*
  - *Manually deployed a job locally and verified with `curl` that 
JobVertexMetrics can be queried in FLIP-6.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7718

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5055.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5055


commit b888c876131fa3744baff7393bb75025ccb4b61f
Author: gyao 
Date:   2017-11-22T17:58:03Z

[FLINK-7718] [flip6] Add JobVertexMetricsHandler to DispatcherRestEndpoint

Migrate logic in

org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler to
new handler and add new handler to DispatcherRestEndpoint. Add common 
classes
for remaining implementations of
org.apache.flink.runtime.rest.handler.legacy.metrics.AbstractMetricsHandler,
which require migration as well.




---


[jira] [Commented] (FLINK-8126) Update and fix checkstyle

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262995#comment-16262995
 ] 

ASF GitHub Bot commented on FLINK-8126:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5044
  
I am now seeing some problem in IntelliJ with the checkstyle plugin (it seems 
to not be able to parse the checkstyle config).

Anyone else seeing this issue? I am on an older IntelliJ version, so it might 
be that (I'm reluctant to update because switching Scala versions works less well 
in newer IntelliJ versions).



> Update and fix checkstyle
> -
>
> Key: FLINK-8126
> URL: https://issues.apache.org/jira/browse/FLINK-8126
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Our current checkstyle configuration (checkstyle version 6.19) is missing 
> some ImportOrder and variable naming errors which are detected in 1) IntelliJ 
> using the same checkstyle version and 2) with the maven-checkstyle-plugin 
> with an up-to-date checkstyle version (8.4).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5044: [FLINK-8126] [build] Fix and update checkstyle

2017-11-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5044
  
I am now seeing some problem in IntelliJ with the checkstyle plugin (it seems 
to not be able to parse the checkstyle config).

Anyone else seeing this issue? I am on an older IntelliJ version, so it might 
be that (I'm reluctant to update because switching Scala versions works less well 
in newer IntelliJ versions).



---


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262976#comment-16262976
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5043
  
Thanks for the review @twalthr!
I'll address your comments and will merge this PR.


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152634076
  
--- Diff: 
flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link OrcTableSource}.
+ */
+public class OrcTableSourceITCase extends MultipleProgramsTestBase {
--- End diff --

`TableProgramsTestBase` doesn't really give any advantage over 
`MultipleProgramsTestBase` since I'm testing the table source and not the 
null-behavior of the query execution.


---


[GitHub] flink issue #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTa...

2017-11-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5043
  
Thanks for the review @twalthr!
I'll address your comments and will merge this PR.


---


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262973#comment-16262973
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152634076
  
--- Diff: 
flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link OrcTableSource}.
+ */
+public class OrcTableSourceITCase extends MultipleProgramsTestBase {
--- End diff --

`TableProgramsTestBase` doesn't really give any advantage over 
`MultipleProgramsTestBase` since I'm testing the table source and not the 
null-behavior of the query execution.


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8136.
-
   Resolution: Fixed
Fix Version/s: 1.5.0
   1.4.0

Fixed in 1.5: 5017c679c06de92bec6b094268d61ce5f3109b9e
Fixed in 1.4: 28157962196cecb94a59720e92cbf3682418e821
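
For context on the fix, a sketch of one way to avoid the hard-coded class name in the 
generated code (this may differ from the actual change in the commits above; 
{{fieldTerm}} comes from the surrounding code-generation context):
{code}
// use the (possibly shading-relocated) runtime class name instead of a literal,
// so the generated code references the relocated Joda-Time class automatically
val formatterClass = classOf[org.joda.time.format.DateTimeFormatter].getCanonicalName
val field =
  s"""
    |final $formatterClass $fieldTerm;
    |""".stripMargin
{code}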

> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.4.0, 1.5.0
>
>
> Due to the shading of Joda-Time, there is an exception when the CodeGenerator tries 
> to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262943#comment-16262943
 ] 

ASF GitHub Bot commented on FLINK-8136:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5054


> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
>
> Due to the shading of Joda-Time, there is an exception when the CodeGenerator tries 
> to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5054: [FLINK-8136] [table] Fix code generation with Joda...

2017-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5054


---


[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner

2017-11-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262932#comment-16262932
 ] 

Aljoscha Krettek commented on FLINK-7913:
-

What is the default Kafka partitioner doing different from the default Flink 
Kafka partitioner?

> Add support for Kafka default partitioner
> -
>
> Key: FLINK-7913
> URL: https://issues.apache.org/jira/browse/FLINK-7913
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Konstantin Lalafaryan
>Assignee: Konstantin Lalafaryan
> Fix For: 1.5.0
>
>
> Currently Apache Flink provides only *FlinkKafkaPartitioner* 
> and just one implementation, *FlinkFixedPartitioner*. 
> In order to be able to use Kafka's default partitioner you have to create a new 
> implementation of *FlinkKafkaPartitioner* and fork the code from Kafka. 
> It would be really good to be able to define the partitioner without 
> implementing a new class.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262930#comment-16262930
 ] 

ASF GitHub Bot commented on FLINK-8136:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5054
  
Merging...


> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
>
> Due to the shading of Joda-Time, there is an exception when the CodeGenerator tries 
> to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5054: [FLINK-8136] [table] Fix code generation with JodaTime sh...

2017-11-22 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5054
  
Merging...


---


[jira] [Reopened] (FLINK-8050) RestServer#shutdown() ignores exceptions thrown when shutting down netty.

2017-11-22 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reopened FLINK-8050:
---

> RestServer#shutdown() ignores exceptions thrown when shutting down netty.
> -
>
> Key: FLINK-8050
> URL: https://issues.apache.org/jira/browse/FLINK-8050
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8137:
---
Description: 
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's input formats perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?

  was:
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}} s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?


> Flink JobManager API non-responsive during job submission
> -
>
> Key: FLINK-8137
> URL: https://issues.apache.org/jira/browse/FLINK-8137
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, JobManager, REST, Webfrontend
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 running a batch job in Kubernetes.
>Reporter: Joshua Griffith
>Priority: Minor
>
> When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, 
> the JobManager's REST API appears to hang until the job is submitted. The 
> submission time may be large enough to cause timeouts if the 
> {{getStatistics}} and {{createInputSplits}} methods of a job's input formats 
> perform time-intensive tasks like running external queries. This is 
> exacerbated when a job contains many such input formats since they appear to 
> be initialized sequentially. For a particular job with over 100 inputs, it's 
> typical for the API (and consequently the web UI) to be non-responsive for 
> 45–60 seconds.
> Would it make sense for tasks to have a {{Configuring}} state before the 
> {{Created}} state to provide greater visibility and indicate that the 
> JobManager is still healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8137:
---
Description: 
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}} s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?

  was:
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}} s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats, since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?


> Flink JobManager API non-responsive during job submission
> -
>
> Key: FLINK-8137
> URL: https://issues.apache.org/jira/browse/FLINK-8137
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, JobManager, REST, Webfrontend
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 running a batch job in Kubernetes.
>Reporter: Joshua Griffith
>Priority: Minor
>
> When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, 
> the JobManager's REST API appears to hang until the job is submitted. The 
> submission time may be large enough to cause timeouts if the 
> {{getStatistics}} and {{createInputSplits}} methods of a job's 
> {{RichInputFormat}} s perform time-intensive tasks like running external 
> queries. This is exacerbated when a job contains many such input formats 
> since they appear to be initialized sequentially. For a particular job with 
> over 100 inputs, it's typical for the API (and consequently the web UI) to be 
> non-responsive for 45–60 seconds.
> Would it make sense for tasks to have a {{Configuring}} state before the 
> {{Created}} state to provide greater visibility and indicate that the 
> JobManager is still healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8137:
---
Description: 
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}}s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats, since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?

  was:
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}}s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many {{RichInputFormat}}s, since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?


> Flink JobManager API non-responsive during job submission
> -
>
> Key: FLINK-8137
> URL: https://issues.apache.org/jira/browse/FLINK-8137
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, JobManager, REST, Webfrontend
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 running a batch job in Kubernetes.
>Reporter: Joshua Griffith
>Priority: Minor
>
> When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, 
> the JobManager's REST API appears to hang until the job is submitted. The 
> submission time may be large enough to cause timeouts if the 
> {{getStatistics}} and {{createInputSplits}} methods of a job's 
> {{RichInputFormat}}s perform time-intensive tasks like running external 
> queries. This is exacerbated when a job contains many such input formats, 
> since they appear to be initialized sequentially. For a particular job with 
> over 100 inputs, it's typical for the API (and consequently the web UI) to be 
> non-responsive for 45–60 seconds.
> Would it make sense for tasks to have a {{Configuring}} state before the 
> {{Created}} state to provide greater visibility and indicate that the 
> JobManager is still healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8137:
---
Description: 
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}} s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats, since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?

  was:
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}}s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many such input formats, since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?


> Flink JobManager API non-responsive during job submission
> -
>
> Key: FLINK-8137
> URL: https://issues.apache.org/jira/browse/FLINK-8137
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, JobManager, REST, Webfrontend
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 running a batch job in Kubernetes.
>Reporter: Joshua Griffith
>Priority: Minor
>
> When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, 
> the JobManager's REST API appears to hang until the job is submitted. The 
> submission time may be large enough to cause timeouts if the 
> {{getStatistics}} and {{createInputSplits}} methods of a job's 
> {{RichInputFormat}} s perform time-intensive tasks like running external 
> queries. This is exacerbated when a job contains many such input formats, 
> since they appear to be initialized sequentially. For a particular job with 
> over 100 inputs, it's typical for the API (and consequently the web UI) to be 
> non-responsive for 45–60 seconds.
> Would it make sense for tasks to have a {{Configuring}} state before the 
> {{Created}} state to provide greater visibility and indicate that the 
> JobManager is still healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8137:
---
Description: 
When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the {{getStatistics}} 
and {{createInputSplits}} methods of a job's {{RichInputFormat}}s perform 
time-intensive tasks like running external queries. This is exacerbated when a 
job contains many {{RichInputFormat}}s, since they appear to be initialized 
sequentially. For a particular job with over 100 inputs, it's typical for the 
API (and consequently the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a {{Configuring}} state before the 
{{Created}} state to provide greater visibility and indicate that the 
JobManager is still healthy?

  was:
When submitting a new FlinkPlan using the StandaloneClusterClient, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the getStatistics and 
createInputSplits methods of a job's RichInputFormats perform time-intensive 
tasks like running external queries. This is exacerbated when a job contains 
many RichInputFormats, since they appear to be initialized sequentially. For a 
particular job with over 100 inputs, it's typical for the API (and consequently 
the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a Configuring state before the Created 
state to provide greater visibility and indicate that the JobManager is still 
healthy?


> Flink JobManager API non-responsive during job submission
> -
>
> Key: FLINK-8137
> URL: https://issues.apache.org/jira/browse/FLINK-8137
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, JobManager, REST, Webfrontend
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 running a batch job in Kubernetes.
>Reporter: Joshua Griffith
>Priority: Minor
>
> When submitting a new {{FlinkPlan}} using the {{StandaloneClusterClient}}, 
> the JobManager's REST API appears to hang until the job is submitted. The 
> submission time may be large enough to cause timeouts if the 
> {{getStatistics}} and {{createInputSplits}} methods of a job's 
> {{RichInputFormat}}s perform time-intensive tasks like running external 
> queries. This is exacerbated when a job contains many {{RichInputFormat}}s, 
> since they appear to be initialized sequentially. For a particular job with 
> over 100 inputs, it's typical for the API (and consequently the web UI) to be 
> non-responsive for 45–60 seconds.
> Would it make sense for tasks to have a {{Configuring}} state before the 
> {{Created}} state to provide greater visibility and indicate that the 
> JobManager is still healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joshua Griffith updated FLINK-8137:
---
Environment: Flink 1.3.2 running a batch job in Kubernetes.

> Flink JobManager API non-responsive during job submission
> -
>
> Key: FLINK-8137
> URL: https://issues.apache.org/jira/browse/FLINK-8137
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission, JobManager, REST, Webfrontend
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 running a batch job in Kubernetes.
>Reporter: Joshua Griffith
>Priority: Minor
>
> When submitting a new FlinkPlan using the StandaloneClusterClient, the 
> JobManager's REST API appears to hang until the job is submitted. The 
> submission time may be large enough to cause timeouts if the getStatistics 
> and createInputSplits methods of a job's RichInputFormats perform 
> time-intensive tasks like running external queries. This is exacerbated when 
> a job contains many RichInputFormats, since they appear to be initialized 
> sequentially. For a particular job with over 100 inputs, it's typical for the 
> API (and consequently the web UI) to be non-responsive for 45–60 seconds.
> Would it make sense for tasks to have a Configuring state before the Created 
> state to provide greater visibility and indicate that the JobManager is still 
> healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8137) Flink JobManager API non-responsive during job submission

2017-11-22 Thread Joshua Griffith (JIRA)
Joshua Griffith created FLINK-8137:
--

 Summary: Flink JobManager API non-responsive during job submission
 Key: FLINK-8137
 URL: https://issues.apache.org/jira/browse/FLINK-8137
 Project: Flink
  Issue Type: Bug
  Components: Client, Job-Submission, JobManager, REST, Webfrontend
Affects Versions: 1.3.2
Reporter: Joshua Griffith
Priority: Minor


When submitting a new FlinkPlan using the StandaloneClusterClient, the 
JobManager's REST API appears to hang until the job is submitted. The 
submission time may be large enough to cause timeouts if the getStatistics and 
createInputSplits methods of a job's RichInputFormats perform time-intensive 
tasks like running external queries. This is exacerbated when a job contains 
many RichInputFormats, since they appear to be initialized sequentially. For a 
particular job with over 100 inputs, it's typical for the API (and consequently 
the web UI) to be non-responsive for 45–60 seconds.

Would it make sense for tasks to have a Configuring state before the Created 
state to provide greater visibility and indicate that the JobManager is still 
healthy?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262911#comment-16262911
 ] 

ASF GitHub Bot commented on FLINK-8136:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5054
  
+1


> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
>
> Due to the shading of Joda-Time there is an exception when the CodeGenerator 
> tries to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5054: [FLINK-8136] [table] Fix code generation with JodaTime sh...

2017-11-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5054
  
+1


---


[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262908#comment-16262908
 ] 

ASF GitHub Bot commented on FLINK-8136:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5054

[FLINK-8136] [table] Fix code generation with JodaTime shading

## What is the purpose of the change

Fix `DATE_FORMAT` with shaded JodaTime.


## Brief change log

Do not use a hard-coded class name in code generation.


## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8136

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5054


commit f50ae50c8e20ac025088c02aac56452f7013d77b
Author: twalthr 
Date:   2017-11-22T16:43:08Z

[FLINK-8136] [table] Fix code generation with JodaTime shading




> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
>
> Due to the shading of Joda-Time there is an exception when the CodeGenerator 
> tries to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5054: [FLINK-8136] [table] Fix code generation with Joda...

2017-11-22 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5054

[FLINK-8136] [table] Fix code generation with JodaTime shading

## What is the purpose of the change

Fix `DATE_FORMAT` with shaded JodaTime.


## Brief change log

Do not use a hard-coded class name in code generation.


## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8136

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5054.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5054


commit f50ae50c8e20ac025088c02aac56452f7013d77b
Author: twalthr 
Date:   2017-11-22T16:43:08Z

[FLINK-8136] [table] Fix code generation with JodaTime shading




---


[jira] [Commented] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262893#comment-16262893
 ] 

ASF GitHub Bot commented on FLINK-8132:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5053#discussion_r152618042
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link 
FlinkKafkaConsumer011}.
+ */
+public class FlinkKafka011Exception extends Exception {
--- End diff --

Should extend `FlinkException` 
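
For reference, a sketch of what the suggested change could look like. The constructors
shown are placeholders; the actual class in the PR also carries error codes, which are
omitted here.

{code:java}
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.util.FlinkException;

/**
 * Sketch of the suggested change: derive the connector exception from Flink's
 * common base exception instead of plain java.lang.Exception.
 */
public class FlinkKafka011Exception extends FlinkException {

    private static final long serialVersionUID = 1L;

    public FlinkKafka011Exception(String message) {
        super(message);
    }

    public FlinkKafka011Exception(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}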


> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> --
>
> Key: FLINK-8132
> URL: https://issues.apache.org/jira/browse/FLINK-8132
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Faulty scenario with producer pool of 2.
> 1. started transaction 1 with producerA, wrote record 42
> 2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, 
> wrote record 43
> 3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
> 4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
> wrote record 44
> 5. crash
> 6. recover to checkpoint 1; txn1 from producerA is found in 
> "pendingCommitTransactions", attempting recoverAndCommit(txn1)
> 7. unfortunately, txn1 and txn3 from the same producer are identical from the 
> KafkaBroker's perspective, so txn3 is committed as well
> The result is that both records 42 and 44 are committed. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5053: [FLINK-8132][kafka] Re-initialize transactional Ka...

2017-11-22 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5053#discussion_r152618042
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link 
FlinkKafkaConsumer011}.
+ */
+public class FlinkKafka011Exception extends Exception {
--- End diff --

Should extend `FlinkException` 


---


[jira] [Commented] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262885#comment-16262885
 ] 

Timo Walther commented on FLINK-8136:
-

I will look into the issue.

> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
>
> Due to the shading of Joda-Time there is an exception when the CodeGenerator 
> tries to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-8136:
---

Assignee: Timo Walther

> Cast exception error on Flink SQL when using DATE_FORMAT
> 
>
> Key: FLINK-8136
> URL: https://issues.apache.org/jira/browse/FLINK-8136
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
> Environment: Any environment
>Reporter: David Marcos
>Assignee: Timo Walther
>Priority: Blocker
>
> Due to the shading of Joda-Time there is an exception when the CodeGenerator 
> tries to cast org.joda.time.format.DateTimeFormatter to 
> org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.
> This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.
> Affected scala file:
> 
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
> Affected method:
> --
> addReusableDateFormatter 
> Affected code:
> ---
> val field =
> s"""
> |final org.joda.time.format.DateTimeFormatter $fieldTerm;
> |""".stripMargin
> Fastest solution:
> ---
> val field =
> s"""
> |final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
> $fieldTerm;
> |""".stripMargin



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8136) Cast exception error on Flink SQL when using DATE_FORMAT

2017-11-22 Thread David Marcos (JIRA)
David Marcos created FLINK-8136:
---

 Summary: Cast exception error on Flink SQL when using DATE_FORMAT
 Key: FLINK-8136
 URL: https://issues.apache.org/jira/browse/FLINK-8136
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.4.0
 Environment: Any environment
Reporter: David Marcos
Priority: Blocker


Due to the shading of Joda-Time there is an exception when the CodeGenerator tries to 
cast org.joda.time.format.DateTimeFormatter to 
org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter.

This can be reproduced by using the DATE_FORMAT temporal function in any Flink SQL query.

Affected scala file:

https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala

Affected method:
--
addReusableDateFormatter 

Affected code:
---
val field =
s"""
|final org.joda.time.format.DateTimeFormatter $fieldTerm;
|""".stripMargin


Fastest solution:
---
val field =
s"""
|final org.apache.flink.table.shaded.org.joda.time.format.DateTimeFormatter 
$fieldTerm;
|""".stripMargin





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262878#comment-16262878
 ] 

ASF GitHub Bot commented on FLINK-8132:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5053

[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each 
checkpoint

Previously faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with 
producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA is found in 
"pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the 
KafkaBroker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, after re-initialization txn3 will have different 
producerId/epoch counters compared to txn1.
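
To illustrate the mechanism the fix relies on, a sketch with the plain Kafka 0.11
client (not the connector code; topic name and bootstrap address are assumptions):
creating a fresh producer with the same transactional.id and calling initTransactions()
bumps the epoch on the broker and fences the previous instance, so a recovered txn1 can
no longer be mistaken for a later txn3.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionEpochSketch {

    private static KafkaProducer<String, String> newTransactionalProducer(String transactionalId) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        // First producer instance: initTransactions() registers the id and obtains an epoch.
        KafkaProducer<String, String> producerA = newTransactionalProducer("flink-sink-0");
        producerA.initTransactions();
        producerA.beginTransaction();
        producerA.send(new ProducerRecord<>("output-topic", "42"));
        producerA.commitTransaction();
        producerA.close();

        // Re-initialized instance with the same transactional.id: initTransactions()
        // bumps the epoch and fences the previous instance, so its transactions are
        // distinguishable from (and cannot be re-committed as) the earlier ones.
        KafkaProducer<String, String> producerA2 = newTransactionalProducer("flink-sink-0");
        producerA2.initTransactions();
        producerA2.beginTransaction();
        producerA2.send(new ProducerRecord<>("output-topic", "44"));
        producerA2.commitTransaction();
        producerA2.close();
    }
}
{code}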

## Verifying this change

This PR adds a test that was previously failing to cover for future 
regressions.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8132

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5053.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5053


commit 6f0f56ca25a12da513253d738d14d8516a808bd8
Author: Piotr Nowojski 
Date:   2017-11-22T10:37:48Z

[hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes 
instead of generic Exception

commit b79a372d589df54f505857ed78e120d69d0ad50a
Author: Piotr Nowojski 
Date:   2017-11-22T14:53:08Z

[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each 
checkpoint

Previously faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with 
producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA is found in 
"pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the 
KafkaBroker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, after re-initialization txn3 will have different 
producerId/epoch counters compared to txn1.

commit a5938795c5ae0dd7022d07ee90620fb6a8ce14a9
Author: Piotr Nowojski 
Date:   2017-11-22T14:55:20Z

[hotfix][kafka] Remove unused method in kafka tests




> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> --
>
> Key: FLINK-8132
> URL: https://issues.apache.org/jira/browse/FLINK-8132
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Faulty scenario with producer pool of 2.
> 1. started transaction 1 with producerA, wrote record 42
> 2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, 
> wrote record 43
> 3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
> 4. checkpoint 2 triggered, committing txn2, started 

[GitHub] flink pull request #5053: [FLINK-8132][kafka] Re-initialize transactional Ka...

2017-11-22 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5053

[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each 
checkpoint

Previously faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with 
producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA is found in 
"pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the 
KafkaBroker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, after re-initialization txn3 will have different 
producerId/epoch counters compared to txn1.

## Verifying this change

This PR adds a test that was previously failing to cover for future 
regressions.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8132

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5053.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5053


commit 6f0f56ca25a12da513253d738d14d8516a808bd8
Author: Piotr Nowojski 
Date:   2017-11-22T10:37:48Z

[hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes 
instead of generic Exception

commit b79a372d589df54f505857ed78e120d69d0ad50a
Author: Piotr Nowojski 
Date:   2017-11-22T14:53:08Z

[FLINK-8132][kafka] Re-initialize transactional KafkaProducer on each 
checkpoint

Previously faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with 
producerB, wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA is found in 
"pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the 
KafkaBroker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed.

With this fix, after re-initialization txn3 will have different 
producerId/epoch counters compared to txn1.

commit a5938795c5ae0dd7022d07ee90620fb6a8ce14a9
Author: Piotr Nowojski 
Date:   2017-11-22T14:55:20Z

[hotfix][kafka] Remove unused method in kafka tests




---


[GitHub] flink pull request #5040: [FLINK-8104][Table API] fixing ROW type value cons...

2017-11-22 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5040#discussion_r152588446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -835,6 +836,43 @@ object ScalarOperators {
 generateUnaryArithmeticOperator(operator, nullCheck, 
operand.resultType, operand)
   }
 
+  def generateRow(
+  codeGenerator: CodeGenerator,
+  resultType: TypeInformation[_],
+  elements: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+val rowTerm = codeGenerator.addReusableRow(resultType.getArity, 
elements.size)
+
+val boxedElements: Seq[GeneratedExpression] = resultType match {
+  case ct: CompositeType[_] =>
+elements.zipWithIndex.map{
+  case (e, idx) =>
+val boxedTypeTerm = boxedTypeTermForTypeInfo(ct.getTypeAt(idx))
--- End diff --

Can we put this into a private helper method? I think it is the same code 
for `generateArray`, `generateMap`, and `generateRow`.


---


[jira] [Commented] (FLINK-8104) Fix Row value constructor

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262870#comment-16262870
 ] 

ASF GitHub Bot commented on FLINK-8104:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5040#discussion_r152588168
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -835,6 +836,43 @@ object ScalarOperators {
 generateUnaryArithmeticOperator(operator, nullCheck, 
operand.resultType, operand)
   }
 
+  def generateRow(
+  codeGenerator: CodeGenerator,
+  resultType: TypeInformation[_],
+  elements: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+val rowTerm = codeGenerator.addReusableRow(resultType.getArity, 
elements.size)
+
+val boxedElements: Seq[GeneratedExpression] = resultType match {
+  case ct: CompositeType[_] =>
--- End diff --

This should always be `RowTypeInfo`. I think we can simply cast it.


> Fix Row value constructor
> -
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Support the Row value constructor, which is currently broken. 
> See 
> {code:java}
> // 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
>   @Test
>   def testValueConstructorFunctions(): Unit = {
> // TODO we need a special code path that flattens ROW types
> // testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
> returns field 0
> // testSqlApi("('hello world', 12)", "hello world") // test base only 
> returns field 0
> // ...
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5040: [FLINK-8104][Table API] fixing ROW type value cons...

2017-11-22 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5040#discussion_r152614003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -417,6 +417,10 @@ object FlinkTypeFactory {
   val compositeRelDataType = 
relDataType.asInstanceOf[CompositeRelDataType]
   compositeRelDataType.compositeType
 
+case ROW if relDataType.isInstanceOf[RelRecordType] =>
+  val relRecordType = relDataType.asInstanceOf[RelRecordType]
+  new RowSchema(relRecordType).typeInfo
+
 // ROW and CURSOR for UDTF case, whose type info will never be used, 
just a placeholder
 case ROW | CURSOR => new NothingTypeInfo
--- End diff --

Maybe we can remove the `ROW` here if it has no impact on the existing 
tests.


---


[jira] [Commented] (FLINK-8104) Fix Row value constructor

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262872#comment-16262872
 ] 

ASF GitHub Bot commented on FLINK-8104:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5040#discussion_r152588446
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -835,6 +836,43 @@ object ScalarOperators {
 generateUnaryArithmeticOperator(operator, nullCheck, 
operand.resultType, operand)
   }
 
+  def generateRow(
+  codeGenerator: CodeGenerator,
+  resultType: TypeInformation[_],
+  elements: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+val rowTerm = codeGenerator.addReusableRow(resultType.getArity, 
elements.size)
+
+val boxedElements: Seq[GeneratedExpression] = resultType match {
+  case ct: CompositeType[_] =>
+elements.zipWithIndex.map{
+  case (e, idx) =>
+val boxedTypeTerm = boxedTypeTermForTypeInfo(ct.getTypeAt(idx))
--- End diff --

Can we put this into a private helper method? I think it is the same code 
for `generateArray`, `generateMap`, and `generateRow`.


> Fix Row value constructor
> -
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Support the Row value constructor, which is currently broken. 
> See 
> {code:java}
> // 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
>   @Test
>   def testValueConstructorFunctions(): Unit = {
> // TODO we need a special code path that flattens ROW types
> // testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
> returns field 0
> // testSqlApi("('hello world', 12)", "hello world") // test base only 
> returns field 0
> // ...
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5040: [FLINK-8104][Table API] fixing ROW type value cons...

2017-11-22 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5040#discussion_r152588168
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -835,6 +836,43 @@ object ScalarOperators {
 generateUnaryArithmeticOperator(operator, nullCheck, 
operand.resultType, operand)
   }
 
+  def generateRow(
+  codeGenerator: CodeGenerator,
+  resultType: TypeInformation[_],
+  elements: Seq[GeneratedExpression])
+  : GeneratedExpression = {
+val rowTerm = codeGenerator.addReusableRow(resultType.getArity, 
elements.size)
+
+val boxedElements: Seq[GeneratedExpression] = resultType match {
+  case ct: CompositeType[_] =>
--- End diff --

This should always be `RowTypeInfo`. I think we can simply cast it.


---


[jira] [Commented] (FLINK-8104) Fix Row value constructor

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262871#comment-16262871
 ] 

ASF GitHub Bot commented on FLINK-8104:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5040#discussion_r152614003
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -417,6 +417,10 @@ object FlinkTypeFactory {
   val compositeRelDataType = 
relDataType.asInstanceOf[CompositeRelDataType]
   compositeRelDataType.compositeType
 
+case ROW if relDataType.isInstanceOf[RelRecordType] =>
+  val relRecordType = relDataType.asInstanceOf[RelRecordType]
+  new RowSchema(relRecordType).typeInfo
+
 // ROW and CURSOR for UDTF case, whose type info will never be used, 
just a placeholder
 case ROW | CURSOR => new NothingTypeInfo
--- End diff --

Maybe we can remove the `ROW` here if it has no impact on the existing 
tests.


> Fix Row value constructor
> -
>
> Key: FLINK-8104
> URL: https://issues.apache.org/jira/browse/FLINK-8104
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> Support the Row value constructor, which is currently broken. 
> See 
> {code:java}
> // 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
>   @Test
>   def testValueConstructorFunctions(): Unit = {
> // TODO we need a special code path that flattens ROW types
> // testSqlApi("ROW('hello world', 12)", "hello world") // test base only 
> returns field 0
> // testSqlApi("('hello world', 12)", "hello world") // test base only 
> returns field 0
> // ...
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5045#discussion_r152608576
  
--- Diff: docs/concepts/programming-model.md ---
@@ -33,53 +33,52 @@ Flink offers different levels of abstraction to develop 
streaming/batch applicat
 
 
 
-  - The lowest level abstraction simply offers **stateful streaming**. It 
is embedded into the [DataStream API](../dev/datastream_api.html)
-via the [Process 
Function](../dev/stream/operators/process_function.html). It allows users 
freely process events from one or more streams,
-and use consistent fault tolerant *state*. In addition, users can 
register event time and processing time callbacks,
+  - The lowest level abstraction offers **stateful streaming** and is 
embedded into the [DataStream API](../dev/datastream_api.html)
+via the [Process 
Function](../dev/stream/operators/process_function.html). It allows users to 
process events from one or more streams,
+and use consistent fault tolerant *state*. Users can register event 
time and processing time callbacks,
 allowing programs to realize sophisticated computations.
 
-  - In practice, most applications would not need the above described low 
level abstraction, but would instead program against the
+  - In practice, most applications would not need the low level 
abstraction describe above, but would instead program against the
 **Core APIs** like the [DataStream API](../dev/datastream_api.html) 
(bounded/unbounded streams) and the [DataSet API](../dev/batch/index.html)
-(bounded data sets). These fluent APIs offer the common building 
blocks for data processing, like various forms of user-specified
+(bounded data sets). These fluent APIs offer the common building 
blocks for data processing, like forms of user-specified
 transformations, joins, aggregations, windows, state, etc. Data types 
processed in these APIs are represented as classes
-in the respective programming languages.
+in respective programming languages.
 
-The low level *Process Function* integrates with the *DataStream API*, 
making it possible to go the lower level abstraction 
-for certain operations only. The *DataSet API* offers additional 
primitives on bounded data sets, like loops/iterations.
+The low level *Process Function* integrates with the *DataStream API*, 
making it possible to use the lower level abstraction
+for certain operations. The *DataSet API* offers additional primitives 
on bounded data sets, like loops or iterations.
 
   - The **Table API** is a declarative DSL centered around *tables*, which 
may be dynamically changing tables (when representing streams).
-The [Table API](../dev/table_api.html) follows the (extended) 
relational model: Tables have a schema attached (similar to tables in 
relational databases)
+The [Table API](../dev/table_api.html) follows the (extended) 
relational model. Tables have a schema attached (similar to tables in 
relational databases)
 and the API offers comparable operations, such as select, project, 
join, group-by, aggregate, etc.
-Table API programs declaratively define *what logical operation should 
be done* rather than specifying exactly
-   *how the code for the operation looks*. Though the Table API is 
extensible by various types of user-defined
+Table API programs declaratively define *what logical operation should 
to perform* rather than specifying
+   *how the code for the operation looks*. The Table API is extensible by 
various types of user-defined
 functions, it is less expressive than the *Core APIs*, but more 
concise to use (less code to write).
-In addition, Table API programs also go through an optimizer that 
applies optimization rules before execution.
+Table API programs also go through an optimizer that applies 
optimization rules before execution.
 
-One can seamlessly convert between tables and *DataStream*/*DataSet*, 
allowing programs to mix *Table API* and with the *DataStream*
+You can seamlessly convert between tables and *DataStream*/*DataSet*, 
allowing programs to mix *Table API* and with the *DataStream*
 and *DataSet* APIs.
 
   - The highest level abstraction offered by Flink is **SQL**. This 
abstraction is similar to the *Table API* both in semantics and
 expressiveness, but represents programs as SQL query expressions.
-The [SQL](../dev/table_api.html#sql) abstraction closely interacts 
with the Table API, and SQL queries can be executed over tables defined in the 
*Table API*.
+The [SQL](../dev/table_api.html#sql) abstraction closely interacts 
with the Table API, and you can execute SQL queries over tables defined in the 
*Table API*.
 
 
 ## Programs and Dataflows
 
-The 

[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5045#discussion_r152608405
  
--- Diff: docs/concepts/programming-model.md ---
@@ -132,14 +131,13 @@ One typically distinguishes different types of 
windows, such as *tumbling window
 
 
 
-More window examples can be found in this [blog 
post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html).
-More details are in the [window 
docs](../dev/stream/operators/windows.html).
+You can find more window examples in this [blog 
post](https://flink.apache.org/news/2015/12/04/Introducing-windows.html) and in 
the [window documentation](../dev/stream/operators/windows.html).
--- End diff --

Line break before "and".


---


[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5045#discussion_r152607252
  
--- Diff: docs/concepts/programming-model.md ---
@@ -33,53 +33,52 @@ Flink offers different levels of abstraction to develop 
streaming/batch applicat
 
 
 
-  - The lowest level abstraction simply offers **stateful streaming**. It 
is embedded into the [DataStream API](../dev/datastream_api.html)
-via the [Process 
Function](../dev/stream/operators/process_function.html). It allows users 
freely process events from one or more streams,
-and use consistent fault tolerant *state*. In addition, users can 
register event time and processing time callbacks,
+  - The lowest level abstraction offers **stateful streaming** and is 
embedded into the [DataStream API](../dev/datastream_api.html)
+via the [Process 
Function](../dev/stream/operators/process_function.html). It allows users to 
process events from one or more streams,
+and use consistent fault tolerant *state*. Users can register event 
time and processing time callbacks,
 allowing programs to realize sophisticated computations.
 
-  - In practice, most applications would not need the above described low 
level abstraction, but would instead program against the
+  - In practice, most applications would not need the low level 
abstraction describe above, but would instead program against the
 **Core APIs** like the [DataStream API](../dev/datastream_api.html) 
(bounded/unbounded streams) and the [DataSet API](../dev/batch/index.html)
-(bounded data sets). These fluent APIs offer the common building 
blocks for data processing, like various forms of user-specified
+(bounded data sets). These fluent APIs offer the common building 
blocks for data processing, like forms of user-specified
 transformations, joins, aggregations, windows, state, etc. Data types 
processed in these APIs are represented as classes
-in the respective programming languages.
+in respective programming languages.
 
-The low level *Process Function* integrates with the *DataStream API*, 
making it possible to go the lower level abstraction 
-for certain operations only. The *DataSet API* offers additional 
primitives on bounded data sets, like loops/iterations.
+The low level *Process Function* integrates with the *DataStream API*, 
making it possible to use the lower level abstraction
+for certain operations. The *DataSet API* offers additional primitives 
on bounded data sets, like loops or iterations.
 
   - The **Table API** is a declarative DSL centered around *tables*, which 
may be dynamically changing tables (when representing streams).
-The [Table API](../dev/table_api.html) follows the (extended) 
relational model: Tables have a schema attached (similar to tables in 
relational databases)
+The [Table API](../dev/table_api.html) follows the (extended) 
relational model. Tables have a schema attached (similar to tables in 
relational databases)
 and the API offers comparable operations, such as select, project, 
join, group-by, aggregate, etc.
-Table API programs declaratively define *what logical operation should 
be done* rather than specifying exactly
-   *how the code for the operation looks*. Though the Table API is 
extensible by various types of user-defined
+Table API programs declaratively define *what logical operation should 
to perform* rather than specifying
+   *how the code for the operation looks*. The Table API is extensible by 
various types of user-defined
 functions, it is less expressive than the *Core APIs*, but more 
concise to use (less code to write).
--- End diff --

"functions, it" -> "functions. It"


---


[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5045#discussion_r152612059
  
--- Diff: docs/concepts/runtime.md ---
@@ -107,21 +107,20 @@ With hyper-threading, each slot then takes 2 or more 
hardware thread contexts.
 
 ## State Backends
 
-The exact data structures in which the key/values indexes are stored 
depends on the chosen [state backend](../ops/state/state_backends.html). One 
state backend
+The exact data structures in which the key/values indexes are stored 
depends on the [state backend](../ops/state/state_backends.html) you choose. 
One state backend
 stores data in an in-memory hash map, another state backend uses 
[RocksDB](http://rocksdb.org) as the key/value store.
-In addition to defining the data structure that holds the state, the state 
backends also implement the logic to
-take a point-in-time snapshot of the key/value state and store that 
snapshot as part of a checkpoint.
+The state backends also implement the logic to take a point-in-time 
snapshot of the key/value state and store that snapshot as part of a checkpoint.
 
 
 
 {% top %}
 
 ## Savepoints
 
-Programs written in the Data Stream API can resume execution from a 
**savepoint**. Savepoints allow both updating your programs and your Flink 
cluster without losing any state. 
+Programs written in the Data Stream API can resume execution from a 
**savepoint**. Savepoints allow both updating your programs and your Flink 
cluster without losing any state.
 
-[Savepoints](../ops/state/savepoints.html) are **manually triggered 
checkpoints**, which take a snapshot of the program and write it out to a state 
backend. They rely on the regular checkpointing mechanism for this. During 
execution programs are periodically snapshotted on the worker nodes and produce 
checkpoints. For recovery only the last completed checkpoint is needed and 
older checkpoints can be safely discarded as soon as a new one is completed.
+[Savepoints](../ops/state/savepoints.html) are **manually triggered 
checkpoints**, which take a snapshot of the program and write it out to a state 
backend. They rely on the regular checkpointing mechanism for this. During 
execution programs are periodically snapshotted on the worker nodes and produce 
checkpoints. The last completed checkpoint is only needed for recovery and you 
can safely discard older checkpoints as soon as a new one is completed.
--- End diff --

Should we leave the original but change "can be safely" to "are" or "are by 
default"? The paragraph starts discussing savepoints but then switches to 
commenting on checkpoints with no distinction about externalized checkpoints.


---


[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5045#discussion_r152605878
  
--- Diff: docs/concepts/programming-model.md ---
@@ -33,53 +33,52 @@ Flink offers different levels of abstraction to develop 
streaming/batch applicat
 
 
 
-  - The lowest level abstraction simply offers **stateful streaming**. It 
is embedded into the [DataStream API](../dev/datastream_api.html)
-via the [Process 
Function](../dev/stream/operators/process_function.html). It allows users 
freely process events from one or more streams,
-and use consistent fault tolerant *state*. In addition, users can 
register event time and processing time callbacks,
+  - The lowest level abstraction offers **stateful streaming** and is 
embedded into the [DataStream API](../dev/datastream_api.html)
+via the [Process 
Function](../dev/stream/operators/process_function.html). It allows users to 
process events from one or more streams,
+and use consistent fault tolerant *state*. Users can register event 
time and processing time callbacks,
 allowing programs to realize sophisticated computations.
 
-  - In practice, most applications would not need the above described low 
level abstraction, but would instead program against the
+  - In practice, most applications would not need the low level 
abstraction describe above, but would instead program against the
--- End diff --

"describe" -> "described"


---


[GitHub] flink pull request #5045: [hotfix][docs] Review of concepts docs for grammar...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5045#discussion_r152609451
  
--- Diff: docs/concepts/runtime.md ---
@@ -74,10 +74,10 @@ To control how many tasks a worker accepts, a worker 
has so called **task slots*
 Each *task slot* represents a fixed subset of resources of the 
TaskManager. A TaskManager with three slots, for example,
 will dedicate 1/3 of its managed memory to each slot. Slotting the 
resources means that a subtask will not
 compete with subtasks from other jobs for managed memory, but instead has 
a certain amount of reserved
-managed memory. Note that no CPU isolation happens here; currently slots 
only separate the managed memory of tasks.
+managed memory. No CPU isolation happens, slots only separate the managed 
memory of tasks.
--- End diff --

"," -> ";"?


---


[jira] [Updated] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery

2017-11-22 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8132:
--
Description: 
Faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, 
wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA is found in 
"pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the 
KafkaBroker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed. 

  was:
Faulty scenario with producer pool of 2.

1. started transaction 1 with producerA, wrote record 42
2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, 
wrote record 43
3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
wrote record 44
5. crash
6. recover to checkpoint 1; txn1 from producerA is found in 
"pendingCommitTransactions", attempting recoverAndCommit(txn1)
7. unfortunately, txn1 and txn3 from the same producer are identical from the 
KafkaBroker's perspective, so txn3 is committed as well

The result is that both records 42 and 44 are committed. 

The proposed solution is to postpone returning producers to the pool until we are 
sure that the previous checkpoint (for which the given producer was used) will not be 
used for recovery, i.e., at least one more checkpoint has completed.


> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> --
>
> Key: FLINK-8132
> URL: https://issues.apache.org/jira/browse/FLINK-8132
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Faulty scenario with producer pool of 2.
> 1. started transaction 1 with producerA, wrote record 42
> 2. checkpoint 1 triggered, pre-committing txn1; started txn2 with producerB, 
> wrote record 43
> 3. checkpoint 1 completed, committing txn1 and returning producerA to the pool
> 4. checkpoint 2 triggered, committing txn2; started txn3 with producerA, 
> wrote record 44
> 5. crash
> 6. recover to checkpoint 1; txn1 from producerA is found in 
> "pendingCommitTransactions", attempting recoverAndCommit(txn1)
> 7. unfortunately, txn1 and txn3 from the same producer are identical from the 
> KafkaBroker's perspective, so txn3 is committed as well
> The result is that both records 42 and 44 are committed. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262841#comment-16262841
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user taizilongxu commented on the issue:

https://github.com/apache/flink/pull/5050
  
The prefix is the same as in the Kafka high-level API and contains a UUID string, so 
it should be unique.
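
Illustrative only (the exact prefix and format used by the connector are not shown
here): appending a UUID to a fixed prefix is what makes such an identifier unique per
instance.

{code:java}
import java.util.UUID;

public class ConsumerIdSketch {

    /** Builds a per-instance consumer identifier from a fixed prefix plus a UUID. */
    public static String consumerId(String prefix) {
        return prefix + "_" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(consumerId("flink-kafka-consumer")); // assumed prefix
    }
}
{code}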


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5050: [FLINK-4822] Ensure that the Kafka 0.8 connector is compa...

2017-11-22 Thread taizilongxu
Github user taizilongxu commented on the issue:

https://github.com/apache/flink/pull/5050
  
The prefix is the same as in the Kafka high-level API and contains a UUID string, so 
it should be unique.


---


[GitHub] flink issue #5047: Code refine of WordWithCount

2017-11-22 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5047
  
This change doesn't work since `WordWithCount` needs to be a POJO, which 
requires a public no-args constructor and non-final public fields or 
getters/setters. Since the type is no longer a POJO, the `TypeExtractor` falls back to 
`GenericType`, which cannot be used as a key type, as noted in the error message:


Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.949 sec 
<<< FAILURE! - in 
org.apache.flink.streaming.test.socket.SocketWindowWordCountITCase

testJavaProgram(org.apache.flink.streaming.test.socket.SocketWindowWordCountITCase)
  Time elapsed: 0.045 sec  <<< ERROR!
org.apache.flink.api.common.InvalidProgramException: This type 
(GenericType)
 cannot be used as key.
at 
org.apache.flink.api.common.operators.Keys$ExpressionKeys.(Keys.java:330)
at 
org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:295)
at 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:79)
at 
org.apache.flink.streaming.test.socket.SocketWindowWordCountITCase.testJavaProgram(SocketWindowWordCountITCase.java:65)
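
For comparison, a minimal sketch of a Flink-compatible POJO in the shape the example
expects: public non-final fields plus a public no-args constructor, so the
TypeExtractor produces a PojoTypeInfo rather than falling back to GenericType. This
mirrors the existing example class rather than the proposed change.

{code:java}
/** Minimal POJO sketch; declared as a nested class of the example job. */
public static class WordWithCount {

    public String word;
    public long count;

    public WordWithCount() {} // required no-args constructor

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }
}
{code}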



---


[jira] [Commented] (FLINK-8081) Annotate MetricRegistry#getReporters() with @VisibleForTesting

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262785#comment-16262785
 ] 

ASF GitHub Bot commented on FLINK-8081:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5049
  
+1


> Annotate MetricRegistry#getReporters() with @VisibleForTesting
> --
>
> Key: FLINK-8081
> URL: https://issues.apache.org/jira/browse/FLINK-8081
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Tests
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {{MetricRegistry#getReporters()}} is only used for testing purposes to 
> provide access to instantiated reporters. We should annotate this method with 
> {{@VisibleForTesting}}.
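
A sketch of what the annotated accessor could look like (the surrounding class is
heavily abbreviated; the field and return types are approximations):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.reporter.MetricReporter;

public class MetricRegistry {

    private final List<MetricReporter> reporters = new ArrayList<>();

    @VisibleForTesting
    public List<MetricReporter> getReporters() {
        return reporters;
    }
}
{code}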



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5049: [FLINK-8081][metrics] Annotate 'MetricRegistry#getReporte...

2017-11-22 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5049
  
+1


---


[jira] [Commented] (FLINK-8070) YarnTestBase should print prohibited string

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262775#comment-16262775
 ] 

ASF GitHub Bot commented on FLINK-8070:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5012
  
Ah, thanks @zentol for the clarification. Very nice to have this!


> YarnTestBase should print prohibited string
> ---
>
> Key: FLINK-8070
> URL: https://issues.apache.org/jira/browse/FLINK-8070
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, YARN
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.4.0
>
>
> The yarn tests check the log files for a set of prohibited strings. If found, 
> the entire log file is logged as WARN, the offending line is logged as ERROR, 
> and the test fails with this unhelpful message:
> {code}
> java.lang.AssertionError(Found a file 
> /home/travis/build/apache/flink/flink-yarn-tests/target/flink-yarn-tests-fifo/flink-yarn-tests-fifo-logDir-nm-1_0/application_1510164935122_0002/container_1510164935122_0002_01_01/jobmanager.log
>  with a prohibited string: [Exception, Started 
> SelectChannelConnector@0.0.0.0:8081])
> {code}
> If you don't have log access on Travis, you thus have no way of knowing what 
> actually went wrong.
> I propose to also print a small excerpt around the found error (e.g., 10 
> lines) in the Assert.fail message.
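
A sketch of how such an excerpt could be built and attached to the failure message
(helper and method names are made up for illustration; the actual change in the PR may
differ):

{code:java}
import java.util.List;

public final class LogExcerpt {

    /** Returns roughly 2*context+1 lines around the offending line, marking the match. */
    public static String around(List<String> logLines, int offendingIndex, int context) {
        int from = Math.max(0, offendingIndex - context);
        int to = Math.min(logLines.size(), offendingIndex + context + 1);
        StringBuilder sb = new StringBuilder();
        for (int i = from; i < to; i++) {
            sb.append(i == offendingIndex ? ">>> " : "    ").append(logLines.get(i)).append('\n');
        }
        return sb.toString();
    }
}

// Illustrative use inside the test:
//   Assert.fail("Found a file " + file + " with a prohibited string: " + prohibited
//       + "\n" + LogExcerpt.around(lines, lineIndex, 10));
{code}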



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5012: [FLINK-8070][yarn][tests] Print errors found in log files

2017-11-22 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5012
  
Ah, thanks @zentol for the clarification. Very nice to have this!


---


[jira] [Closed] (FLINK-7841) Add docs for Flink's S3 support

2017-11-22 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7841.
---
Resolution: Fixed

> Add docs for Flink's S3 support
> ---
>
> Key: FLINK-7841
> URL: https://issues.apache.org/jira/browse/FLINK-7841
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Nico Kruber
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5006: [hotfix][docs][QS] MInor cleanup of QS documentati...

2017-11-22 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5006#discussion_r152591791
  
--- Diff: docs/dev/stream/state/queryable_state.md ---
@@ -162,14 +161,19 @@ So far, you have set up your cluster to run with 
queryable state and you have de
 queryable. Now it is time to see how to query this state. 
 
 For this you can use the `QueryableStateClient` helper class. This is 
available in the `flink-queryable-state-client` 
-jar which you have to explicitly include as a dependency in the `pom.xml` 
of your project, as shown below:
+jar which must explicitly included as a dependency in the `pom.xml` of 
your project along with `flink-core`, as shown below:
--- End diff --

"must" -> "must be"


---


[jira] [Commented] (FLINK-7574) flink-clients

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262746#comment-16262746
 ] 

ASF GitHub Bot commented on FLINK-7574:
---

Github user yew1eb closed the pull request at:

https://github.com/apache/flink/pull/4712


> flink-clients
> -
>
> Key: FLINK-7574
> URL: https://issues.apache.org/jira/browse/FLINK-7574
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
> Environment: Apache Maven 3.3.9, Java version: 1.8.0_144
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>
> [INFO] --- maven-dependency-plugin:2.10:analyze (default-cli) @ 
> flink-clients_2.11 ---
> [WARNING] Used undeclared dependencies found:
> [WARNING]    org.scala-lang:scala-library:jar:2.11.11:compile
> [WARNING]    com.data-artisans:flakka-actor_2.11:jar:2.3-custom:compile
> [WARNING] Unused declared dependencies found:
> [WARNING]    org.hamcrest:hamcrest-all:jar:1.3:test
> [WARNING]    org.apache.flink:force-shading:jar:1.4-SNAPSHOT:compile
> [WARNING]    org.powermock:powermock-module-junit4:jar:1.6.5:test
> [WARNING]    com.google.code.findbugs:jsr305:jar:1.3.9:compile
> [WARNING]    log4j:log4j:jar:1.2.17:test
> [WARNING]    org.powermock:powermock-api-mockito:jar:1.6.5:test
> [WARNING]    org.slf4j:slf4j-log4j12:jar:1.7.7:test





[jira] [Commented] (FLINK-4716) Add trigger full recovery button to web UI

2017-11-22 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262747#comment-16262747
 ] 

Aljoscha Krettek commented on FLINK-4716:
-

What's the state of this? And also: what's the purpose of this?

> Add trigger full recovery button to web UI
> --
>
> Key: FLINK-4716
> URL: https://issues.apache.org/jira/browse/FLINK-4716
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Till Rohrmann
>Priority: Minor
>
> Add a trigger full recovery button to the web UI. The full recovery button 
> will take the latest completed checkpoint and resume from there.





[GitHub] flink pull request #4712: [FLINK-7574][build] POM Cleanup flink-clients

2017-11-22 Thread yew1eb
Github user yew1eb closed the pull request at:

https://github.com/apache/flink/pull/4712


---


[jira] [Commented] (FLINK-4822) Ensure that the Kafka 0.8 connector is compatible with kafka-consumer-groups.sh

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262720#comment-16262720
 ] 

ASF GitHub Bot commented on FLINK-4822:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5050
  
Don't you need a unique ID for this to work properly? (The task name is not 
unique)


> Ensure that the Kafka 0.8 connector is compatible with 
> kafka-consumer-groups.sh
> ---
>
> Key: FLINK-4822
> URL: https://issues.apache.org/jira/browse/FLINK-4822
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>
> The Kafka 0.8 connector is not properly creating all datastructures in 
> Zookeeper for Kafka's {{kafka-consumer-groups.sh}} tool.
> A user reported the issue here: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-connector08-not-updating-the-offsets-with-the-zookeeper-td9469.html#a9498
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper





[GitHub] flink issue #5050: [FLINK-4822] Ensure that the Kafka 0.8 connector is compa...

2017-11-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5050
  
Don't you need a unique ID for this to work properly? (The task name is not 
unique)


---


[GitHub] flink pull request #5051: [hotfix] [javadocs] Fixed typo in Trigger doc

2017-11-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5051


---


[GitHub] flink issue #5051: [hotfix] [javadocs] Fixed typo in Trigger doc

2017-11-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5051
  
+1, merging.


---


[GitHub] flink issue #5052: [FLINK-8133][REST][docs] Generate REST API documentation

2017-11-22 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5052
  
Thanks @zentol I will have a look.


---


[jira] [Commented] (FLINK-8133) Generate documentation for new REST API

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262716#comment-16262716
 ] 

ASF GitHub Bot commented on FLINK-8133:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5052
  
Thanks @zentol I will have a look.


> Generate documentation for new REST API
> ---
>
> Key: FLINK-8133
> URL: https://issues.apache.org/jira/browse/FLINK-8133
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8133) Generate documentation for new REST API

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262704#comment-16262704
 ] 

ASF GitHub Bot commented on FLINK-8133:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5052

[FLINK-8133][REST][docs] Generate REST API documentation

## What is the purpose of the change

This PR adds a generator for the REST API documentation. The generator has 
to be run manually (automatic integration is a bit tricky, in particular 
detecting changes & failing the build).

The generator can be invoked by specifying either 
`-Pgenerate-rest-docs` or `-Dgenerate-rest-docs` when building `flink-docs`.

The generated content was added to the REST API documentation 
(`/monitoring/rest_api.html`).

Here's a preview of the result:

![snippet](https://user-images.githubusercontent.com/5725237/33132910-522eb2d2-cf9b-11e7-844c-613fdcd818c8.PNG)

## Brief change log

* modify URL comparator in RestServerEndpoint to be public
* add a new module `flink-docs` (to not pollute other modules with 
dependencies)
* implement generator
* generated dispatcher REST documentation
* integrated generated content into rest_api.md

## Verifying this change

Needs manual verification.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8133

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5052.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5052






> Generate documentation for new REST API
> ---
>
> Key: FLINK-8133
> URL: https://issues.apache.org/jira/browse/FLINK-8133
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>






[GitHub] flink pull request #5052: [FLINK-8133][REST][docs] Generate REST API documen...

2017-11-22 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5052

[FLINK-8133][REST][docs] Generate REST API documentation

## What is the purpose of the change

This PR adds a generator for the REST API documentation. The generator has 
to be run manually (automatic integration is a bit tricky, in particular 
detecting changes & failing the build).

The generator can be invoked by specifying either 
`-Pgenerate-rest-docs` or `-Dgenerate-rest-docs` when building `flink-docs`.

The generated content was added to the REST API documentation 
(`/monitoring/rest_api.html`).

Here's a preview of the result:

![snippet](https://user-images.githubusercontent.com/5725237/33132910-522eb2d2-cf9b-11e7-844c-613fdcd818c8.PNG)

## Brief change log

* modify URL comparator in RestServerEndpoint to be public
* add a new module `flink-docs` (to not pollute other modules with 
dependencies)
* implement generator
* generated dispatcher REST documentation
* integrated generated content into rest_api.md

## Verifying this change

Needs manual verification.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8133

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5052.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5052






---


[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262597#comment-16262597
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152580041
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+  

[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152580041
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---
@@ -0,0 +1,1511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import org.apache.orc.TypeDescription;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.function.DoubleFunction;
+import java.util.function.IntFunction;
+import java.util.function.LongFunction;
+
+/**
+ * A class that provides utility methods for orc file reading.
+ */
+class OrcUtils {
+
+   private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Converts an ORC schema to a Flink TypeInformation.
+*
+* @param schema The ORC schema.
+* @return The TypeInformation that corresponds to the ORC schema.
+*/
+   static TypeInformation schemaToTypeInfo(TypeDescription schema) {
+   switch (schema.getCategory()) {
+   case BOOLEAN:
+   return BasicTypeInfo.BOOLEAN_TYPE_INFO;
+   case BYTE:
+   return BasicTypeInfo.BYTE_TYPE_INFO;
+   case SHORT:
+   return BasicTypeInfo.SHORT_TYPE_INFO;
+   case INT:
+   return BasicTypeInfo.INT_TYPE_INFO;
+   case LONG:
+   return BasicTypeInfo.LONG_TYPE_INFO;
+   case FLOAT:
+   return BasicTypeInfo.FLOAT_TYPE_INFO;
+   case DOUBLE:
+   return BasicTypeInfo.DOUBLE_TYPE_INFO;
+   case DECIMAL:
+   return BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   case STRING:
+   case CHAR:
+   case VARCHAR:
+   return BasicTypeInfo.STRING_TYPE_INFO;
+   case DATE:
+   return SqlTimeTypeInfo.DATE;
+   case TIMESTAMP:
+   return SqlTimeTypeInfo.TIMESTAMP;
+   case BINARY:
+   return 
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+   case STRUCT:
+  

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262593#comment-16262593
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152579106
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);

[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152579106
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
+   // the schema of the ORC files to read
+   private TypeDescription schema;
+
+   // the fields of the ORC schema that the returned Rows are composed of.
+   private int[] selectedFields;
+   // the type information of the Rows returned by this InputFormat.
+   private transient RowTypeInfo rowType;
+
+   // the ORC reader
+   private transient RecordReader orcRowsReader;
+   // the vectorized row data to be read in a batch
+   private transient VectorizedRowBatch rowBatch;
+   // the vector of rows that is read in a batch
+   private transient Row[] rows;
+
+   // the number of rows in the current batch
+   private transient int rowsInBatch;
+   // the index of the next row to return
+   private transient int nextRow;
+
+   private ArrayList conjunctPredicates = new ArrayList<>();
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The configuration to read the ORC files with.
+*/
+   public OrcRowInputFormat(String path, String schemaString, 
Configuration orcConfig) {
+   this(path, TypeDescription.fromString(schemaString), orcConfig, 
DEFAULT_BATCH_SIZE);
+   }
+
+   /**
+* Creates an OrcRowInputFormat.
+*
+* @param path The path to read ORC files from.
+* @param schemaString The schema of the ORC files as String.
+* @param orcConfig The 

[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262591#comment-16262591
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152578874
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
--- End diff --

Ok, sorry, I didn't check for custom serialization.
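
For context, a minimal sketch of the custom (de)serialization pattern being referred to (assumed field and class names, not the actual OrcRowInputFormat code); Hadoop's {{Configuration}} is not Java-serializable itself but implements {{Writable}}:

{code}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;

class ConfHolder implements Serializable {

    // transient because the field is written and read manually below
    private transient Configuration conf = new Configuration(false);

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Configuration implements Writable and can serialize itself.
        conf.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        conf = new Configuration(false);
        conf.readFields(in);
    }
}
{code}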


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.





[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152578874
  
--- Diff: 
flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcRowInputFormat.java
 ---
@@ -0,0 +1,747 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.orc.OrcUtils.fillRows;
+
+/**
+ * InputFormat to read ORC files.
+ */
+public class OrcRowInputFormat extends FileInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(OrcRowInputFormat.class);
+   // the number of rows read in a batch
+   private static final int DEFAULT_BATCH_SIZE = 1000;
+
+   // the number of fields rows to read in a batch
+   private int batchSize;
+   // the configuration to read with
+   private Configuration conf;
--- End diff --

Ok, sorry, I didn't check for custom serialization.


---


[jira] [Comment Edited] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()

2017-11-22 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262584#comment-16262584
 ] 

Andrey edited comment on FLINK-5465 at 11/22/17 2:19 PM:
-

We were able to reproduce this issue on Flink 1.3.2. This is a very critical bug, 
because a failing job could bring the whole Flink cluster down. 

A failing job will amplify the chances of a RocksDB crash through constant start/cancel cycles.


was (Author: dernasherbrezon):
We were able to reproduce this issue on Flink 1.3.2. This is very critical bug, 
because failing job could bring whole flink cluster down. 

Failing job amplifying chances for rocksdb crash by constant start/cancel.

> RocksDB fails with segfault while calling AbstractRocksDBState.clear()
> --
>
> Key: FLINK-5465
> URL: https://issues.apache.org/jira/browse/FLINK-5465
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
> Attachments: hs-err-pid26662.log
>
>
> I'm using Flink 699f4b0.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576
> #
> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 
> 1.7.0_67-b01)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [librocksdbjni-linux64.so+0x1aeb78]  
> rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # 
> /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log
> Compiled method (nm) 1869778  903 n   org.rocksdb.RocksDB::remove 
> (native)
>  total in heap  [0x7f91b40b9dd0,0x7f91b40ba150] = 896
>  relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88
>  main code  [0x7f91b40b9f60,0x7f91b40ba150] = 496
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.sun.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> {code}





[jira] [Commented] (FLINK-5465) RocksDB fails with segfault while calling AbstractRocksDBState.clear()

2017-11-22 Thread Andrey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262584#comment-16262584
 ] 

Andrey commented on FLINK-5465:
---

We were able to reproduce this issue on Flink 1.3.2. This is a very critical bug, 
because a failing job could bring the whole Flink cluster down. 

A failing job amplifies the chances of a RocksDB crash through constant start/cancel cycles.

> RocksDB fails with segfault while calling AbstractRocksDBState.clear()
> --
>
> Key: FLINK-5465
> URL: https://issues.apache.org/jira/browse/FLINK-5465
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
> Attachments: hs-err-pid26662.log
>
>
> I'm using Flink 699f4b0.
> {code}
> #
> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x7f91a0d49b78, pid=26662, tid=140263356024576
> #
> # JRE version: Java(TM) SE Runtime Environment (7.0_67-b01) (build 
> 1.7.0_67-b01)
> # Java VM: Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode 
> linux-amd64 compressed oops)
> # Problematic frame:
> # C  [librocksdbjni-linux64.so+0x1aeb78]  
> rocksdb::GetColumnFamilyID(rocksdb::ColumnFamilyHandle*)+0x8
> #
> # Failed to write core dump. Core dumps have been disabled. To enable core 
> dumping, try "ulimit -c unlimited" before starting Java again
> #
> # An error report file with more information is saved as:
> # 
> /yarn/nm/usercache/robert/appcache/application_1484132267957_0007/container_1484132267957_0007_01_10/hs_err_pid26662.log
> Compiled method (nm) 1869778  903 n   org.rocksdb.RocksDB::remove 
> (native)
>  total in heap  [0x7f91b40b9dd0,0x7f91b40ba150] = 896
>  relocation [0x7f91b40b9ef0,0x7f91b40b9f48] = 88
>  main code  [0x7f91b40b9f60,0x7f91b40ba150] = 496
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.sun.com/bugreport/crash.jsp
> # The crash happened outside the Java Virtual Machine in native code.
> # See problematic frame for where to report the bug.
> #
> {code}





[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262583#comment-16262583
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152577385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -62,7 +62,19 @@ class FlinkLogicalTableSourceScan(
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
 val rowCnt = metadata.getRowCount(this)
-planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(getRowType))
+
+val adjustedCnt: Double = tableSource match {
+  case f: FilterableTableSource[_] if f.isFilterPushedDown =>
+// ensure we prefer FilterableTableSources with pushed-down 
filters.
+rowCnt - 1.0
--- End diff --

Doesn't really make a difference IMO. It's all about relative costs. We 
only need to make sure that a `FilterableTableSource` with pushed down 
predicates appears to be less expensive than the same `TableSource` without 
pushed predicates.

The problem has not occurred before, because the `OrcTableSource` does not 
guarantee that all emitted rows match the pushed predicates. Therefore, all 
predicates are also applied by a following `Calc` operator such that the cost 
of that operator is not decreased.


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.





[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152577385
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
 ---
@@ -62,7 +62,19 @@ class FlinkLogicalTableSourceScan(
 
   override def computeSelfCost(planner: RelOptPlanner, metadata: 
RelMetadataQuery): RelOptCost = {
 val rowCnt = metadata.getRowCount(this)
-planner.getCostFactory.makeCost(rowCnt, rowCnt, rowCnt * 
estimateRowSize(getRowType))
+
+val adjustedCnt: Double = tableSource match {
+  case f: FilterableTableSource[_] if f.isFilterPushedDown =>
+// ensure we prefer FilterableTableSources with pushed-down 
filters.
+rowCnt - 1.0
--- End diff --

Doesn't really make a difference IMO. It's all about relative costs. We 
only need to make sure that a `FilterableTableSource` with pushed down 
predicates appears to be less expensive than the same `TableSource` without 
pushed predicates.

The problem has not occurred before, because the `OrcTableSource` does not 
guarantee that all emitted rows match the pushed predicates. Therefore, all 
predicates are also applied by a following `Calc` operator such that the cost 
of that operator is not decreased.


---


[jira] [Commented] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery

2017-11-22 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262565#comment-16262565
 ] 

Kostas Kloudas commented on FLINK-8132:
---

Is this a bug or a blocker?

> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> --
>
> Key: FLINK-8132
> URL: https://issues.apache.org/jira/browse/FLINK-8132
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Faulty scenario with producer pool of 2.
> 1. started transaction 1 with producerA, written record 42
> 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, 
> written record 43
> 3. checkpoint 1 completed, committing txn1, returning producerA to the pool
> 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, 
> written record 44
> 5. crash
> 6. recover to checkpoint 1, txn1 from producerA found to 
> "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
> 7. unfortunately txn1 and txn3 from the same producers are identical from 
> KafkaBroker perspective and thus txn3 is being committed
> result is that both records 42 and 44 are committed. 
> Proposed solution is to postpone returning producers to the pool until we are 
> sure that previous checkpoint (for which given producer was used) will not be 
> used for recovery (at least one more checkpoint was completed).
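
A minimal sketch of that idea under assumed names (not the actual FlinkKafkaProducer011 code): a producer used for checkpoint N is only returned to the pool once a later checkpoint has completed, so a recovery to checkpoint N can no longer collide with a new transaction from the same producer.

{code}
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

class ProducerPool<P> {

    private final Queue<P> available = new ArrayDeque<>();

    // Producers whose transaction was pre-committed but that must not be
    // reused yet, keyed by the checkpoint that used them.
    private final Map<Long, P> pendingReturn = new HashMap<>();

    void checkpointPreCommitted(long checkpointId, P producer) {
        pendingReturn.put(checkpointId, producer);
    }

    void notifyCheckpointComplete(long completedCheckpointId) {
        // Only producers of strictly older checkpoints are safe to reuse now.
        pendingReturn.entrySet().removeIf(e -> {
            if (e.getKey() < completedCheckpointId) {
                available.add(e.getValue());
                return true;
            }
            return false;
        });
    }

    P borrow() {
        return available.poll();
    }
}
{code}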





[jira] [Commented] (FLINK-3924) Remove protobuf shading from Kinesis connector

2017-11-22 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262564#comment-16262564
 ] 

Robert Metzger commented on FLINK-3924:
---

I think this will remain a problem until Kinesis/Amazon solves it.
But I haven't checked recently whether they have changed something.

> Remove protobuf shading from Kinesis connector
> --
>
> Key: FLINK-3924
> URL: https://issues.apache.org/jira/browse/FLINK-3924
> Project: Flink
>  Issue Type: Task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Robert Metzger
>
> The Kinesis connector is currently creating a fat jar with a custom protobuf 
> version (2.6.1), relocated into a different package.
> We need to build the fat jar to change the protobuf calls from the original 
> protobuf to the relocated one.
> Because Kinesis is licensed under the Amazon Software License (which is not 
> entirely compatible with the ASL 2.0), I don't want to deploy Kinesis connector binaries to 
> Maven Central with the releases. These binaries would contain code from 
> Amazon. It would be more than just linking to an (optional) dependency.





[jira] [Commented] (FLINK-8118) Allow to specify the offsets of KafkaTableSources

2017-11-22 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262563#comment-16262563
 ] 

Xingcan Cui commented on FLINK-8118:


Hi [~fhueske], thanks for the suggestions! 

Since the existing configurations in {{KafkaTableSource.Builder}} are all about 
the {{TableSource}} itself while the starting offsets are set for the inner 
{{FlinkKafkaConsumerBase}}, the code may be a little verbose. Anyway, I'll 
create a PR soon and let's discuss that later.

Thanks, Xingcan

> Allow to specify the offsets of KafkaTableSources
> -
>
> Key: FLINK-8118
> URL: https://issues.apache.org/jira/browse/FLINK-8118
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>
> Right now the Kafka TableSources can only read from the current group offset. 
> We should expose the possibilities of the Kafka Consumer:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
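
For reference, the consumer-level start-position options that such a builder setting would have to forward look roughly like this (a hedged sketch; the topic, group, and exact connector/schema class names are placeholders and depend on the Flink version):

{code}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class StartPositionExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("example-topic", new SimpleStringSchema(), props);

        // Start positions exposed by the Kafka consumer today; the
        // KafkaTableSource builder would need an equivalent switch.
        consumer.setStartFromGroupOffsets();  // current default
        // consumer.setStartFromEarliest();
        // consumer.setStartFromLatest();
    }
}
{code}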





[jira] [Issue Comment Deleted] (FLINK-8132) FlinkKafkaProducer011 can commit incorrect transaction during recovery

2017-11-22 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-8132:
--
Comment: was deleted

(was: This is a bug or a blocker?)

> FlinkKafkaProducer011 can commit incorrect transaction during recovery
> --
>
> Key: FLINK-8132
> URL: https://issues.apache.org/jira/browse/FLINK-8132
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Faulty scenario with producer pool of 2.
> 1. started transaction 1 with producerA, written record 42
> 2. checkpoint 1 triggered, pre committing txn1, started txn2 with producerB, 
> written record 43
> 3. checkpoint 1 completed, committing txn1, returning producerA to the pool
> 4. checkpoint 2 triggered , committing txn2, started txn3 with producerA, 
> written record 44
> 5. crash
> 6. recover to checkpoint 1, txn1 from producerA found to 
> "pendingCommitTransactions", attempting to recoverAndCommit(txn1)
> 7. unfortunately txn1 and txn3 from the same producers are identical from 
> KafkaBroker perspective and thus txn3 is being committed
> result is that both records 42 and 44 are committed. 
> Proposed solution is to postpone returning producers to the pool until we are 
> sure that previous checkpoint (for which given producer was used) will not be 
> used for recovery (at least one more checkpoint was completed).





[jira] [Commented] (FLINK-2170) Add OrcTableSource

2017-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262546#comment-16262546
 ] 

ASF GitHub Bot commented on FLINK-2170:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152571674
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -355,4 +355,21 @@ public void addComparatorField(int fieldId, 
TypeComparator comparator) {
comparatorOrders);
}
}
+
+   /**
+* Creates a {@link RowTypeInfo} with projected fields.
+*
+* @param rowType The original RowTypeInfo whose fields are projected
+* @param fieldMapping The field mapping of the projection
+* @return A RowTypeInfo with projected fields.
+*/
+   public static RowTypeInfo projectFields(RowTypeInfo rowType, int[] 
fieldMapping) {
--- End diff --

I think it is cleaner to have this as a static method than an instance 
method. 
A static method makes it explicit that this creates a new (immutable) 
`RowTypeInfo`.


> Add OrcTableSource
> --
>
> Key: FLINK-2170
> URL: https://issues.apache.org/jira/browse/FLINK-2170
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: Usman Younas
>Priority: Minor
>  Labels: starter
>
> Add a {{OrcTableSource}} to read data from an ORC file. The 
> {{OrcTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.





[GitHub] flink pull request #5043: [FLINK-2170] [connectors] Add OrcRowInputFormat an...

2017-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5043#discussion_r152571674
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/RowTypeInfo.java 
---
@@ -355,4 +355,21 @@ public void addComparatorField(int fieldId, 
TypeComparator comparator) {
comparatorOrders);
}
}
+
+   /**
+* Creates a {@link RowTypeInfo} with projected fields.
+*
+* @param rowType The original RowTypeInfo whose fields are projected
+* @param fieldMapping The field mapping of the projection
+* @return A RowTypeInfo with projected fields.
+*/
+   public static RowTypeInfo projectFields(RowTypeInfo rowType, int[] 
fieldMapping) {
--- End diff --

I think it is cleaner to have this as a static method than an instance 
method. 
A static method makes it explicit that this creates a new (immutable) 
`RowTypeInfo`.
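
As a usage illustration (a hedged sketch built around the {{projectFields}} helper shown in the diff above):

{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class ProjectFieldsExample {

    public static void main(String[] args) {
        RowTypeInfo original = new RowTypeInfo(
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.LONG_TYPE_INFO);

        // Keep only fields 2 and 0, in that order; `original` is not modified,
        // which is exactly what the static factory style makes explicit.
        RowTypeInfo projected = RowTypeInfo.projectFields(original, new int[]{2, 0});

        System.out.println(projected);
    }
}
{code}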


---


[jira] [Created] (FLINK-8135) Add description to MessageParameter

2017-11-22 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-8135:
---

 Summary: Add description to MessageParameter
 Key: FLINK-8135
 URL: https://issues.apache.org/jira/browse/FLINK-8135
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, REST
Reporter: Chesnay Schepler
 Fix For: 1.5.0


For documentation purposes we should add a {{getDescription()}} method to the 
{{MessageParameter}} class, describing what this particular parameter is used 
for and which values are accepted.
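
A minimal sketch of how such a method could look (an assumed, simplified shape, not the actual class):

{code}
// Hypothetical, simplified shape of the proposal; the real MessageParameter
// class in flink-runtime has more structure than this.
abstract class MessageParameter<X> {

    abstract String getKey();

    // Human-readable explanation of the parameter and its accepted values,
    // to be picked up by the REST API documentation generator.
    abstract String getDescription();
}
{code}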





[jira] [Commented] (FLINK-6101) GroupBy fields with arithmetic expression (include UDF) can not be selected

2017-11-22 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16262540#comment-16262540
 ] 

Fabian Hueske commented on FLINK-6101:
--

You are right that SQL doesn't support `AS`. But SQL is also not a perfect 
language and has its shortcomings.
IMO, the Table API should follow SQL semantics and, where it applies, also SQL 
syntax. 

Having optional support for `AS` in `groupBy()` would not be a problem in that 
regard because it does not change the semantics and is a nice shortcut.


> GroupBy fields with arithmetic expression (include UDF) can not be selected
> ---
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Minor
>
> currently the TableAPI do not support selecting GroupBy fields with 
> expression either using original field name or the expression 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> caused
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> (BTW, this syntax is invalid in RDBMS which will indicate the selected column 
> is invalid in the select list because it is not contained in either an 
> aggregate function or the GROUP BY clause in SQL Server.)
> and 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will also cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> and add an alias in groupBy clause "group(e, 'b%3 as 'b)" work without avail. 
> and apply an UDF doesn’t work either
> {code}
>table.groupBy('a, Mod('b, 3)).select('a, Mod('b, 3), 'c.count, 'c.count, 
> 'd.count, 'e.avg)
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [a, org.apache.flink.table.api.scala.batch.table.Mod$('b, 3), TMP_0, 
> TMP_1, TMP_2].
> {code}
> the only way to get this work can be 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> One way to solve this is to add support alias in groupBy clause ( it seems a 
> bit odd against SQL though TableAPI has a different groupBy grammar),  
> and I prefer to support select original expressions and UDF in groupBy 
> clause(make consistent with SQL).
> as thus:
> {code}
> // use expression
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b % 3, 'c.min, 'e, 'a.avg, 'd.count)
> // use UDF
> val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, Mod('b,3))
> .select(Mod('b,3), 'c.min, 'e, 'a.avg, 'd.count)
> {code}

> After had a look into the code, found there was a problem in the groupBy 
> implementation, validation hadn't considered the expressions in groupBy 
> clause. it should be noted that a table has been actually changed after 
> groupBy operation ( a new Table) and the groupBy keys replace the original 
> field reference in essence.
>  
> What do you think?





[jira] [Created] (FLINK-8134) Add description to MessageHeaders

2017-11-22 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-8134:
---

 Summary: Add description to MessageHeaders
 Key: FLINK-8134
 URL: https://issues.apache.org/jira/browse/FLINK-8134
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, REST
Reporter: Chesnay Schepler
 Fix For: 1.5.0


For documentation purposes we should add a {{getDescription()}} method to the 
{{MessageHeaders}} interface, describing what particular action this REST call 
executes.




