[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15774190#comment-15774190
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

I've updated the PR according to your comments. Could you please review it 
again?


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of flat 
> rows. 
> However, there are several storage formats for nested data that should be 
> supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.
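
For illustration, nested rows can already be described by composing type 
information; a minimal sketch in Java (the schema is hypothetical) of the kind 
of nested type an extended {{TableSource}} would need to expose:

{code}
// A nested row type composed from existing type information:
//   Row(name: String, address: Row(city: String, zip: String))
TypeInformation<Row> address = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
TypeInformation<Row> user = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO, address);
{code}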



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2016-12-23 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

I've updated the PR according to your comments. Could you please review it 
again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5391) Unprotected access to shutdown in AbstractNonHaServices#checkNotShutdown()

2016-12-23 Thread Ted Yu (JIRA)
Ted Yu created FLINK-5391:
-

 Summary: Unprotected access to shutdown in 
AbstractNonHaServices#checkNotShutdown()
 Key: FLINK-5391
 URL: https://issues.apache.org/jira/browse/FLINK-5391
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
private void checkNotShutdown() {
  checkState(!shutdown, "high availability services are shut down");
}
{code}
Access to {{shutdown}} is protected by a lock everywhere else; the code above 
should acquire the lock as well.
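
A minimal sketch of the suggested fix, assuming the same {{lock}} object that 
guards {{shutdown}} elsewhere in the class:

{code}
private void checkNotShutdown() {
  // guard the read of 'shutdown' with the same lock the writers use
  synchronized (lock) {
    checkState(!shutdown, "high availability services are shut down");
  }
}
{code}

Alternatively, declaring {{shutdown}} as {{volatile}} would make the 
unsynchronized read safe, if only visibility is required.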



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5390) input should be closed in finally block in YarnFlinkApplicationMasterRunner#loadJobGraph()

2016-12-23 Thread Ted Yu (JIRA)
Ted Yu created FLINK-5390:
-

 Summary: input should be closed in finally block in 
YarnFlinkApplicationMasterRunner#loadJobGraph()
 Key: FLINK-5390
 URL: https://issues.apache.org/jira/browse/FLINK-5390
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
FileInputStream input = new FileInputStream(fp);
ObjectInputStream obInput = new ObjectInputStream(input);
jg = (JobGraph) obInput.readObject();
input.close();
{code}
If readObject() throws an exception, input is left unclosed.

A similar issue exists in AbstractYarnClusterDescriptor#startAppMaster() around 
line 726.
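
A minimal sketch of the fix using try-with-resources, which closes both 
streams even when {{readObject()}} throws:

{code}
try (FileInputStream input = new FileInputStream(fp);
     ObjectInputStream obInput = new ObjectInputStream(input)) {
  // both streams are closed automatically, in reverse order
  jg = (JobGraph) obInput.readObject();
}
{code}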



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5379) Flink CliFrontend does not return when not logged in with kerberos

2016-12-23 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  reassigned FLINK-5379:
---

Assignee: Eron Wright 

> Flink CliFrontend does not return when not logged in with kerberos
> --
>
> Key: FLINK-5379
> URL: https://issues.apache.org/jira/browse/FLINK-5379
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>
> In pre-1.2 versions, Flink fails immediately when trying to deploy on YARN 
> while the current user is not Kerberos-authenticated:
> {code}
> Error while deploying YARN cluster: Couldn't deploy Yarn cluster
> java.lang.RuntimeException: Couldn't deploy Yarn cluster
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:384)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:591)
>   at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:465)
> Caused by: 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: 
> In secure mode. Please provide Kerberos credentials in order to authenticate. 
> You may use kinit to authenticate and request a TGT from the Kerberos server.
>   at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:371)
>   ... 2 more
> {code}
> In 1.2, the following happens instead (the CLI frontend does not return; it 
> seems to be stuck in a retry loop):
> {code}
> 2016-12-21 13:51:29,925 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at 
> my-cluster-2wv1.c.sorter-757.internal/10.240.0.24:8032
> 2016-12-21 13:51:30,153 WARN  org.apache.hadoop.security.UserGroupInformation 
>   - PriviledgedActionException as:longrunning (auth:KERBEROS) 
> cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-12-21 13:51:30,154 WARN  org.apache.hadoop.ipc.Client
>   - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-12-21 13:51:30,154 WARN  org.apache.hadoop.security.UserGroupInformation 
>   - PriviledgedActionException as:longrunning (auth:KERBEROS) 
> cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate 
> failed [Caused by GSSException: No valid credentials provided (Mechanism 
> level: Failed to find any Kerberos tgt)]
> 2016-12-21 13:52:00,171 WARN  org.apache.hadoop.security.UserGroupInformation 
>   - PriviledgedActionException as:longrunning (auth:KERBEROS) 
> cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-12-21 13:52:00,172 WARN  org.apache.hadoop.ipc.Client
>   - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-12-21 13:52:00,172 WARN  org.apache.hadoop.security.UserGroupInformation 
>   - PriviledgedActionException as:longrunning (auth:KERBEROS) 
> cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate 
> failed [Caused by GSSException: No valid credentials provided (Mechanism 
> level: Failed to find any Kerberos tgt)]
> 2016-12-21 13:52:30,188 WARN  org.apache.hadoop.security.UserGroupInformation 
>   - PriviledgedActionException as:longrunning (auth:KERBEROS) 
> cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-12-21 13:52:30,189 WARN  org.apache.hadoop.ipc.Client
>   - Exception encountered while connecting to the server : 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 2016-12-21 13:52:30,189 WARN  org.apache.hadoop.security.UserGroupInformation 
>   - PriviledgedActionException as:longrunning (auth:KERBEROS) 
> cause:java.io.IOException: javax.security.sasl.SaslException: GSS initiate 
> failed [Caused by GSSException: No valid credentials provided (Mechanism 
> level: Failed to find any Kerberos tgt)]
> 2016-12-21 13:53:00,

[jira] [Updated] (FLINK-4088) Add interface to save and load TableSources

2016-12-23 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-4088:
-
Assignee: Minudika Malshan

> Add interface to save and load TableSources
> ---
>
> Key: FLINK-4088
> URL: https://issues.apache.org/jira/browse/FLINK-4088
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Minudika Malshan
>
> Add an interface to save and load table sources similar to Java's 
> {{Serializable}} interface. 
> TableSources should implement the interface to become saveable and loadable.
> This could be used as follows:
> {code}
> val cts = new CsvTableSource(
>   "/path/to/csvfile",
>   Array("name", "age", "address"),
>   Array(BasicTypeInfo.STRING_TYPE_INFO, ...),
>   ...
> )
> cts.saveToFile("/path/to/tablesource/file")
> // -
> val tEnv: TableEnvironment = ???
> tEnv.loadTableSource("persons", "/path/to/tablesource/file")
> {code}
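
One possible shape for the interface, sketched in Java; the names mirror the 
usage example above and are purely illustrative, not an agreed API:

{code}
// Hypothetical interface; a TableSource implements it to become saveable,
// analogous to how classes opt into java.io.Serializable. Loading would be
// the job of a corresponding TableEnvironment method.
public interface SaveableTableSource {
  // persist this source's configuration (path, field names, types, ...)
  void saveToFile(String path) throws java.io.IOException;
}
{code}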



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4088) Add interface to save and load TableSources

2016-12-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773864#comment-15773864
 ] 

Fabian Hueske commented on FLINK-4088:
--

Sure, I assigned the issue to you and gave you contributor permissions (you can 
now assign issues to yourself).

> Add interface to save and load TableSources
> ---
>
> Key: FLINK-4088
> URL: https://issues.apache.org/jira/browse/FLINK-4088
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Minudika Malshan
>
> Add an interface to save and load table sources similar to Java's 
> {{Serializable}} interface. 
> TableSources should implement the interface to become saveable and loadable.
> This could be used as follows:
> {code}
> val cts = new CsvTableSource(
>   "/path/to/csvfile",
>   Array("name", "age", "address"),
>   Array(BasicTypeInfo.STRING_TYPE_INFO, ...),
>   ...
> )
> cts.saveToFile("/path/to/tablesource/file")
> // -
> val tEnv: TableEnvironment = ???
> tEnv.loadTableSource("persons", "/path/to/tablesource/file")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773852#comment-15773852
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3040
  
Thanks for the PR @NickolayVasilishin! 
I briefly skimmed over the changes and the approach looks OK.
It would be good to have a unit test for `FieldForwardingUtils`.

I'm currently on vacation and will review the PR in more detail early next 
year.
Thanks, Fabian


> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by user functions and thus generate 
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet 
> programs and code for user-defined functions. Hence, it knows exactly which 
> fields are modified and which not. We should use this information to 
> automatically generate forward field annotations and attach them to the 
> operators. This can help to significantly improve the performance of certain 
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations
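
For reference, this is what a hand-written forwarded-fields hint looks like in 
the DataSet API; the Table API code generator would attach the equivalent 
annotations automatically (the function below is a made-up example):

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

// "f0" tells the optimizer that field 0 is copied unchanged to the output,
// so partitioning or sorting on f0 survives this map.
@ForwardedFields("f0")
public class DoubleSecondField
    implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

  @Override
  public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
    return new Tuple2<>(value.f0, value.f1 * 2);
  }
}
{code}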



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3040: [FLINK-3850] Add forward field annotations to DataSet

2016-12-23 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3040
  
Thanks for the PR @NickolayVasilishin! 
I briefly skimmed over the changes and the approach looks OK.
It would be good to have a unit test for `FieldForwardingUtils`.

I'm currently on vacation and will review the PR in more detail early next 
year.
Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5389) Fails #testAnswerFailureWhenSavepointReadFails

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5389:
-
Description: {{testAnswerFailureWhenSavepointReadFails}} fails in 
{{JobSubmitTest}} when {{timeout}} is set to 5000ms, but passes at 6000ms  
(was: {{testAnswerFailureWhenSavepointReadFails}} fails in {{JobSubmitTest}} 
when {{timeout]] is set to 5000ms, but when 6000ms it pass)

> Fails #testAnswerFailureWhenSavepointReadFails
> --
>
> Key: FLINK-5389
> URL: https://issues.apache.org/jira/browse/FLINK-5389
> Project: Flink
>  Issue Type: Bug
> Environment: macOS sierra
>Reporter: Anton Solovev
>  Labels: test
>
> {{testAnswerFailureWhenSavepointReadFails}} fails in {{JobSubmitTest}} when 
> {{timeout}} is set to 5000ms, but passes at 6000ms



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5389) Fails #testAnswerFailureWhenSavepointReadFails

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5389:
-
Environment: macOS sierra
 Labels: test  (was: )

> Fails #testAnswerFailureWhenSavepointReadFails
> --
>
> Key: FLINK-5389
> URL: https://issues.apache.org/jira/browse/FLINK-5389
> Project: Flink
>  Issue Type: Bug
> Environment: macOS sierra
>Reporter: Anton Solovev
>  Labels: test
>
> {{testAnswerFailureWhenSavepointReadFails}} fails in {{JobSubmitTest}} when 
> {{timeout}} is set to 5000ms, but passes at 6000ms



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5389) Fails #testAnswerFailureWhenSavepointReadFails

2016-12-23 Thread Anton Solovev (JIRA)
Anton Solovev created FLINK-5389:


 Summary: Fails #testAnswerFailureWhenSavepointReadFails
 Key: FLINK-5389
 URL: https://issues.apache.org/jira/browse/FLINK-5389
 Project: Flink
  Issue Type: Bug
Reporter: Anton Solovev


{{testAnswerFailureWhenSavepointReadFails}} fails in {{JobSubmitTest}} when 
{{timeout}} is set to 5000ms, but passes at 6000ms



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3027: [FLINK-5358] add RowTypeInfo extraction in TypeExtractor

2016-12-23 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3027
  
Thanks @tonycox. The changes look good. I'd add one more test case to cover 
`Row` objects with `null` fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5358) Support RowTypeInfo extraction in TypeExtractor by Row instance

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773802#comment-15773802
 ] 

ASF GitHub Bot commented on FLINK-5358:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3027
  
Thanks @tonycox. The changes look good. I'd add one more test case to cover 
`Row` objects with `null` fields.


> Support RowTypeInfo extraction in TypeExtractor by Row instance
> ---
>
> Key: FLINK-5358
> URL: https://issues.apache.org/jira/browse/FLINK-5358
> Project: Flink
>  Issue Type: Improvement
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>
> {code}
> Row[] data = new Row[]{new Row(2)};
> TypeInformation typeInfo = TypeExtractor.getForObject(data[0]);
> {code}
> The method {{getForObject}} wraps it into a {{GenericTypeInfo}}; it should 
> return a {{RowTypeInfo}} instead.
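
Until the extractor handles {{Row}}, the type information has to be supplied 
explicitly; a sketch of the workaround (the field types are illustrative):

{code}
// Explicitly construct the type information instead of extracting it:
RowTypeInfo typeInfo = new RowTypeInfo(
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
Row row = new Row(2);
row.setField(0, "name");
row.setField(1, 42);
{code}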



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3027: [FLINK-5358] add RowTypeInfo extraction in TypeEx...

2016-12-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3027#discussion_r93805887
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
 ---
@@ -345,8 +346,25 @@ public CustomType cross(CustomType first, Integer 
second) throws Exception {
 

Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class)
 instanceof PojoTypeInfo);
}
-   
 
+   @Test
+   public void testRow() {
--- End diff --

Please add a check for a `Row` with a `null` field.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5358) Support RowTypeInfo extraction in TypeExtractor by Row instance

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5358?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773800#comment-15773800
 ] 

ASF GitHub Bot commented on FLINK-5358:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3027#discussion_r93805887
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
 ---
@@ -345,8 +346,25 @@ public CustomType cross(CustomType first, Integer 
second) throws Exception {
 

Assert.assertFalse(TypeExtractor.getForClass(PojoWithNonPublicDefaultCtor.class)
 instanceof PojoTypeInfo);
}
-   
 
+   @Test
+   public void testRow() {
--- End diff --

Please add a check for a `Row` with a `null` field.


> Support RowTypeInfo extraction in TypeExtractor by Row instance
> ---
>
> Key: FLINK-5358
> URL: https://issues.apache.org/jira/browse/FLINK-5358
> Project: Flink
>  Issue Type: Improvement
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>
> {code}
> Row[] data = new Row[]{new Row(2)};
> TypeInformation typeInfo = TypeExtractor.getForObject(data[0]);
> {code}
> The method {{getForObject}} wraps it into a {{GenericTypeInfo}}; it should 
> return a {{RowTypeInfo}} instead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2016-12-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773795#comment-15773795
 ] 

Fabian Hueske commented on FLINK-3849:
--

Yes, this would be the way to go, IMO. The translation from {{Expression}} to 
{{RexNode}} is already implemented in {{Expression.toRexNode()}}.
The translation of {{FilterableTableSource}} should follow the approach taken 
in FLINK-3848. In contrast to FLINK-3848, however, we do not need to take 
care of a changing schema.

> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.
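
To make the contract concrete, here is a hypothetical implementing source 
sketched in Java ({{FilterableCsvSource}} and {{canEvaluate}} are made-up 
names, and returning {{null}} for a fully pushed-down predicate is an 
assumption, not settled API): {{setPredicate}} keeps the part of the predicate 
it can apply while reading and returns the unsupported remainder for the 
runtime to evaluate.

{code}
// Hypothetical sketch of a source honoring the proposed contract.
public class FilterableCsvSource implements FilterableTableSource {
  private Expression pushedDown;

  @Override
  public Expression setPredicate(Expression predicate) {
    if (canEvaluate(predicate)) {
      pushedDown = predicate; // applied while scanning the file
      return null;            // nothing left for the runtime to filter
    }
    return predicate;         // unsupported: the runtime filters instead
  }

  private boolean canEvaluate(Expression predicate) {
    // placeholder: a real source would inspect the expression tree here
    return false;
  }
}
{code}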



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4088) Add interface to save and load TableSources

2016-12-23 Thread Minudika Malshan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773702#comment-15773702
 ] 

Minudika Malshan commented on FLINK-4088:
-

Hi,
I would like to work on this. Could you please assign this task to me? :) 

> Add interface to save and load TableSources
> ---
>
> Key: FLINK-4088
> URL: https://issues.apache.org/jira/browse/FLINK-4088
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> Add an interface to save and load table sources similar to Java's 
> {{Serializable}} interface. 
> TableSources should implement the interface to become saveable and loadable.
> This could be used as follows:
> {code}
> val cts = new CsvTableSource(
>   "/path/to/csvfile",
>   Array("name", "age", "address"),
>   Array(BasicTypeInfo.STRING_TYPE_INFO, ...),
>   ...
> )
> cts.saveToFile("/path/to/tablesource/file")
> // -
> val tEnv: TableEnvironment = ???
> tEnv.loadTableSource("persons", "/path/to/tablesource/file")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3257) Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773700#comment-15773700
 ] 

ASF GitHub Bot commented on FLINK-3257:
---

Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
@StephanEwen could you check my question above? 


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> ---
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
>  Issue Type: Improvement
>Reporter: Paris Carbone
>Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block 
> output and start upstream backup of all records forwarded from the 
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, emit all records 
> in-transit in FIFO order, and continue the usual execution.
> --
> Upon restart, the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve 
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
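
A toy model of the head-side logic in steps 1)-3), in Java; all names are 
hypothetical and independent of Flink's actual runtime classes:

{code}
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model: between the TaskManager barrier and the matching barrier from
// the IterationSink, records arriving on the back-edge are logged; once the
// snapshot is finalized they are replayed downstream in FIFO order.
class BackEdgeLog<T> {
  private final Queue<T> log = new ArrayDeque<>();
  private boolean logging = false;

  void onTaskManagerBarrier() {            // step 1: start upstream backup
    logging = true;
  }

  void onBackEdgeRecord(T record, Queue<T> output) {
    if (logging) {
      log.add(record);                     // held back until the snapshot
    } else {
      output.add(record);
    }
  }

  void onIterationSinkBarrier(Queue<T> output) {  // steps 2 and 3
    // the logged records would be written into the snapshot here, then the
    // output is unblocked and the records are replayed in FIFO order
    while (!log.isEmpty()) {
      output.add(log.poll());
    }
    logging = false;
  }
}
{code}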



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #1668: [FLINK-3257] Add Exactly-Once Processing Guarantees for I...

2016-12-23 Thread senorcarbone
Github user senorcarbone commented on the issue:

https://github.com/apache/flink/pull/1668
  
@StephanEwen could you check my question above? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4410) Report more information about operator checkpoints

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773677#comment-15773677
 ] 

ASF GitHub Bot commented on FLINK-4410:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3042
  
Wow, this looks awesome, great work.

Small note: In screenshot 2, it says "End to Duration", which should 
probably be "End to End Duration".

Is there also a column that shows the synchronous and asynchronous parts of 
the checkpointing time?


> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured from 
> the CheckpointCoordinator's start to the point when the acknowledgement 
> message arrived.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of *at-least-once* semantics, the latter two will 
> always be zero.
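
The extra measurements could be carried in a small stats object; a sketch with 
hypothetical names (not the classes the web frontend actually uses):

{code}
// Hypothetical container for the proposed per-checkpoint measurements.
public class OperatorCheckpointStats {
  long syncDurationMillis;       // synchronous part of the checkpoint
  long asyncDurationMillis;      // asynchronous part of the checkpoint
  long alignmentBufferedBytes;   // bytes buffered during stream alignment
  long alignmentDurationMillis;  // duration of the alignment phase
  // with at-least-once semantics, the two alignment values are always zero
}
{code}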



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3042: [FLINK-4410] Expose more fine grained checkpoint statisti...

2016-12-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3042
  
I think what would put a cherry on top is if we can break the "End To End 
Duration" down into
  - Delay till triggering (how long until all barriers were there)
  - Synchronous checkpoint time
  - Asynchronous checkpoint time

That would help big time, as many users currently get confused when 
checkpoints have long async times, assuming that the computation halts for that 
time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4410) Report more information about operator checkpoints

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773681#comment-15773681
 ] 

ASF GitHub Bot commented on FLINK-4410:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3042
  
I think what would put a cherry on top is if we can break the "End To End 
Duration" down into
  - Delay till triggering (how long until all barriers were there)
  - Synchronous checkpoint time
  - Asynchronous checkpoint time

That would help big time, as many users currently get confused when 
checkpoints have long async times, assuming that the computation halts for that 
time.


> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 1.2.0
>
>
> Checkpoint statistics contain the duration of a checkpoint, measured from 
> the CheckpointCoordinator's start to the point when the acknowledgement 
> message arrived.
> We should additionally expose
>   - duration of the synchronous part of a checkpoint
>   - duration of the asynchronous part of a checkpoint
>   - number of bytes buffered during the stream alignment phase
>   - duration of the stream alignment phase
> Note: In the case of *at-least-once* semantics, the latter two will 
> always be zero.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3042: [FLINK-4410] Expose more fine grained checkpoint statisti...

2016-12-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3042
  
Wow, this looks awesome, great work.

Small note: In screenshot 2, it says "End to Duration", which should 
probably be "End to End Duration".

Is there also a column that shows the synchronous and asynchronous parts of 
the checkpointing time?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5388) Remove private access of edges and vertices of Gelly Graph class

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reassigned FLINK-5388:


Assignee: Anton Solovev

> Remove private access of edges and vertices of Gelly Graph class
> 
>
> Key: FLINK-5388
> URL: https://issues.apache.org/jira/browse/FLINK-5388
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.3
> Environment: Java
>Reporter: wouter ligtenberg
>Assignee: Anton Solovev
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If you want to make a special kind of Graph with special edge types or some 
> different methods on top of Gelly, you want to be able to extend the Graph 
> class. Currently that's not possible because the constructor is private. I 
> don't know what effect this has on other methods or the scale of the project, 
> but it was just something that I ran into in my project.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4410) Report more information about operator checkpoints

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773649#comment-15773649
 ] 

ASF GitHub Bot commented on FLINK-4410:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3042

[FLINK-4410] Expose more fine grained checkpoint statistics

This PR exposes more fine-grained checkpoint statistics. The previous 
version of the tracking code had a couple of shortcomings:
- Only completed checkpoints were tracked in the history. You did not see 
in-progress or failed checkpoints.
- Only the latest completed checkpoint had fine-grained stats per operator 
and subtask. This meant that a possibly interesting checkpoint's statistics 
could be live-updated while you were looking at them.
- Many newly tracked statistics, like checkpoint duration at the operator or 
alignment duration, were not exposed.

This PR addresses these issues. For the extended tracking of the life cycle 
I decided to add tracking callbacks of all relevant entities like 
`PendingCheckpointStats`, `CompletedCheckpointStats`, `SubtaskStateStats`, 
`TaskStateStats`, etc. The life cycle of these objects follows that of their 
corresponding entities.

Furthermore, this adds new REST API handlers that work with the new tracker 
as well as a new layout for displaying them.

---

Some screenshots:

**Clicking on the Checkpoints Tab**: Sub tabs for overview, history, 
summary stats, and the config.


![00-start](https://cloud.githubusercontent.com/assets/1756620/21461971/3fdfb9be-c957-11e6-9f61-62610aa95da4.png)

**Clicking on the History Tab**: Lists recent checkpoints, including in 
progress ones.


![01-history](https://cloud.githubusercontent.com/assets/1756620/21461994/657fd0a0-c957-11e6-8d08-0f084e018aca.png)

**Clicking on details for a checkpoint**:


![02-details](https://cloud.githubusercontent.com/assets/1756620/21462027/ce4577a2-c957-11e6-9851-9d225c3762f4.png)

**After triggering a savepoint**:


![03-savepoint](https://cloud.githubusercontent.com/assets/1756620/21462031/d6857318-c957-11e6-810b-e6d639b5caaf.png)

**Details for the triggered savepoint**:


![04-savepoint_details](https://cloud.githubusercontent.com/assets/1756620/21462038/e80c1916-c957-11e6-984c-2447ec877c2d.png)

**Failed checkpoint while cancelling job**:


![05-failed_checkpoint](https://cloud.githubusercontent.com/assets/1756620/21462049/f9ac90f6-c957-11e6-8e0d-48dba2581378.png)


![06-failed_checkpoint_details](https://cloud.githubusercontent.com/assets/1756620/21462052/fdd2e068-c957-11e6-9cb6-e4ece5c5dd36.png)


![07-failed_checkpoint_overview](https://cloud.githubusercontent.com/assets/1756620/21462062/05fd444a-c958-11e6-8fc5-580f4e9e4e18.png)

**Clicking on the config tab**:


![09-config](https://cloud.githubusercontent.com/assets/1756620/21462067/0d3f6210-c958-11e6-9e1a-0767a8f557a5.png)

**After restoring from the savepoint**:


![08-restore_from_savepoint](https://cloud.githubusercontent.com/assets/1756620/21462071/1559a97e-c958-11e6-8ce5-b4287408d918.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4410-checkpoint_stats

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3042


commit 700ec439ed0e9fb00c52e6e373a5bcccfecce963
Author: Ufuk Celebi 
Date:   2016-12-23T19:31:29Z

[FLINK-4410] [runtime, runtime-web] Remove old checkpoint stats tracker code

commit c3f50c956f281a316a17b390851443c5be3adb6c
Author: Ufuk Celebi 
Date:   2016-12-23T19:37:08Z

[FLINK-4410] [runtime] Rework checkpoint stats tracking

commit 1db53a69829be8472fb74b6b83f0d3638121762f
Author: Ufuk Celebi 
Date:   2016-12-23T19:44:12Z

[FLINK-4410] [runtime-web] Add detailed checkpoint stats handlers

commit d6f6e7d48e05da47e02e8710fca699104bcc5988
Author: Ufuk Celebi 
Date:   2016-12-23T19:44:59Z

[FLINK-4410] [runtime-web] Add new layout for checkpoint stats

commit ab6c597f51c4aeea81dde0f82a3e1e7e72571ad9
Author: Ufuk Celebi 
Date:   2016-12-23T19:47:02Z

[FLINK-4410] [runtime-web] Rebuild JS/HTML files




> Report more information about operator checkpoints
> --
>
> Key: FLINK-4410
> URL: https://issues.apache.org/jira/browse/FLINK-4410
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Webfrontend
>Affects Versions: 1.1.2
>Reporter: Ufuk Celebi
>Assignee: Ufuk C

[GitHub] flink pull request #3042: [FLINK-4410] Expose more fine grained checkpoint s...

2016-12-23 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/3042

[FLINK-4410] Expose more fine grained checkpoint statistics

This PR exposes more fine-grained checkpoint statistics. The previous 
version of the tracking code had a couple of shortcomings:
- Only completed checkpoints were tracked in the history. You did not see 
in-progress or failed checkpoints.
- Only the latest completed checkpoint had fine-grained stats per operator 
and subtask. This meant that a possibly interesting checkpoint's statistics 
could be live-updated while you were looking at them.
- Many newly tracked statistics, like checkpoint duration at the operator or 
alignment duration, were not exposed.

This PR addresses these issues. For the extended tracking of the life cycle 
I decided to add tracking callbacks of all relevant entities like 
`PendingCheckpointStats`, `CompletedCheckpointStats`, `SubtaskStateStats`, 
`TaskStateStats`, etc. The life cycle of these objects follows that of their 
corresponding entities.

Furthermore, this adds new REST API handlers that work with the new tracker 
as well as a new layout for displaying them.

---

Some screenshots:

**Clicking on the Checkpoints Tab**: Sub tabs for overview, history, 
summary stats, and the config.


![00-start](https://cloud.githubusercontent.com/assets/1756620/21461971/3fdfb9be-c957-11e6-9f61-62610aa95da4.png)

**Clicking on the History Tab**: Lists recent checkpoints, including in 
progress ones.


![01-history](https://cloud.githubusercontent.com/assets/1756620/21461994/657fd0a0-c957-11e6-8d08-0f084e018aca.png)

**Clicking on details for a checkpoint**:


![02-details](https://cloud.githubusercontent.com/assets/1756620/21462027/ce4577a2-c957-11e6-9851-9d225c3762f4.png)

**After triggering a savepoint**:


![03-savepoint](https://cloud.githubusercontent.com/assets/1756620/21462031/d6857318-c957-11e6-810b-e6d639b5caaf.png)

**Details for the triggered savepoint**:


![04-savepoint_details](https://cloud.githubusercontent.com/assets/1756620/21462038/e80c1916-c957-11e6-984c-2447ec877c2d.png)

**Failed checkpoint while cancelling job**:


![05-failed_checkpoint](https://cloud.githubusercontent.com/assets/1756620/21462049/f9ac90f6-c957-11e6-8e0d-48dba2581378.png)


![06-failed_checkpoint_details](https://cloud.githubusercontent.com/assets/1756620/21462052/fdd2e068-c957-11e6-9cb6-e4ece5c5dd36.png)


![07-failed_checkpoint_overview](https://cloud.githubusercontent.com/assets/1756620/21462062/05fd444a-c958-11e6-8fc5-580f4e9e4e18.png)

**Clicking on the config tab**:


![09-config](https://cloud.githubusercontent.com/assets/1756620/21462067/0d3f6210-c958-11e6-9e1a-0767a8f557a5.png)

**After restoring from the savepoint**:


![08-restore_from_savepoint](https://cloud.githubusercontent.com/assets/1756620/21462071/1559a97e-c958-11e6-8ce5-b4287408d918.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 4410-checkpoint_stats

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3042


commit 700ec439ed0e9fb00c52e6e373a5bcccfecce963
Author: Ufuk Celebi 
Date:   2016-12-23T19:31:29Z

[FLINK-4410] [runtime, runtime-web] Remove old checkpoint stats tracker code

commit c3f50c956f281a316a17b390851443c5be3adb6c
Author: Ufuk Celebi 
Date:   2016-12-23T19:37:08Z

[FLINK-4410] [runtime] Rework checkpoint stats tracking

commit 1db53a69829be8472fb74b6b83f0d3638121762f
Author: Ufuk Celebi 
Date:   2016-12-23T19:44:12Z

[FLINK-4410] [runtime-web] Add detailed checkpoint stats handlers

commit d6f6e7d48e05da47e02e8710fca699104bcc5988
Author: Ufuk Celebi 
Date:   2016-12-23T19:44:59Z

[FLINK-4410] [runtime-web] Add new layout for checkpoint stats

commit ab6c597f51c4aeea81dde0f82a3e1e7e72571ad9
Author: Ufuk Celebi 
Date:   2016-12-23T19:47:02Z

[FLINK-4410] [runtime-web] Rebuild JS/HTML files




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93797831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @return A tuple of two arrays holding the field names and 
corresponding field positions.
 */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) =
-  {
-validateType(inputType)
-
-val fieldNames: Array[String] = inputType match {
-  case t: TupleTypeInfo[A] => t.getFieldNames
-  case c: CaseClassTypeInfo[A] => c.getFieldNames
-  case p: PojoTypeInfo[A] => p.getFieldNames
-  case r: RowTypeInfo => r.getFieldNames
-  case tpe =>
-throw new TableException(s"Type $tpe lacks explicit field naming")
-}
-val fieldIndexes = fieldNames.indices.toArray
-
-if (fieldNames.contains("*")) {
-  throw new TableException("Field name can not be '*'.")
-}
-
-(fieldNames, fieldIndexes)
+  (Array[String], Array[Int]) = {
+TableEnvironment.getFieldInfo(inputType)
--- End diff --

It is overridden in a subclass, so I decided to leave this method here and 
only move the body out of it to make it reusable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773574#comment-15773574
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93797831
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -340,25 +331,8 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 * @return A tuple of two arrays holding the field names and 
corresponding field positions.
 */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
-  (Array[String], Array[Int]) =
-  {
-validateType(inputType)
-
-val fieldNames: Array[String] = inputType match {
-  case t: TupleTypeInfo[A] => t.getFieldNames
-  case c: CaseClassTypeInfo[A] => c.getFieldNames
-  case p: PojoTypeInfo[A] => p.getFieldNames
-  case r: RowTypeInfo => r.getFieldNames
-  case tpe =>
-throw new TableException(s"Type $tpe lacks explicit field naming")
-}
-val fieldIndexes = fieldNames.indices.toArray
-
-if (fieldNames.contains("*")) {
-  throw new TableException("Field name can not be '*'.")
-}
-
-(fieldNames, fieldIndexes)
+  (Array[String], Array[Int]) = {
+TableEnvironment.getFieldInfo(inputType)
--- End diff --

It is overridden in a subclass, so I decided to leave this method here and 
only move the body out of it to make it reusable.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of flat 
> rows. 
> However, there are several storage formats for nested data that should be 
> supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4131) Confusing error for out dated RequestPartitionState

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-4131.
--
Resolution: Duplicate

> Confusing error for out dated RequestPartitionState
> ---
>
> Key: FLINK-4131
> URL: https://issues.apache.org/jira/browse/FLINK-4131
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> When a consumer requests a partition state for an old job or execution 
> attempt (e.g. failed job or attempt), the JobManager answers with a {{null}} 
> state, which fails the requesting task with the following cause: 
> {{IllegalStateException("Received unexpected partition state null for 
> partition request. This is a bug.")}}.
> This is confusing to the user as one might think that this is the root 
> failure cause.
> I propose to either ignore the null state at the Task or not respond on the 
> JobManager side if the job or execution attempt has been cleared (see 
> {{RequestPartitionState}} in {{JobManager.scala}}).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5180) Include blocked on bounded queue length in back pressure stats

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5180.
--
Resolution: Invalid

> Include blocked on bounded queue length in back pressure stats
> --
>
> Key: FLINK-5180
> URL: https://issues.apache.org/jira/browse/FLINK-5180
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> As a follow-up to FLINK-5088, we need to adjust the back pressure stats 
> tracker to report back pressure when the task is blocked on the introduced 
> capacity limit. Currently, only blocking buffer requests are accounted for.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3041: Flink 5084 tmp

2016-12-23 Thread mtunique
Github user mtunique closed the pull request at:

https://github.com/apache/flink/pull/3041


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3041: Flink 5084 tmp

2016-12-23 Thread mtunique
GitHub user mtunique opened a pull request:

https://github.com/apache/flink/pull/3041

Flink 5084 tmp

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mtunique/flink flink-5084-tmp

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3041.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3041


commit 3fa59c4baca5f6f1b5634e4ef173f6e8808b5b96
Author: mtunique 
Date:   2016-12-09T05:23:36Z

add table java api ut (join)

commit d2e9303c077b5bb0c60dc8ec9e948a51ca1a40cd
Author: mtunique 
Date:   2016-12-09T05:25:35Z

delete table java api it case (join)

commit 7700b266d450f29ce992f98d3b3e2400d126f062
Author: mtunique 
Date:   2016-12-09T08:04:00Z

add LogicalPlanFormatUtils

commit 5b3e96eb893bfbd3a53019473f5784a16ebd
Author: mtunique 
Date:   2016-12-09T08:05:23Z

join case use logicalplan format

commit 2cbb7f98dedf3c4b9d0072940409b8fc6c629b6f
Author: mtunique 
Date:   2016-12-09T08:45:10Z

 add table java api ut case (Agg)

commit bd15e481b0831e4c82347214537349a6ac5c9ef3
Author: mtunique 
Date:   2016-12-09T11:09:15Z

delete table java api it case (Agg)

commit bbfa34e1e58efcd0481682b4166d0019cdaa515e
Author: mtunique 
Date:   2016-12-09T11:14:00Z

[FLINK-5084] Replace Java Table API integration tests by unit tests

commit f457a8685647877a6e3534b32f97b1f2cb534c51
Author: mtunique 
Date:   2016-12-10T18:20:50Z

remove validation case from AggregationsITCase

commit 5d746ff311a5731f73519a8af26f2b5e78dc74f5
Author: mtunique 
Date:   2016-12-10T18:25:40Z

add AggregationsValidationTest

commit d4da0cfd3499c961ba426ecec0b2a40cd9b2fe36
Author: mtunique 
Date:   2016-12-10T18:57:11Z

remove validation case from JoinITCase

commit 9285c81ffebd833b1f6c729bb990b68286aa3c3b
Author: mtunique 
Date:   2016-12-10T18:57:42Z

add JoinValidationTest

commit cbccded4d7d0c25b34a316c4c42d703a3c55b0bc
Author: mtunique 
Date:   2016-12-10T19:08:07Z

add SetOperatorsValidationTest

commit c3875d855033e2125a7d19c6a6eed18eed69e0b3
Author: mtunique 
Date:   2016-12-10T19:08:45Z

add SortValidationTest

commit d00fd6256e4977bbc86033143132d0738abab67d
Author: mtunique 
Date:   2016-12-11T06:09:26Z

add CalcValidationTest

commit 0293e0c598beb2d1be3228f3ff3e3de4f4df6bea
Author: mtunique 
Date:   2016-12-12T09:02:40Z

delete table java IT case

commit ec20ed6a539f783cf12f1af0450289c6011549ce
Author: mtunique 
Date:   2016-12-12T09:15:19Z

add join plan case

commit bacb137d8ae44a2df6294e967a3f6ecd74f6becc
Author: mtunique 
Date:   2016-12-12T09:30:12Z

add calc plan test

commit 0f5568ca8bd590055a336c95e885d15e98338c53
Author: mtunique 
Date:   2016-12-12T09:30:33Z

add join plan test

commit f8db10e488aa5df32a1533362f52acb054380b97
Author: mtunique 
Date:   2016-12-12T09:47:19Z

add agg plan test

commit 4b120ab44416a3c17261dedae699389f772305a9
Author: mtunique 
Date:   2016-12-12T10:02:31Z

add cast test

commit 81767b6700e83dc1aa1b0bdadd756ecafdeba089
Author: mtunique 
Date:   2016-12-12T10:05:33Z

delete unused import

commit 24753f9a4d7ed0b9b09a98fcb980b8db201be625
Author: mtunique 
Date:   2016-12-23T10:19:15Z

rm testPojoGrouping and MyPojo

commit 0f178f08bf0470e60384633283d375d196c885e5
Author: mtunique 
Date:   2016-12-23T10:19:54Z

rm validation test

commit 97a7446ae09575089205e20f64d70d95bc80938a
Author: mtunique 
Date:   2016-12-23T10:20:22Z

not extend TableProgramsTestBase

commit 16f80cf7f98a43442a57a6bf0d3919b2a0b395d8
Author: mtunique 
Date:   2016-12-23T16:04:30Z

rebase master




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[jira] [Commented] (FLINK-5388) Remove private access of edges and vertices of Gelly Graph class

2016-12-23 Thread wouter ligtenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773242#comment-15773242
 ] 

wouter ligtenberg commented on FLINK-5388:
--

For my purpose, yes, I think it is.

> Remove private access of edges and vertices of Gelly Graph class
> 
>
> Key: FLINK-5388
> URL: https://issues.apache.org/jira/browse/FLINK-5388
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.3
> Environment: Java
>Reporter: wouter ligtenberg
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If you want to make a special kind of Graph with special edge types or some 
> different methods on top of Gelly, you want to be able to extend the Graph 
> class. Currently that's not possible because the constructor is private. I 
> don't know what effect this has on other methods or the scale of the project, 
> but it was just something that I ran into in my project.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5388) Remove private access of edges and vertices of Gelly Graph class

2016-12-23 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773226#comment-15773226
 ] 

Greg Hogan commented on FLINK-5388:
---

Is it sufficient to change {{Graph.<init>()}} to {{protected}}?

> Remove private access of edges and vertices of Gelly Graph class
> 
>
> Key: FLINK-5388
> URL: https://issues.apache.org/jira/browse/FLINK-5388
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.3
> Environment: Java
>Reporter: wouter ligtenberg
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If you want to make a special kind of Graph with special edge types or some 
> different methods on top of Gelly, you want to be able to extend the Graph 
> class. Currently that's not possible because the constructor is private. I 
> don't know what effect this has on other methods or the scale of the project, 
> but it was just something that I ran into in my project.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773176#comment-15773176
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93780163
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

Make sense to me. It seems that we have to keep them as traits.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of flat 
> rows. 
> However, there are several storage formats for nested data that should be 
> supported, such as Avro, JSON, Parquet, and ORC. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93780163
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

Make sense to me. It seems that we have to keep them as traits.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5388) Remove private access of edges and vertices of Gelly Graph class

2016-12-23 Thread wouter ligtenberg (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773154#comment-15773154
 ] 

wouter ligtenberg commented on FLINK-5388:
--

I have created a temporal graph tool on top of Gelly: in addition to an edge label, a 
temporal edge also has a start time and an end time. I have made several 
algorithms that use temporal graphs.

I wanted to make a TemporalGraph class that extends the Graph class so that I 
can easily make Gelly calls like numberOfVertices, for instance. 

Also, I think it would be useful if, later on, other people want to add their 
own abstraction layer on top of Gelly.

> Remove private access of edges and vertices of Gelly Graph class
> 
>
> Key: FLINK-5388
> URL: https://issues.apache.org/jira/browse/FLINK-5388
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.3
> Environment: Java
>Reporter: wouter ligtenberg
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If you want to make a special kind of Graph with special edge types or some 
> different methods on top of Gelly, you want to be able to extend the Graph 
> class. Currently that's not possible because the constructor is private. I 
> don't know what effect this has on other methods or the scale of the project, 
> but it was just something that I ran into in my project.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5388) Remove private access of edges and vertices of Gelly Graph class

2016-12-23 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773126#comment-15773126
 ] 

Anton Solovev commented on FLINK-5388:
--

[~otherwise777] Could you provide an example of what you need this for?

> Remove private access of edges and vertices of Gelly Graph class
> 
>
> Key: FLINK-5388
> URL: https://issues.apache.org/jira/browse/FLINK-5388
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.3
> Environment: Java
>Reporter: wouter ligtenberg
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> If you want to make a special kind of Graph with special edge types or some 
> different methods on top of Gelly, you want to be able to extend the Graph 
> class. Currently that's not possible because the constructor is private. I 
> don't know what effect this has on other methods or the scale of the project, 
> but it was just something that I ran into in my project.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4313) Inconsistent code for Key/Value in the CheckpointCoordinator

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev closed FLINK-4313.

Resolution: Won't Fix

outdated

> Inconsistent code for Key/Value in the CheckpointCoordinator
> 
>
> Key: FLINK-4313
> URL: https://issues.apache.org/jira/browse/FLINK-4313
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
> Fix For: 1.2.0
>
>
> The CheckpointCoordinator seems to have maps to track KeyValue states 
> independently from other state.
> However, currently all state is transferred via a single {{StateHandle}}. The 
> CheckpointCoordinator does not populate the key/value state map ever, nor do 
> the deploy fields actually pick up any contents from that map.
> This is currently quite confusing and probably error prone.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reassigned FLINK-4228:


Assignee: Anton Solovev

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Anton Solovev
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-4228:
-
Assignee: (was: Anton Solovev)

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773084#comment-15773084
 ] 

Cliff Resnick commented on FLINK-4228:
--

My last pull request is good to go so I guess it's up to you guys.

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4651) Re-register processing time timers at the WindowOperator upon recovery.

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-4651:
---
Fix Version/s: (was: 1.1.4)
   1.1.5

> Re-register processing time timers at the WindowOperator upon recovery.
> ---
>
> Key: FLINK-4651
> URL: https://issues.apache.org/jira/browse/FLINK-4651
> Project: Flink
>  Issue Type: Bug
>  Components: Windowing Operators
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>  Labels: windows
> Fix For: 1.2.0, 1.1.5
>
>
> Currently the {{WindowOperator}} checkpoints the processing time timers, but 
> upon recovery it does not re-registers them with the {{TimeServiceProvider}}. 
> To actually reprocess them it relies on another element that will come and 
> register a new timer for a future point in time. Although this is a realistic 
> assumption in long running jobs, we can remove this assumption by 
> re-registering the restored timers with the {{TimeServiceProvider}} in the 
> {{open()}} method of the {{WindowOperator}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5322) Clean up yarn configuration documentation

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-5322:
---
Fix Version/s: (was: 1.1.4)
   1.1.5

> Clean up yarn configuration documentation
> -
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.2.0, 1.1.3
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0, 1.1.5
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.
> --
> The yarn section of the configuration page should be cleaned up a bit. The 
> "yarn.containers.vcores" parameter is listed twice, the example for 
> "yarn.application-master.env" is listed as a separate parameter and the 
> "yarn.taskmanager.env" description indirectly references another parameter 
> ("same as the above") which just isn't maintainable; instead it should be 
> described similarly as the application-master entry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5210) Failing performCheckpoint method causes task to fail

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-5210:
---
Fix Version/s: (was: 1.1.4)
   1.1.5

> Failing performCheckpoint method causes task to fail 
> -
>
> Key: FLINK-5210
> URL: https://issues.apache.org/jira/browse/FLINK-5210
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.5
>
>
> A failure in {{StreamTask#performCheckpoint}} causes the {{Task}} to fail. 
> This should not be the case and instead the checkpoint files should be 
> cleaned up and the current checkpoint should be declined.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5227) Add warning to include flink-table in job fat jars

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-5227:
---
Fix Version/s: (was: 1.1.4)
   1.1.5

> Add warning to include flink-table in job fat jars
> --
>
> Key: FLINK-5227
> URL: https://issues.apache.org/jira/browse/FLINK-5227
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Fabian Hueske
> Fix For: 1.2.0, 1.1.5
>
>
> {{flink-table}} depends on Apache Calcite which includes a JDBC Driver that 
> prevents classloaders from being collected. This is a known issue with 
> {{java.sql.DriverManager}} and can eventually cause PermGen OOME failures on 
> the TaskManager.
> The current workaround is to not include {{flink-table}} in the fat job JAR. 
> Instead the {{flink-table}} jar files should be added to the {{lib}} folder 
> of the TaskManagers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5302) Log failure cause at Execution

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-5302:
---
Fix Version/s: (was: 1.1.4)
   (was: 1.2.0)

> Log failure cause at Execution 
> ---
>
> Key: FLINK-5302
> URL: https://issues.apache.org/jira/browse/FLINK-5302
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>
> It can be helpful to log the failure cause that made an {{Execution}} switch 
> to state {{FAILED}}. We currently only see a "root cause" logged on the 
> JobManager, which happens to be the first failure cause that makes it to 
> {{ExecutionGraph#fail()}}. This depends on relative timings of messages. For 
> debugging it can be helpful to have all causes available.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5233) Upgrade Jackson version because of class loader leak

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-5233:
---
Fix Version/s: (was: 1.1.4)

> Upgrade Jackson version because of class loader leak
> 
>
> Key: FLINK-5233
> URL: https://issues.apache.org/jira/browse/FLINK-5233
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.3
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>Priority: Critical
>
> A user reported this issue on the ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-1-3-OOME-Permgen-td10379.html
> I propose to upgrade to Jackson 2.7.8, as this version contains the fix for 
> the issue, but it's not a major Jackson upgrade.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5375) Fix watermark documentation

2016-12-23 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi updated FLINK-5375:
---
Fix Version/s: (was: 1.1.4)
   1.1.5

> Fix watermark documentation
> ---
>
> Key: FLINK-5375
> URL: https://issues.apache.org/jira/browse/FLINK-5375
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Project Website
>Affects Versions: 1.2.0, 1.1.3, 1.3.0
>Reporter: Fabian Hueske
>Priority: Critical
> Fix For: 1.2.0, 1.3.0, 1.1.5
>
>
> The [documentation of 
> watermarks|https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks]
>  is not correct. It states 
> {quote}
> A Watermark(t) declares that event time has reached time t in that stream, 
> meaning that all events with a timestamp t’ < t have occurred.
> {quote}
> whereas the JavaDocs which is aligned with implementation says
> {quote}
> A Watermark tells operators that receive it that no elements with a
> timestamp older or equal to the watermark timestamp should arrive at the
> operator.
> {quote}
> The documentation needs to be updated. Moreover, we need to carefully check 
> that the watermark semantics are correctly described in other pages of the 
> documentation and blog posts published on the Flink website.
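
To make the JavaDoc semantics concrete, a small sketch of a punctuated assigner that honors "older or equal" by emitting {{Watermark(t - 1)}} ({{MyEvent}} and its getter are hypothetical):

{code}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Under the JavaDoc semantics, Watermark(t) promises that no element with a
// timestamp <= t arrives afterwards. An assigner that still expects elements
// *at* the last seen timestamp therefore emits Watermark(t - 1).
public class StrictWatermarkAssigner
		implements AssignerWithPunctuatedWatermarks<MyEvent> {

	@Override
	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
		return element.getTimestamp();  // MyEvent is a placeholder event type
	}

	@Override
	public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
		return new Watermark(extractedTimestamp - 1);
	}
}
{code}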



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5382) Taskmanager log download button causes 404

2016-12-23 Thread Sachin Goel (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Goel reassigned FLINK-5382:
--

Assignee: Sachin Goel

> Taskmanager log download button causes 404
> --
>
> Key: FLINK-5382
> URL: https://issues.apache.org/jira/browse/FLINK-5382
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Sachin Goel
>
> The "download logs" button when viewing the TaskManager logs in the web UI 
> leads to a 404 page.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-5380) Number of outgoing records not reported in web interface

2016-12-23 Thread Sachin Goel (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sachin Goel reassigned FLINK-5380:
--

Assignee: Sachin Goel

> Number of outgoing records not reported in web interface
> 
>
> Key: FLINK-5380
> URL: https://issues.apache.org/jira/browse/FLINK-5380
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Attachments: outRecordsNotreported.png
>
>
> The web frontend does not report any outgoing records in the web frontend.
> The amount of data in MB is reported correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773024#comment-15773024
 ] 

Anton Solovev commented on FLINK-4228:
--

So, do you want to continue on this?

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772984#comment-15772984
 ] 

ASF GitHub Bot commented on FLINK-3850:
---

GitHub user NickolayVasilishin opened a pull request:

https://github.com/apache/flink/pull/3040

[FLINK-3850] Add forward field annotations to DataSet 

Add forward field annotations to DataSet operators generated by the Table 
API

  - Added field forwarding to most of the `DataSetRel` implementations.
  - The string with forwarded fields is now allowed to be empty in 
`SemanticPropUtil.java`
  - The wrapper for type-based indices was moved to the object class 
`FieldForwardingUtils`
  - In most cases forwarding is done only for conversions

   `BatchScan`: forwarding at conversion
   `DataSetAggregate`: forwarding at conversion
   `DataSetCalc`: forwarding based on operands left unmodified by RexCalls
   `DataSetCorrelate`: forwarding based on operands left unmodified by RexCalls
   `DataSetIntersect`:  forwarding at conversion
   `DataSetJoin`: forwarding based on fields which are not keys
   `DataSetMinus`: forwarding at conversion
   `DataSetSingleRowJoin`: all fields of the multi-row dataset forwarded; the 
single-row side is used via broadcast
   `DataSetSort`: all fields forwarded + conversion

I hope I've understood the meaning of forwarded fields right: fields that 
are not used for computations. So I assumed that these fields are not used in 
`RexCalls` or as `join keys`. I also forwarded fields in type conversions.
The most complex part was determining the correct input and output field 
names.
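
For context, the existing DataSet-API annotation that this PR generates from Table API plans looks like this when written by hand ({{AddOne}} is a made-up function, not from the PR):

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

// Declares that field f0 is copied unchanged from input to output, so the
// optimizer may preserve partitioning/sorting on f0 across this operator.
@ForwardedFields("f0")
public class AddOne implements MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
	@Override
	public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
		return new Tuple2<>(value.f0, value.f1 + 1);
	}
}
{code}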


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NickolayVasilishin/flink FLINK-3850

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3040


commit 25cc1f022eb399bade37ef7b0fd0b87a9e509d67
Author: nikolay_vasilishin 
Date:   2016-12-23T10:50:46Z

[FLINK-3850] Add forward field annotations to DataSet operators generated 
by the Table API

  - Added field forwarding at most of DataSetRel implementations.
  - String with forwarded fields allowed to be empty at 
SemanticPropUtil.java
  - Wrapper for indices based on types moved to object class 
FieldForwardingUtils
  - In most cases forwarding done only for conversion

   BatchScan: forwarding at conversion
   DataSetAggregate: forwarding at conversion
   DataSetCalc: forwarding based on unmodified at RexCalls operands
   DataSetCorrelate:  forwarding based on unmodified at RexCalls operands
   DataSetIntersect:  forwarding at conversion
   DataSetJoin: forwarding based on fields which are not keys
   DataSetMinus: forwarding at conversion
   DataSetSingleRowJoin: forwarded all fields from multi row dataset, 
single row used via broadcast
   DataSetSort: all fields forwarded + conversion

Conflicts:

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala




> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Nikolay Vasilishin
>
> The DataSet API features semantic annotations [1] to hint the optimizer which 
> input fields an operator copies. This information is valuable for the 
> optimizer because it can infer that certain physical properties such as 
> partitioning or sorting are not destroyed by the user function.

[GitHub] flink pull request #3040: [FLINK-3850] Add forward field annotations to Data...

2016-12-23 Thread NickolayVasilishin
GitHub user NickolayVasilishin opened a pull request:

https://github.com/apache/flink/pull/3040

[FLINK-3850] Add forward field annotations to DataSet 

Add forward field annotations to DataSet operators generated by the Table 
API

  - Added field forwarding to most of the `DataSetRel` implementations.
  - The string with forwarded fields is now allowed to be empty in 
`SemanticPropUtil.java`
  - The wrapper for type-based indices was moved to the object class 
`FieldForwardingUtils`
  - In most cases forwarding is done only for conversions

   `BatchScan`: forwarding at conversion
   `DataSetAggregate`: forwarding at conversion
   `DataSetCalc`: forwarding based on operands left unmodified by RexCalls
   `DataSetCorrelate`: forwarding based on operands left unmodified by RexCalls
   `DataSetIntersect`:  forwarding at conversion
   `DataSetJoin`: forwarding based on fields which are not keys
   `DataSetMinus`: forwarding at conversion
   `DataSetSingleRowJoin`: all fields of the multi-row dataset forwarded; the 
single-row side is used via broadcast
   `DataSetSort`: all fields forwarded + conversion

I hope I've understood the meaning of forwarded fields right: fields that 
are not used for computations. So I assumed that these fields are not used in 
`RexCalls` or as `join keys`. I also forwarded fields in type conversions.
The most complex part was determining the correct input and output field 
names.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NickolayVasilishin/flink FLINK-3850

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3040


commit 25cc1f022eb399bade37ef7b0fd0b87a9e509d67
Author: nikolay_vasilishin 
Date:   2016-12-23T10:50:46Z

[FLINK-3850] Add forward field annotations to DataSet operators generated 
by the Table API

  - Added field forwarding at most of DataSetRel implementations.
  - String with forwarded fields allowed to be empty at 
SemanticPropUtil.java
  - Wrapper for indices based on types moved to object class 
FieldForwardingUtils
  - In most cases forwarding done only for conversion

   BatchScan: forwarding at conversion
   DataSetAggregate: forwarding at conversion
   DataSetCalc: forwarding based on unmodified at RexCalls operands
   DataSetCorrelate:  forwarding based on unmodified at RexCalls operands
   DataSetIntersect:  forwarding at conversion
   DataSetJoin: forwarding based on fields which are not keys
   DataSetMinus: forwarding at conversion
   DataSetSingleRowJoin: forwarded all fields from multi row dataset, 
single row used via broadcast
   DataSetSort: all fields forwarded + conversion

Conflicts:

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCorrelate.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetIntersect.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetMinus.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSingleRowJoin.scala

flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetSort.scala




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5282) CheckpointStateOutputStream should be closed in finally block in SavepointV0Serializer#convertKeyedBackendState()

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev reassigned FLINK-5282:


Assignee: Anton Solovev

> CheckpointStateOutputStream should be closed in finally block in 
> SavepointV0Serializer#convertKeyedBackendState()
> -
>
> Key: FLINK-5282
> URL: https://issues.apache.org/jira/browse/FLINK-5282
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Anton Solovev
>Priority: Minor
>
> {code}
>   CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
>   
> checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
>   final long offset = keyedStateOut.getPos();
> {code}
> If getPos() throws exception, keyedStateOut would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-5282) CheckpointStateOutputStream should be closed in finally block in SavepointV0Serializer#convertKeyedBackendState()

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev closed FLINK-5282.


> CheckpointStateOutputStream should be closed in finally block in 
> SavepointV0Serializer#convertKeyedBackendState()
> -
>
> Key: FLINK-5282
> URL: https://issues.apache.org/jira/browse/FLINK-5282
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Stefan Richter
>Priority: Minor
>
> {code}
>   CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
>   
> checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
>   final long offset = keyedStateOut.getPos();
> {code}
> If getPos() throws exception, keyedStateOut would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5282) CheckpointStateOutputStream should be closed in finally block in SavepointV0Serializer#convertKeyedBackendState()

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5282:
-
Assignee: Stefan Richter  (was: Anton Solovev)

> CheckpointStateOutputStream should be closed in finally block in 
> SavepointV0Serializer#convertKeyedBackendState()
> -
>
> Key: FLINK-5282
> URL: https://issues.apache.org/jira/browse/FLINK-5282
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Stefan Richter
>Priority: Minor
>
> {code}
>   CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
>   
> checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
>   final long offset = keyedStateOut.getPos();
> {code}
> If getPos() throws exception, keyedStateOut would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5384) clean up jira issues

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5384:
-
Description: 
must be closed:
FLINK-37 -> from stratosphere;
FLINK-87 -> from stratosphere;
FLINK-481 -> from stratosphere;
FLINK-605 -> from stratosphere;
FLINK-639 -> from stratosphere;
FLINK-650 -> from stratosphere;
FLINK-735 -> from stratosphere;
FLINK-456 -> from stratosphere;
FLINK-788 -> from stratosphere;
FLINK-796 -> from stratosphere;
FLINK-805 -> from stratosphere ;
FLINK-867 -> from stratosphere;
FLINK-879 -> from stratosphere;
FLINK-1166 -> closed on github;
FLINK-1946 -> closed on github;
FLINK-2119 -> closed on github;
FLINK-2157 -> closed on github;
FLINK-2220 -> closed on github;
FLINK-2319 -> closed on github;
FLINK-2363 -> closed on github;
FLINK-2399 -> closed on github;
FLINK-2428 -> closed on github;
FLINK-2472 -> closed on github;
FLINK-2480 -> closed on github;
FLINK-2609 -> closed on github;
FLINK-2823 -> closed on github;
FLINK-3155 -> closed on github;
FLINK-3331 -> closed on github;
FLINK-3964 -> closed on github;
FLINK-4653 -> closed on github;
FLINK-4717 -> closed on github;
FLINK-4829 -> closed on github;
FLINK-5016 -> closed on github;

should be discussed before:
FLINK-1055 ;
FLINK-1098 -> create other issue to add a colectEach();
FLINK-1100 ;
FLINK-1146 ;
FLINK-1335 -> maybe rename?;
FLINK-1439 ;
FLINK-1447 -> firefox problem?;
FLINK-1521 -> closed on github;
FLINK-1538 -> gsoc2015, is it solved?;
FLINK-1541 -> gsoc2015, is it solved?;
FLINK-1723 -> almost done? ;
FLINK-1814 ;
FLINK-1858 -> is QA bot deleted?;
FLINK-1926 -> all subtasks done;

FLINK-2023 -> does not block Scala Graph API;
FLINK-2032 -> all subtasks done;
FLINK-2108 -> almost done? ;
FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? ;
FLINK-3109 -> its PR is stuck;
FLINK-3154 -> must be addressed as part of a bigger initiative;
FLINK-3297 -> solved as third party lib;
FLINK-4313 -> outdated;
FLINK-4760 -> fixed?

resolved^
FLINK-5282

  was:
must be closed:
FLINK-37 -> from stratosphere;
FLINK-87 -> from stratosphere;
FLINK-481 -> from stratosphere;
FLINK-605 -> from stratosphere;
FLINK-639 -> from stratosphere;
FLINK-650 -> from stratosphere;
FLINK-735 -> from stratosphere;
FLINK-456 -> from stratosphere;
FLINK-788 -> from stratosphere;
FLINK-796 -> from stratosphere;
FLINK-805 -> from stratosphere ;
FLINK-867 -> from stratosphere;
FLINK-879 -> from stratosphere;
FLINK-1166 -> closed on github;
FLINK-1946 -> closed on github;
FLINK-2119 -> closed on github;
FLINK-2157 -> closed on github;
FLINK-2220 -> closed on github;
FLINK-2319 -> closed on github;
FLINK-2363 -> closed on github;
FLINK-2399 -> closed on github;
FLINK-2428 -> closed on github;
FLINK-2472 -> closed on github;
FLINK-2480 -> closed on github;
FLINK-2609 -> closed on github;
FLINK-2823 -> closed on github;
FLINK-3155 -> closed on github;
FLINK-3331 -> closed on github;
FLINK-3964 -> closed on github;
FLINK-4653 -> closed on github;
FLINK-4717 -> closed on github;
FLINK-4829 -> closed on github;
FLINK-5016 -> closed on github;

should be discussed before:
FLINK-1055 ;
FLINK-1098 -> create other issue to add a colectEach();
FLINK-1100 ;
FLINK-1146 ;
FLINK-1335 -> maybe rename?;
FLINK-1439 ;
FLINK-1447 -> firefox problem?;
FLINK-1521 -> closed on github;
FLINK-1538 -> gsoc2015, is it solved?;
FLINK-1541 -> gsoc2015, is it solved?;
FLINK-1723 -> almost done? ;
FLINK-1814 ;
FLINK-1858 -> is QA bot deleted?;
FLINK-1926 -> all subtasks done;

FLINK-2023 -> does not block Scala Graph API;
FLINK-2032 -> all subtasks done;
FLINK-2108 -> almost done? ;
FLINK-2309 -> maybe it's worth to merge with FLINK-2316 ? ;
FLINK-3109 -> its PR is stuck;
FLINK-3154 -> must be addressed as part of a bigger initiative;
FLINK-3297 -> solved as third party lib;
FLINK-4313 -> outdated;
FLINK-4760 -> fixed?


> clean up jira issues
> 
>
> Key: FLINK-5384
> URL: https://issues.apache.org/jira/browse/FLINK-5384
> Project: Flink
>  Issue Type: Improvement
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Minor
>
> must be closed:
> FLINK-37 -> from stratosphere;
> FLINK-87 -> from stratosphere;
> FLINK-481 -> from stratosphere;
> FLINK-605 -> from stratosphere;
> FLINK-639 -> from stratosphere;
> FLINK-650 -> from stratosphere;
> FLINK-735 -> from stratosphere;
> FLINK-456 -> from stratosphere;
> FLINK-788 -> from stratosphere;
> FLINK-796 -> from stratosphere;
> FLINK-805 -> from stratosphere ;
> FLINK-867 -> from stratosphere;
> FLINK-879 -> from stratosphere;
> FLINK-1166 -> closed on github;
> FLINK-1946 -> closed on github;
> FLINK-2119 -> closed on github;
> FLINK-2157 -> closed on github;
> FLINK-2220 -> closed on github;
> FLINK-2319 -> closed on github;
> FLINK-2363 -> closed on github;
> FLINK-2399 -> closed on github;
> FLINK-2428 -> closed 

[jira] [Commented] (FLINK-4228) RocksDB semi-async snapshot to S3AFileSystem fails

2016-12-23 Thread Cliff Resnick (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772906#comment-15772906
 ] 

Cliff Resnick commented on FLINK-4228:
--

The issue is now exclusive to running on YARN with s3a:// as the configured 
FileSystem. In that case, the Flink session fails while staging itself, because 
it tries to copy the flink/lib directory to S3 and the S3AFileSystem does not 
support recursive copy.
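
A rough sketch of the file-by-file workaround described above, using only the Hadoop FileSystem API (class and method names are made up; the actual fix may differ):

{code}
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Copies a directory tree file by file, creating each target "folder"
// explicitly, since S3AFileSystem does not support a recursive copy.
public final class DirCopyUtil {

	public static void copyDir(FileSystem srcFs, Path srcDir,
			FileSystem dstFs, Path dstDir) throws Exception {
		dstFs.mkdirs(dstDir);
		for (FileStatus status : srcFs.listStatus(srcDir)) {
			Path dst = new Path(dstDir, status.getPath().getName());
			if (status.isDirectory()) {
				copyDir(srcFs, status.getPath(), dstFs, dst);
			} else {
				FileUtil.copy(srcFs, status.getPath(), dstFs, dst,
						false /* deleteSource */, dstFs.getConf());
			}
		}
	}

	private DirCopyUtil() {}
}
{code}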

> RocksDB semi-async snapshot to S3AFileSystem fails
> --
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5282) CheckpointStateOutputStream should be closed in finally block in SavepointV0Serializer#convertKeyedBackendState()

2016-12-23 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772899#comment-15772899
 ] 

Ted Yu commented on FLINK-5282:
---

commit 8cda6a2260bbbd8e84349f0204d2980cfdd5a48a
Author: Stefan Richter 
Date:   Wed Dec 7 21:25:29 2016 +0100

[FLINK-5282] Fix closing streams on exception in SavepointV0Serializer
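
The pattern that fix applies, as a minimal sketch (the names follow the snippet quoted below; the actual SavepointV0Serializer change may differ in detail):

{code}
CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
		checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
try {
	final long offset = keyedStateOut.getPos();  // may throw
	// ... write the converted keyed state starting at 'offset' ...
} finally {
	keyedStateOut.close();  // now runs even if getPos() or a write throws
}
{code}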

> CheckpointStateOutputStream should be closed in finally block in 
> SavepointV0Serializer#convertKeyedBackendState()
> -
>
> Key: FLINK-5282
> URL: https://issues.apache.org/jira/browse/FLINK-5282
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
>   
> checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
>   final long offset = keyedStateOut.getPos();
> {code}
> If getPos() throws exception, keyedStateOut would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5282) CheckpointStateOutputStream should be closed in finally block in SavepointV0Serializer#convertKeyedBackendState()

2016-12-23 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-5282.
---
Resolution: Fixed

> CheckpointStateOutputStream should be closed in finally block in 
> SavepointV0Serializer#convertKeyedBackendState()
> -
>
> Key: FLINK-5282
> URL: https://issues.apache.org/jira/browse/FLINK-5282
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   CheckpointStreamFactory.CheckpointStateOutputStream keyedStateOut =
>   
> checkpointStreamFactory.createCheckpointStateOutputStream(checkpointID, 0L);
>   final long offset = keyedStateOut.getPos();
> {code}
> If getPos() throws exception, keyedStateOut would be left unclosed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5388) Remove private access of edges and vertices of Gelly Graph class

2016-12-23 Thread wouter ligtenberg (JIRA)
wouter ligtenberg created FLINK-5388:


 Summary: Remove private access of edges and vertices of Gelly 
Graph class
 Key: FLINK-5388
 URL: https://issues.apache.org/jira/browse/FLINK-5388
 Project: Flink
  Issue Type: Improvement
  Components: Gelly
Affects Versions: 1.1.3
 Environment: Java
Reporter: wouter ligtenberg


If you want to make a special kind of Graph with special edge types or some 
different methods on top of Gelly, you want to be able to extend the Graph 
class. Currently that's not possible because the constructor is private. I 
don't know what effect this has on other methods or the scale of the project, 
but it was just something that I ran into in my project.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772879#comment-15772879
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93764349
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

`CsvTableSource` inherits both `StreamTableSource` and 
`BatchTableSource`, so they should be traits.

I don't think that adding a method that only calls the implementation from a 
trait is a big issue. In any case, we do not duplicate the code and do not 
re-implement methods.

Do you have any concerns about this?


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.
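
A sketch of the kind of nested schema such an extended TableSource could return from {{getReturnType()}} (field names are made up; {{RowTypeInfo}} is Flink's existing row type info):

{code}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class NestedSchemaExample {
	public static void main(String[] args) {
		// A "person" row containing a nested "address" row; names are illustrative.
		TypeInformation<?> addressType = new RowTypeInfo(
				new TypeInformation<?>[] {
						BasicTypeInfo.STRING_TYPE_INFO,    // street
						BasicTypeInfo.STRING_TYPE_INFO },  // city
				new String[] {"street", "city"});

		TypeInformation<?> personType = new RowTypeInfo(
				new TypeInformation<?>[] {
						BasicTypeInfo.STRING_TYPE_INFO,    // name
						addressType },                     // nested row
				new String[] {"name", "address"});

		System.out.println(personType);
	}
}
{code}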



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93764349
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

`CsvTableSource` inherits both `StreamTableSource` and 
`BatchTableSource`, so they should be traits.

I don't think that adding a method that only calls the implementation from a 
trait is a big issue. In any case, we do not duplicate the code and do not 
re-implement methods.

Do you have any concerns about this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772868#comment-15772868
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93763884
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

What about making `TableSource` an abstract class? That way it can work with 
both Java and Scala without involving any hacks.

In this case, `StreamTableSource` and `BatchTableSource` would have to be 
abstract classes too.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93763884
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

What about making `TableSource` an abstract class? That way it can work with 
both Java and Scala without involving any hacks.

In this case, `StreamTableSource` and `BatchTableSource` would have to be 
abstract classes too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772859#comment-15772859
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93763491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
--- End diff --

I'm not sure about that. My IDEA highlights `[[TableSource.getReturnType]]` 
as an error, but `[[TableSource#getReturnType]]` is fine. 

IDEA 2016.3.1, Scala plugin 2016.3.5


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93763491
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
--- End diff --

I'm not sure about that. My IDEA highlights `[[TableSource.getReturnType]]` 
as an error, but `[[TableSource#getReturnType]]` is fine. 

IDEA 2016.3.1, Scala plugin 2016.3.5


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772837#comment-15772837
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

Thank you for the review. I'll try to update the PR today.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3039: [FLINK-5280] Update TableSource to support nested data

2016-12-23 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/3039
  
Hi @wuchong 

Thank you for the review. I'll try to update the PR today.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762225
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable(val tableSource: TableSource[_])
   extends FlinkTable[Row](
--- End diff --

Ok, good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772834#comment-15772834
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762178
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
 ---
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
--- End diff --

Ok, good point.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762178
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/FlinkTable.scala
 ---
@@ -44,14 +44,14 @@ abstract class FlinkTable[T](
 
   val fieldTypes: Array[TypeInformation[_]] =
--- End diff --

Ok, good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772836#comment-15772836
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762225
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
 class TableSourceTable(val tableSource: TableSource[_])
   extends FlinkTable[Row](
--- End diff --

Ok, good point.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772832#comment-15772832
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -38,7 +38,9 @@ class BatchTableSourceScan(
 
   override def deriveRowType() = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, 
tableSource.getFieldTypes)
+flinkTypeFactory.buildRowDataType(
+  tableSource.getFieldsNames,
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
--- End diff --

This should be fine if we add support for `AtomicType` in 
`TableEnvironment.getFieldTypes`.
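
One possible shape for that, sketched under the assumption that an atomic
type simply maps to a one-element type array (not the final code):

```scala
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.table.api.TableException

// Hypothetical getFieldTypes with an AtomicType fallback: an atomic type
// has no named fields, so its own TypeInformation is the only field type.
def getFieldTypes(inputType: TypeInformation[_]): Array[TypeInformation[_]] =
  inputType match {
    case c: CompositeType[_] =>
      (0 until c.getArity).map(c.getTypeAt(_)).toArray
    case _: AtomicType[_] =>
      Array[TypeInformation[_]](inputType)
    case tpe =>
      throw new TableException(s"Unsupported type $tpe")
  }
```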


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762127
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
 ---
@@ -38,7 +38,9 @@ class BatchTableSourceScan(
 
   override def deriveRowType() = {
 val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-flinkTypeFactory.buildRowDataType(tableSource.getFieldsNames, 
tableSource.getFieldTypes)
+flinkTypeFactory.buildRowDataType(
+  tableSource.getFieldsNames,
+  TableEnvironment.getFieldTypes(tableSource.getReturnType))
--- End diff --

This should be fine if we add support for `AtomicType` in 
`TableEnvironment.getFieldTypes`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772830#comment-15772830
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762016
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
--- End diff --

Good point.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93762016
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/TableSourceTable.scala
 ---
@@ -25,6 +25,6 @@ import org.apache.flink.types.Row
 /** Table which defines an external table via a [[TableSource]] */
--- End diff --

Good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761979
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

We do not override this method. As far as I understand, we cannot inherit a 
method implementation from a Scala trait in Java when the trait provides 
one: http://stackoverflow.com/a/7637888

Am I missing something?
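
For contrast, a Scala implementer does inherit the defaults directly. A
minimal sketch, assuming `getReturnType` is the trait's only remaining
abstract member and using a made-up `SingleColumnSource` name:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.sources.TableSource
import org.apache.flink.types.Row

// Only the abstract member needs a body; getFieldsNames and
// getFieldsIndices come from the trait's default implementations.
class SingleColumnSource extends TableSource[Row] {
  override def getReturnType: TypeInformation[Row] =
    new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO)
}
```

The delegation boilerplate is only needed on the Java side.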


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772829#comment-15772829
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761987
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
}
 
-   @Override
-   public TypeInformation[] getFieldTypes() {
-   return fieldTypes;
+   public int[] getFieldsIndices() {
+   return TableSource$class.getFieldsIndices(this);
--- End diff --

Ditto.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761987
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
}
 
-   @Override
-   public TypeInformation[] getFieldTypes() {
-   return fieldTypes;
+   public int[] getFieldsIndices() {
+   return TableSource$class.getFieldsIndices(this);
--- End diff --

Ditto.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772828#comment-15772828
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761979
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
 ---
@@ -111,24 +112,17 @@
return kafkaSource;
}
 
-   @Override
-   public int getNumberOfFields() {
-   return fieldNames.length;
-   }
-
-   @Override
public String[] getFieldsNames() {
-   return fieldNames;
+   return TableSource$class.getFieldsNames(this);
--- End diff --

We do not override this method. As far as I understand, we cannot inherit a 
method implementation from a Scala trait in Java when the trait provides 
one: http://stackoverflow.com/a/7637888

Am I missing something?


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772825#comment-15772825
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
   /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
+  def getFieldsNames: Array[String] = {
+TableEnvironment.getFieldInfo(getReturnType)._1
+  }
+
+  /** Returns the indices of the table fields. */
+  def getFieldsIndices: Array[Int] = {
+getFieldsNames.indices.toArray
--- End diff --

Ok, good point.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761841
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
+  *
+  * In case if custom field names are required one need to implement both
+  * [[TableSource.getFieldsNames]] and [[TableSource.getFieldsIndices]].
   *
   * @tparam T The return type of the [[TableSource]].
   */
 trait TableSource[T] {
 
-  /** Returns the number of fields of the table. */
-  def getNumberOfFields: Int
-
   /** Returns the names of the table fields. */
-  def getFieldsNames: Array[String]
-
-  /** Returns the types of the table fields. */
-  def getFieldTypes: Array[TypeInformation[_]]
+  def getFieldsNames: Array[String] = {
+TableEnvironment.getFieldInfo(getReturnType)._1
+  }
+
+  /** Returns the indices of the table fields. */
+  def getFieldsIndices: Array[Int] = {
+getFieldsNames.indices.toArray
--- End diff --

Ok, good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761823
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
--- End diff --

Aren't we using scaladoc here? I thought the syntax was different in scaladoc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772822#comment-15772822
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761791
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
--- End diff --

Good point.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772824#comment-15772824
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761823
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
--- End diff --

Aren't we using scaladoc here? I thought the syntax was different in scaladoc.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761791
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSource.scala
 ---
@@ -19,21 +19,32 @@
 package org.apache.flink.table.sources
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableEnvironment
 
-/** Defines an external table by providing schema information, i.e., field 
names and types.
+/** Defines an external table by providing schema information and used to 
produce a
+  * [[org.apache.flink.api.scala.DataSet]] or 
[[org.apache.flink.streaming.api.scala.DataStream]].
+  * Schema information consists of a data type, field names, and 
corresponding indices of
+  * these names in the data type.
+  *
+  * To define a TableSource one need to implement 
[[TableSource.getReturnType]]. In this case
+  * field names and field indices are deducted from the returned type.
--- End diff --

Good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772819#comment-15772819
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+  }
+
+  private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
+0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
--- End diff --

Good point.
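
The reviewer's remark isn't quoted here, but one likely issue with the
highlighted line: `getTotalFields` counts transitively nested leaf fields,
while `getTypeAt(i)` only accepts direct field positions, so the loop can
run out of range for nested composites. A sketch of the tighter bound:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType

// getArity counts direct fields only; getTotalFields also counts nested
// leaves and can exceed the valid index range of getTypeAt.
def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] =
  (0 until c.getArity).map(c.getTypeAt(_)).toArray
```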


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761574
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+  }
+
+  private def getTypes(c: CompositeType[_]): Array[TypeInformation[_]] = {
+0.until(c.getTotalFields).map(c.getTypeAt(_)).toArray
--- End diff --

Good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772816#comment-15772816
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
--- End diff --

Ok, will update.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772813#comment-15772813
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
--- End diff --

Sure.
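
Stepping back, a quick usage sketch of the new helper in this diff
(illustrative only; `Person` is a made-up type and `createTypeInformation`
comes from the Scala API):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment

case class Person(name: String, age: Int)

// Names and positions are derived from the case class's CompositeType.
val (names, indices) =
  TableEnvironment.getFieldInfo(createTypeInformation[Person])
// names   == Array("name", "age")
// indices == Array(0, 1)
```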


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation to extract field types from.
+* @return an holding the field types.
+*/
+  def getFieldTypes(inputType: TypeInformation[_]): 
Array[TypeInformation[_]] = {
+validateType(inputType)
+
+inputType match {
+  case t: TupleTypeInfo[_] => getTypes(t)
+  case c: CaseClassTypeInfo[_] => getTypes(c)
+  case p: PojoTypeInfo[_] => getTypes(p)
+  case r: RowTypeInfo => getTypes(r)
--- End diff --

Ok, will update.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772815#comment-15772815
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
--- End diff --

Ok, this makes sense.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of 
> flat rows. 
> However, there are several storage formats for nested data that should be 
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can 
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761548
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
 new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+* Returns field names and field positions for a given 
[[TypeInformation]].
+*
+* Field names are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+*
+* @param inputType The TypeInformation extract the field names and 
positions from.
+* @tparam A The type of the TypeInformation.
+* @return A tuple of two arrays holding the field names and 
corresponding field positions.
+*/
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], 
Array[Int]) = {
+validateType(inputType)
+
+val fieldNames: Array[String] = inputType match {
+  case t: TupleTypeInfo[A] => t.getFieldNames
+  case c: CaseClassTypeInfo[A] => c.getFieldNames
+  case p: PojoTypeInfo[A] => p.getFieldNames
+  case r: RowTypeInfo => r.getFieldNames
+  case tpe =>
+throw new TableException(s"Type $tpe lacks explicit field naming")
+}
+val fieldIndexes = fieldNames.indices.toArray
+
+if (fieldNames.contains("*")) {
+  throw new TableException("Field name can not be '*'.")
+}
+
+(fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
+val clazz = typeInfo.getTypeClass
+if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
+  !Modifier.isPublic(clazz.getModifiers) ||
+  clazz.getCanonicalName == null) {
+  throw TableException(s"Class '$clazz' described in type information 
'$typeInfo' must be " +
+s"static and globally accessible.")
+}
+  }
+
+  /**
+* Returns field types for a given [[TypeInformation]].
+*
+* Field types are automatically extracted for
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
+* The method fails if inputType is not a
+* [[org.apache.flink.api.common.typeutils.CompositeType]].
--- End diff --

Ok, this makes sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761442
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
+      case tpe =>
+        throw new TableException(s"Type $tpe lacks explicit field naming")
+    }
+    val fieldIndexes = fieldNames.indices.toArray
+
+    if (fieldNames.contains("*")) {
+      throw new TableException("Field name can not be '*'.")
+    }
+
+    (fieldNames, fieldIndexes)
+  }
+
+  def validateType(typeInfo: TypeInformation[_]): Unit = {
--- End diff --

Sure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
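
A minimal sketch of what the validateType check discussed above accepts, with
the failure modes noted in comments (class names are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment

    // Top-level, public case class: public, has a canonical name, and is not
    // a non-static member class, so the check passes without throwing.
    case class Person(name: String, age: Int)

    TableEnvironment.validateType(createTypeInformation[Person])

    // By contrast, a class declared inside a method has no canonical name,
    // and a non-static inner class fails the isMemberClass/isStatic test;
    // either case makes validateType throw a TableException.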


[jira] [Commented] (FLINK-5280) Extend TableSource to support nested data

2016-12-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15772811#comment-15772811
 ] 

ASF GitHub Bot commented on FLINK-5280:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761407
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
--- End diff --

Good point, will do that.


> Extend TableSource to support nested data
> -
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface currently only supports the definition of flat 
> rows. 
> However, several storage formats for nested data, such as Avro, JSON, 
> Parquet, and ORC, should be supported. The Table API and SQL can also 
> natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in 
> Calcite's schema need to be extended to support nested data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
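
As context for the RowTypeInfo match arm discussed above, a sketch of why the
helper can rely on its field names; this assumes the RowTypeInfo constructor
that accepts explicit names:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo

    // RowTypeInfo is a CompositeType that carries explicit field names, so
    // getFieldInfo does not fall through to the "lacks explicit field
    // naming" error.
    val rowType = new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.LONG_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO),
      Array("id", "name"))

    println(rowType.getFieldNames.mkString(", "))  // prints: id, name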


[GitHub] flink pull request #3039: [FLINK-5280] Update TableSource to support nested ...

2016-12-23 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/3039#discussion_r93761407
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -535,4 +509,74 @@ object TableEnvironment {
 
     new ScalaStreamTableEnv(executionEnvironment, tableConfig)
   }
+
+  /**
+    * Returns field names and field positions for a given [[TypeInformation]].
+    *
+    * Field names are automatically extracted for
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    * The method fails if inputType is not a
+    * [[org.apache.flink.api.common.typeutils.CompositeType]].
+    *
+    * @param inputType The TypeInformation to extract the field names and positions from.
+    * @tparam A The type of the TypeInformation.
+    * @return A tuple of two arrays holding the field names and corresponding field positions.
+    */
+  def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = {
+    validateType(inputType)
+
+    val fieldNames: Array[String] = inputType match {
+      case t: TupleTypeInfo[A] => t.getFieldNames
+      case c: CaseClassTypeInfo[A] => c.getFieldNames
+      case p: PojoTypeInfo[A] => p.getFieldNames
+      case r: RowTypeInfo => r.getFieldNames
--- End diff --

Good point, will do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5384) clean up jira issues

2016-12-23 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5384:
-
Description: 
must be closed:
FLINK-37 -> from stratosphere;
FLINK-87 -> from stratosphere;
FLINK-481 -> from stratosphere;
FLINK-605 -> from stratosphere;
FLINK-639 -> from stratosphere;
FLINK-650 -> from stratosphere;
FLINK-735 -> from stratosphere;
FLINK-456 -> from stratosphere;
FLINK-788 -> from stratosphere;
FLINK-796 -> from stratosphere;
FLINK-805 -> from stratosphere ;
FLINK-867 -> from stratosphere;
FLINK-879 -> from stratosphere;
FLINK-1166 -> closed on github;
FLINK-1946 -> closed on github;
FLINK-2119 -> closed on github;
FLINK-2157 -> closed on github;
FLINK-2220 -> closed on github;
FLINK-2319 -> closed on github;
FLINK-2363 -> closed on github;
FLINK-2399 -> closed on github;
FLINK-2428 -> closed on github;
FLINK-2472 -> closed on github;
FLINK-2480 -> closed on github;
FLINK-2609 -> closed on github;
FLINK-2823 -> closed on github;
FLINK-3155 -> closed on github;
FLINK-3331 -> closed on github;
FLINK-3964 -> closed on github;
FLINK-4653 -> closed on github;
FLINK-4717 -> closed on github;
FLINK-4829 -> closed on github;
FLINK-5016 -> closed on github;

should be discussed before:
FLINK-1055 ;
FLINK-1098 -> create another issue to add a collectEach();
FLINK-1100 ;
FLINK-1146 ;
FLINK-1335 -> maybe rename?;
FLINK-1439 ;
FLINK-1447 -> Firefox problem?;
FLINK-1521 -> closed on github;
FLINK-1538 -> gsoc2015, is it solved?;
FLINK-1541 -> gsoc2015, is it solved?;
FLINK-1723 -> almost done? ;
FLINK-1814 ;
FLINK-1858 -> is QA bot deleted?;
FLINK-1926 -> all subtasks done;

FLINK-2023 -> does not block Scala Graph API;
FLINK-2032 -> all subtasks done;
FLINK-2108 -> almost done? ;
FLINK-2309 -> maybe worth merging with FLINK-2316?;
FLINK-3109 -> its PR is stuck;
FLINK-3154 -> must be addressed as part of a bigger initiative;
FLINK-3297 -> solved as third party lib;
FLINK-4313 -> outdated;
FLINK-4760 -> fixed?

  was:
must be closed:
FLINK-37 -> from stratosphere;
FLINK-87 -> from stratosphere;
FLINK-481 -> from stratosphere;
FLINK-605 -> from stratosphere;
FLINK-639 -> from stratosphere;
FLINK-650 -> from stratosphere;
FLINK-735 -> from stratosphere;
FLINK-456 -> from stratosphere;
FLINK-788 -> from stratosphere;
FLINK-796 -> from stratosphere;
FLINK-805 -> from stratosphere ;
FLINK-867 -> from stratosphere;
FLINK-879 -> from stratosphere;
FLINK-1166 -> closed on github;
FLINK-1946 -> closed on github;
FLINK-2119 -> closed on github;
FLINK-2157 -> closed on github;
FLINK-2220 -> closed on github;
FLINK-2319 -> closed on github;
FLINK-2363 -> closed on github;
FLINK-2399 -> closed on github;
FLINK-2428 -> closed on github;
FLINK-2472 -> closed on github;
FLINK-2480 -> closed on github;
FLINK-2609 -> closed on github;
FLINK-2823 -> closed on github;
FLINK-3155 -> closed on github;
FLINK-3331 -> closed on github;
FLINK-3964 -> closed on github;
FLINK-4653 -> closed on github;
FLINK-4717 -> closed on github;
FLINK-4829 -> closed on github;
FLINK-5016 -> closed on github;

should be discussed before:
FLINK-1055 ;
FLINK-1098 -> create another issue to add a collectEach();
FLINK-1100 ;
FLINK-1146 ;
FLINK-1335 -> maybe rename?;
FLINK-1439 ;
FLINK-1447 -> Firefox problem?;
FLINK-1521 -> closed on github;
FLINK-1538 -> gsoc2015, is it solved?;
FLINK-1541 -> gsoc2015, is it solved?;
FLINK-1723 -> almost done? ;
FLINK-1814 ;
FLINK-1858 -> is QA bot deleted?;
FLINK-1926 -> all subtasks done;

FLINK-2023 -> does not block Scala Graph API;
FLINK-2032 -> all subtasks done;
FLINK-2108 -> almost done? ;
FLINK-2309 -> maybe worth merging with FLINK-2316?;
FLINK-3109 -> its PR is stuck;
FLINK-3154 -> must be addressed as part of a bigger initiative;
FLINK-3297 -> solved as third party lib;
FLINK-4760 -> fixed?


> clean up jira issues
> 
>
> Key: FLINK-5384
> URL: https://issues.apache.org/jira/browse/FLINK-5384
> Project: Flink
>  Issue Type: Improvement
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Minor
>
> must be closed:
> FLINK-37 -> from stratosphere;
> FLINK-87 -> from stratosphere;
> FLINK-481 -> from stratosphere;
> FLINK-605 -> from stratosphere;
> FLINK-639 -> from stratosphere;
> FLINK-650 -> from stratosphere;
> FLINK-735 -> from stratosphere;
> FLINK-456 -> from stratosphere;
> FLINK-788 -> from stratosphere;
> FLINK-796 -> from stratosphere;
> FLINK-805 -> from stratosphere ;
> FLINK-867 -> from stratosphere;
> FLINK-879 -> from stratosphere;
> FLINK-1166 -> closed on github;
> FLINK-1946 -> closed on github;
> FLINK-2119 -> closed on github;
> FLINK-2157 -> closed on github;
> FLINK-2220 -> closed on github;
> FLINK-2319 -> closed on github;
> FLINK-2363 -> closed on github;
> FLINK-2399 -> closed on github;
> FLINK-2428 -> closed on github;
> FLINK-2472 -> closed on github;
>
