[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-01-26 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341718#comment-16341718
 ] 

Flavio Pompermaier commented on FLINK-8101:
---

Don't worry Christophe, your PR looks much more accurate than mine. I spent just 
an hour trying to implement it and ran into some problems I wasn't sure how to 
solve. Looking at your PR, it seems you were able to find the right solutions. 
Moreover, this shows how much interest there is in this connector ;)

The only two points I'm still in doubt about are:
 # Does it make sense to try to keep compatibility between the 
TransportClient-based versions and the RestClient-based ones, or is it better to 
start a new base ES connector?
 # Are the documents sent as plain JSON or as something faster (e.g. compressed 
binary)? Are we using the REST client to its full potential?

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-01-26 Thread Christophe Jolif (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341657#comment-16341657
 ] 

Christophe Jolif commented on FLINK-8101:
-

Hi Flavio,

It happens that I was working on the same thing at the same time! Anyway, I 
submitted my PR as well since it was ready. I haven't had time to look at yours 
yet to see what might be different.

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341650#comment-16341650
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/5374

[FLINK-8101][flink-connectors] Elasticsearch 5.3+ (TransportClient) and 6.x 
(RestHighLevelClient) support


 

## What is the purpose of the change

*The purpose of this PR is to add Elasticsearch 6.x support on top of the 
RestHighLevelClient. Indeed, TransportClient is now deprecated and will be 
removed in 8.x. Also, hosted versions of Elasticsearch often forbid the use 
of TransportClient.*

## Brief change log

* First, a set of changes is borrowed from #4675:
  * Add a createRequestIndex method in ElasticsearchApiCallBridge
  * Add a flink-connector-elasticsearch5.3 project
  * Add a BulkProcessorIndexer in connector-elasticsearch5.3 to convert 
ActionRequest to DocWriteRequest
* Then, on top of these changes and the ability to create a 
RestHighLevelClient instead of a TransportClient:
  * Add a createClient method in ElasticsearchApiCallBridge. As 
TransportClient and RestClient have only the AutoCloseable interface in common, 
this is what the method returns (see the sketch after this list).
  * Make ElasticsearchSinkBase agnostic to whether it is using a 
TransportClient or a RestClient by adding a createBulkProcessorBuilder method on 
ElasticsearchApiCallBridge that ElasticsearchSinkBase calls. Implement this 
method on all bridges.
  * Add a flink-connector-elasticsearch6 project leveraging the REST client 
while all the other projects still use TransportClient.
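
A minimal sketch of the bridge API described above, with simplified and assumed signatures (not the actual code of this PR):

```java
import java.io.Serializable;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkProcessor;

// Assumed shape of the abstraction: each concrete bridge knows how to create
// (and close) its own client and how to build the BulkProcessor around it.
public abstract class ElasticsearchApiCallBridge implements Serializable {

    // Returns AutoCloseable because TransportClient and the REST client share
    // no richer common interface.
    public abstract AutoCloseable createClient(Map<String, String> clientConfig);

    // Keeps ElasticsearchSinkBase agnostic of the concrete client type.
    public abstract BulkProcessor.Builder createBulkProcessorBuilder(
            AutoCloseable client, BulkProcessor.Listener listener);
}
```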

## Verifying this change

This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked a little bit so it can be 
leveraged for testing the REST-client-based implementation.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): if you use the 
elasticsearch6 project, this adds dependencies on elasticsearch 6
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es53-es6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5374


commit b6a2396b31ade29071c65efa72df9f8f1fab9af4
Author: zjureel 
Date:   2017-09-15T03:51:35Z

[FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add 
createRequestIndex method

commit 1e5b21a331dfaed50844e89986c313f5fc40bdbe
Author: zjureel 
Date:   2017-09-15T03:55:16Z

[FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and 
later versions

commit 5a6e840c316095dd4f65f559405b19dcda7a1ca0
Author: zjureel 
Date:   2017-09-15T04:42:44Z

[FLINK-7386] add test case for ES53

commit 574818f0f56f6a2b644e271a05a0796d90598aef
Author: zjureel 
Date:   2017-09-15T05:33:43Z

[FLINK-7386] add document for ES5.3

commit 14168825507ad98c49a63be8ceab23dc539ff977
Author: Christophe Jolif 
Date:   2018-01-25T21:31:57Z

[FLINK-8101] Elasticsearch 6.X REST support




> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ ...

2018-01-26 Thread cjolif
GitHub user cjolif opened a pull request:

https://github.com/apache/flink/pull/5374

[FLINK-8101][flink-connectors] Elasticsearch 5.3+ (TransportClient) and 6.x 
(RestHighLevelClient) support


 

## What is the purpose of the change

*The purpose of this PR is to add Elasticsearch 6.x support on top of the 
RestHighLevelClient. Indeed, TransportClient is now deprecated and will be 
removed in 8.x. Also, hosted versions of Elasticsearch often forbid the use 
of TransportClient.*

## Brief change log

* First, a set of changes is borrowed from #4675:
  * Add a createRequestIndex method in ElasticsearchApiCallBridge
  * Add a flink-connector-elasticsearch5.3 project
  * Add a BulkProcessorIndexer in connector-elasticsearch5.3 to convert 
ActionRequest to DocWriteRequest
* Then, on top of these changes and the ability to create a 
RestHighLevelClient instead of a TransportClient:
  * Add a createClient method in ElasticsearchApiCallBridge. As 
TransportClient and RestClient have only the AutoCloseable interface in common, 
this is what the method returns.
  * Make ElasticsearchSinkBase agnostic to whether it is using a 
TransportClient or a RestClient by adding a createBulkProcessorBuilder method on 
ElasticsearchApiCallBridge that ElasticsearchSinkBase calls. Implement this 
method on all bridges.
  * Add a flink-connector-elasticsearch6 project leveraging the REST client 
while all the other projects still use TransportClient.

## Verifying this change

This change added tests and can be verified as follows:
* The Elasticsearch test base has also been reworked a little bit so it can be 
leveraged for testing the REST-client-based implementation.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): if you use the 
elasticsearch6 project, this adds dependencies on elasticsearch 6
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:  no
  - The serializers:  no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs & javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cjolif/flink es53-es6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5374


commit b6a2396b31ade29071c65efa72df9f8f1fab9af4
Author: zjureel 
Date:   2017-09-15T03:51:35Z

[FLINK-7386] Change ElasticsearchApiCallBridge to abstract class and add 
createRequestIndex method

commit 1e5b21a331dfaed50844e89986c313f5fc40bdbe
Author: zjureel 
Date:   2017-09-15T03:55:16Z

[FLINK-7386] add flink-connector-elasticsearch5.3 for elasticsearch5.3 and 
later versions

commit 5a6e840c316095dd4f65f559405b19dcda7a1ca0
Author: zjureel 
Date:   2017-09-15T04:42:44Z

[FLINK-7386] add test case for ES53

commit 574818f0f56f6a2b644e271a05a0796d90598aef
Author: zjureel 
Date:   2017-09-15T05:33:43Z

[FLINK-7386] add document for ES5.3

commit 14168825507ad98c49a63be8ceab23dc539ff977
Author: Christophe Jolif 
Date:   2018-01-25T21:31:57Z

[FLINK-8101] Elasticsearch 6.X REST support




---


[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341638#comment-16341638
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5342
  
Very interesting! Two things:
1. Can you make the Google doc publicly viewable? I cannot access it right 
now.
2. How does it handle event-time window joins of two streams where data in 
one stream always arrives much later than in the other? For example, we are 
joining streams A and B on a 10-minute event-time tumbling window (12:00-12:10, 
12:10-12:20, ...), and data in stream B always arrives 30 minutes later than the 
data in stream A. How does the operator handle that? Does it cache A's data 
until B's data arrives, do the join, and remove A's data from the cache? 
(I haven't read the code in detail, just trying to get a general idea of the 
overall design.) A minimal sketch of that caching idea follows below.
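
For illustration, a minimal sketch of the "cache A until B arrives" idea, assuming a keyed `CoProcessFunction`; the class, state, and types are made up and are not taken from PR #5342:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Applied on two connected, keyed streams; stream A (input 1) runs ahead of
// stream B (input 2) by roughly 30 minutes in the example above.
public class BufferingJoinSketch extends CoProcessFunction<String, String, String> {

    private transient ListState<String> bufferedA;

    @Override
    public void open(Configuration parameters) {
        bufferedA = getRuntimeContext().getListState(
                new ListStateDescriptor<>("bufferedA", String.class));
    }

    @Override
    public void processElement1(String a, Context ctx, Collector<String> out) throws Exception {
        // A arrives first: keep it until matching B elements show up.
        bufferedA.add(a);
    }

    @Override
    public void processElement2(String b, Context ctx, Collector<String> out) throws Exception {
        // B arrives later: join against everything buffered for this key.
        Iterable<String> buffered = bufferedA.get();
        if (buffered != null) {
            for (String a : buffered) {
                out.collect(a + "," + b);
            }
        }
        // A real operator would additionally register event-time timers and
        // evict buffered A elements once they can no longer be matched.
    }
}
```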


> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> 
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5342: [FLINK-8479] Timebounded stream join

2018-01-26 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5342
  
Very interesting! Two things:
1. Can you make the Google doc publicly viewable? I cannot access it right 
now.
2. How does it handle event-time window joins of two streams where data in 
one stream always arrives much later than in the other? For example, we are 
joining streams A and B on a 10-minute event-time tumbling window (12:00-12:10, 
12:10-12:20, ...), and data in stream B always arrives 30 minutes later than the 
data in stream A. How does the operator handle that? Does it cache A's data 
until B's data arrives, do the join, and remove A's data from the cache? 
(I haven't read the code in detail, just trying to get a general idea of the 
overall design.)


---


[jira] [Commented] (FLINK-8230) NPE in OrcRowInputFormat on nested structs

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341536#comment-16341536
 ] 

ASF GitHub Bot commented on FLINK-8230:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5373

[FLINK-8230] [orc] Fix NPEs when reading nested columns.

## What is the purpose of the change

- fixes NPEs for null-valued structs, lists, and maps
- fixes NPEs for repeating structs, lists, and maps

## Brief change log

- renamed `OrcUtils` to `OrcBatchReader`
- reimplemented large parts of the `OrcBatchReader` so that the `isNull` and 
`isRepeating` flags of nested columns are honored (see the sketch below)
- added tests for nested columns with nulls and repeating values
- added a class to generate ORC files for tests
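
A minimal sketch of that `isNull`/`isRepeating` handling for a flat string column, assuming Hive's vectorized `BytesColumnVector` API; it is illustrative and not the actual `OrcBatchReader` code:

```java
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

final class StringColumnSketch {

    static String[] readStringColumn(BytesColumnVector column, int batchSize) {
        String[] result = new String[batchSize];
        if (column.isRepeating) {
            // All rows carry the value at position 0 (or are all null).
            String repeated = (!column.noNulls && column.isNull[0])
                    ? null
                    : new String(column.vector[0], column.start[0], column.length[0],
                            StandardCharsets.UTF_8);
            for (int i = 0; i < batchSize; i++) {
                result[i] = repeated;
            }
            return result;
        }
        for (int i = 0; i < batchSize; i++) {
            if (!column.noNulls && column.isNull[i]) {
                // Skipping this check caused the NPE: the value arrays are not
                // populated for null entries.
                result[i] = null;
            } else {
                result[i] = new String(column.vector[i], column.start[i], column.length[i],
                        StandardCharsets.UTF_8);
            }
        }
        return result;
    }
}
```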

## Verifying this change

- added tests to `OrcInputFormatTest`
- changes have been verified by a user on a production dataset.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink orcNPE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5373


commit 97fd0c7687eff6f8b84516f4d8ab8268cab838b1
Author: Fabian Hueske 
Date:   2017-12-12T11:09:14Z

[FLINK-8230] [orc] Fix NPEs when reading nested columns.

- fixes NPEs for null-valued structs, lists, and maps
- fixes NPEs for repeating structs, lists, and maps
- adds test for deeply nested data with nulls
- adds test for columns with repeating values




> NPE in OrcRowInputFormat on nested structs
> --
>
> Key: FLINK-8230
> URL: https://issues.apache.org/jira/browse/FLINK-8230
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.4.0
>Reporter: Sebastian Klemke
>Assignee: Fabian Hueske
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> OrcRowInputFormat ignores isNull and isRepeating on nested struct columns. If 
> a struct column contains nulls, it tries to read struct fields, leading to 
> NPE in case of string fields:
> {code}
> java.lang.NullPointerException
>   at java.lang.String.checkBounds(String.java:384)
>   at java.lang.String.(String.java:462)
>   at 
> org.apache.flink.orc.OrcUtils.readNonNullBytesColumnAsString(OrcUtils.java:392)
>   at org.apache.flink.orc.OrcUtils.readField(OrcUtils.java:215)
>   at org.apache.flink.orc.OrcUtils.readStructColumn(OrcUtils.java:1203)
>   at org.apache.flink.orc.OrcUtils.readField(OrcUtils.java:252)
>   at 
> org.apache.flink.orc.OrcUtils.readNonNullStructColumn(OrcUtils.java:677)
>   at org.apache.flink.orc.OrcUtils.readField(OrcUtils.java:250)
>   at org.apache.flink.orc.OrcUtils.fillRows(OrcUtils.java:142)
>   at 
> org.apache.flink.orc.OrcRowInputFormat.ensureBatch(OrcRowInputFormat.java:334)
>   at 
> org.apache.flink.orc.OrcRowInputFormat.reachedEnd(OrcRowInputFormat.java:314)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:165)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5373: [FLINK-8230] [orc] Fix NPEs when reading nested co...

2018-01-26 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5373

[FLINK-8230] [orc] Fix NPEs when reading nested columns.

## What is the purpose of the change

- fixes NPEs for null-valued structs, lists, and maps
- fixes NPEs for repeating structs, lists, and maps

## Brief change log

- renamed `OrcUtils` to `OrcBatchReader`
- reimplemented large parts of the `OrcBatchReader`
- added tests for nested columns with nulls and repeating values
- added a class to generate ORC files for tests

## Verifying this change

- added tests to `OrcInputFormatTest`
- changes have been verified by a user on a production dataset.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink orcNPE

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5373


commit 97fd0c7687eff6f8b84516f4d8ab8268cab838b1
Author: Fabian Hueske 
Date:   2017-12-12T11:09:14Z

[FLINK-8230] [orc] Fix NPEs when reading nested columns.

- fixes NPEs for null-valued structs, lists, and maps
- fixes NPEs for repeating structs, lists, and maps
- adds test for deeply nested data with nulls
- adds test for columns with repeating values




---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341450#comment-16341450
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8516:


Posting the mailing list discussion thread of the issue here, for easier 
reference:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Kinesis-consumer-shard-skew-FLINK-8516-td20843.html
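
For illustration, a minimal, self-contained sketch of the skew described in this issue, assuming a simple hash-modulo assignment of shard ids to subtasks (not the exact FlinkKinesisConsumer code):

```java
public class ShardSkewSketch {

    public static void main(String[] args) {
        int totalNumberOfSubtasks = 4;
        // After resharding, the surviving shard ids are no longer sequential.
        String[] shardIds = {
                "shardId-000000000004",
                "shardId-000000000008",
                "shardId-000000000012",
                "shardId-000000000016"
        };
        for (String shardId : shardIds) {
            // Hash-modulo assignment: even for sequential ids, but potentially
            // skewed once the ids have gaps.
            int assignedSubtask = Math.abs(shardId.hashCode()) % totalNumberOfSubtasks;
            System.out.println(shardId + " -> subtask " + assignedSubtask);
        }
    }
}
```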

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341449#comment-16341449
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8516:


[~thw] done, you can now assign tickets to yourself!
I'm also assuming you'd like to work on this issue, so I assigned it to you.

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-8516:
--

Assignee: Thomas Weise

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-26 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341433#comment-16341433
 ] 

Bowen Li commented on FLINK-3089:
-

[~srichter] Right, we should implement it in both backends, and also give users 
a heads-up that TTL's heap implementation would increase their in-memory state 
size and that they should account for it in memory capacity planning.

I actually did some research yesterday on how TTL should be implemented in 
memory. What you described is very similar to [how Redis implements 
TTL|https://redis.io/commands/expire#how-redis-expires-keys], and, of course, 
we need to tailor the strategy to Flink.

How about this? Let me summarize all the above discussions and write up a 
Google doc, and then we can iterate on the design.
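
For illustration, a minimal sketch of the lazy "expire on access" half of such a Redis-style strategy, assuming a thin wrapper around Flink's `ValueState`; the names and processing-time semantics are illustrative, not a concrete design proposal:

```java
import org.apache.flink.api.common.state.ValueState;

// Value plus the time it was written.
final class TtlValue<T> {
    final T value;
    final long writeTimestamp;

    TtlValue(T value, long writeTimestamp) {
        this.value = value;
        this.writeTimestamp = writeTimestamp;
    }
}

// Wraps a ValueState and treats entries older than the TTL as absent,
// clearing them lazily when they are read.
final class TtlValueState<T> {
    private final ValueState<TtlValue<T>> delegate;
    private final long ttlMillis;

    TtlValueState(ValueState<TtlValue<T>> delegate, long ttlMillis) {
        this.delegate = delegate;
        this.ttlMillis = ttlMillis;
    }

    T value() throws Exception {
        TtlValue<T> entry = delegate.value();
        if (entry == null) {
            return null;
        }
        if (System.currentTimeMillis() - entry.writeTimestamp > ttlMillis) {
            // Stale entry: expire it on access, like Redis' lazy expiration.
            delegate.clear();
            return null;
        }
        return entry.value;
    }

    void update(T value) throws Exception {
        delegate.update(new TtlValue<>(value, System.currentTimeMillis()));
    }
}
```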

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>Priority: Major
>
> In some usecases (webanalytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341353#comment-16341353
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
--- End diff --

`periodicBoundedOOOWatermarks()`
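
For illustration, a small usage sketch of the fluent descriptor under review (using the method names as they currently appear in the diff), showing how the naming reads at a call site; the field name and delay are made up:

```java
import org.apache.flink.table.descriptors.Rowtime;

class RowtimeUsageSketch {

    static Rowtime eventTimeDescriptor() {
        return new Rowtime()
                .timestampFromField("eventTime")     // use an existing field as the rowtime attribute
                .watermarkPeriodicBounding(60000L);  // allow 1 minute of out-of-orderness
    }
}
```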


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341357#comment-16341357
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168781
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
--- End diff --

`preserveSourceTimestamps()`


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341345#comment-16341345
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164122214
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/NormalizedProperties.scala
 ---
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.io.Serializable
+import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, 
Long => JLong}
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorUtils._
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTableSchema
+import org.apache.flink.table.plan.stats.ColumnStats
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.Preconditions.checkNotNull
+
+import scala.collection.mutable
+
+/**
+  * Utility class for having a unified string-based representation of 
Table API related classes
+  * such as [[TableSchema]], [[TypeInformation]], [[WatermarkStrategy]], 
etc.
+  */
+class NormalizedProperties(
--- End diff --

Rename to `TableSourceProperties`? `NormalizedProperties` is quite generic


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341354#comment-16341354
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164166552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
--- End diff --

`periodicAscendingWatermarks()`?


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaratio

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341358#comment-16341358
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164169928
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
+  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
+  * describe the desired table source. The factory allows for matching to 
the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
+  * the current classpath to be found.
+  */
+trait TableSourceFactory[T] {
--- End diff --

We might want to add a method that exposes all properties of the connector 
and format that the factory supports. 
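
A minimal sketch of the SPI discovery described in the Javadoc above: implementations are listed in a `META-INF/services/org.apache.flink.table.sources.TableSourceFactory` file on the classpath and looked up via `java.util.ServiceLoader` (illustrative only):

```java
import java.util.ServiceLoader;

import org.apache.flink.table.sources.TableSourceFactory;

public class FactoryDiscoverySketch {

    public static void main(String[] args) {
        // ServiceLoader scans the META-INF/services entries on the classpath and
        // instantiates every listed TableSourceFactory implementation.
        ServiceLoader<TableSourceFactory> factories = ServiceLoader.load(TableSourceFactory.class);
        for (TableSourceFactory<?> factory : factories) {
            System.out.println("Found table source factory: " + factory.getClass().getName());
        }
    }
}
```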


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341351#comment-16341351
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168602
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
--- End diff --

`timestampsFromExtractor()`


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design documen

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341355#comment-16341355
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
+watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+this
+  }
+
+  /**
+* Sets a custom watermark strategy to be used for the rowtime 
attribute.
+*/
+  def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = {
--- End diff --

`watermarksFromStrategy()`


> Create unified interfaces to configure and instatiate TableSources
> --

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341350#comment-16341350
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164150350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
+*
+* E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+*/
+  def from(originFieldName: String): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
--- End diff --

"previously defined"


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table source are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.
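
To make the schema and rowtime parts of the list above concrete, here is a rough sketch against the descriptor classes proposed in PR #5240 (field names are invented, and the code that ties these descriptors to a concrete connector and encoding is not part of the excerpts quoted in this thread, so it is left out):

{code}
import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.{Rowtime, Schema}

// Schema part: physical fields, optional renaming from the format, time attributes.
val schema = new Schema()
  .field("user", Types.STRING).from("CSV_USER")      // mapped from a format field
  .field("cnt", Types.LONG)                          // matched by exact name
  .field("procTime", Types.SQL_TIMESTAMP).proctime() // processing-time attribute

// Rowtime part: timestamp extraction plus a watermark strategy.
val rowtime = new Rowtime()
  .timestampFromField("ts")          // read the event time from an existing field
  .watermarkPeriodicBounding(60000L) // watermarks trail the max timestamp by 60 s
{code}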



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341356#comment-16341356
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168933
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
--- End diff --

`timestampsFromField()`
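
Independent of the naming, the field-based variant is just a shortcut for the extractor-based one (this follows directly from the diff above; the field name "ts" is only an example):

{code}
import org.apache.flink.table.descriptors.Rowtime
import org.apache.flink.table.sources.tsextractors.ExistingField

// timestampFromField("ts") internally sets timestampExtractor = Some(new ExistingField("ts")),
// so these two declarations configure the same extractor.
val short = new Rowtime().timestampFromField("ts")
val long  = new Rowtime().timestampFromExtractor(new ExistingField("ts"))
{code}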


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341352#comment-16341352
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164151474
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
+*
+* E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+*/
+  def from(originFieldName: String): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
+  case Some(f) =>
+tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+lastField = None
+}
+this
+  }
+
+  /**
+* Specifies the previously defined field as a processing-time 
attribute.
+*
+* E.g. field("myString", Types.STRING).proctime()
+*/
+  def proctime(): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
+  case Some(f) =>
+tableSchema(f) += (DescriptorUtils.PROCTIME -> 
DescriptorUtils.TRUE)
+lastField = None
+}
+this
+  }
+
+  /**
+* Specifies the previously defined field as an event-time attribute.
+*
+* E.g. field("myString", Types.STRING).rowtime(...)
--- End diff --

`field("procTime", Types.SQL_TIMESTAMP).proctime()`


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341359#comment-16341359
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164149992
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
--- End diff --

Add that fields are matched by exact name by default.
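
A short example in the docs would probably make this clear (field names invented; by default the schema field is looked up under its own name, while from() remaps it):

{code}
import org.apache.flink.table.api.Types
import org.apache.flink.table.descriptors.Schema

val schema = new Schema()
  .field("user", Types.STRING)                      // matched against a format field named "user"
  .field("amount", Types.DOUBLE).from("CSV_AMOUNT") // explicitly mapped to the format field "CSV_AMOUNT"
{code}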


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341348#comment-16341348
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164126202
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -39,6 +39,9 @@ case class SqlParserException(
 
 /**
   * General Exception for all errors during table handling.
+  *
+  * This exception indicates that an internal error occurred or the 
feature is not fully
--- End diff --

"This exception indicates that an internal error occurred or that a feature 
is not supported yet. Usually, this exception does not indicate a fault of the 
user."



> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341347#comment-16341347
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164151340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
+*
+* E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+*/
+  def from(originFieldName: String): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
+  case Some(f) =>
+tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+lastField = None
+}
+this
+  }
+
+  /**
+* Specifies the previously defined field as a processing-time 
attribute.
+*
+* E.g. field("myString", Types.STRING).proctime()
--- End diff --

`field("procTime", Types.SQL_TIMESTAMP).proctime()`


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341349#comment-16341349
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168331
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
--- End diff --

`preserveSourceWatermarks()`

`DataStream` is only an internal aspect that's not visible when using table 
sources.
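
As a usage sketch with the method names as they currently appear in the diff (the renames proposed in this review would only change the names, not the behavior): for a source whose underlying stream already carries timestamps and watermarks, the two "preserve" options are typically used together.

{code}
import org.apache.flink.table.descriptors.Rowtime

// Reuse the record timestamps and watermarks that the source already assigns,
// instead of re-extracting/re-generating them in the table layer.
val rowtime = new Rowtime()
  .timestampFromDataStream()  // rename suggested elsewhere in this review: preserveSourceTimestamps()
  .watermarkFromDataStream()  // rename suggested here: preserveSourceWatermarks()
{code}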


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type:

[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341346#comment-16341346
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164131675
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CSV.scala
 ---
@@ -139,26 +139,26 @@ class CSV extends EncodingDescriptor("csv") {
   }
 
   /**
-* Internal method for encoding properties conversion.
+* Internal method for format properties conversion.
 */
-  override protected def addEncodingProperties(properties: 
NormalizedProperties): Unit = {
-fieldDelim.foreach(properties.putString("field-delimiter", _))
-lineDelim.foreach(properties.putString("line-delimiter", _))
-properties.putTableSchema("fields", encodingSchema.toIndexedSeq)
-quoteCharacter.foreach(properties.putCharacter("quote-character", _))
-commentPrefix.foreach(properties.putString("comment-prefix", _))
-isIgnoreFirstLine.foreach(properties.putBoolean("ignore-first-line", 
_))
-lenient.foreach(properties.putBoolean("ignore-parse-errors", _))
+  override protected def addFormatProperties(properties: 
NormalizedProperties): Unit = {
+
fieldDelim.foreach(properties.putString(DescriptorUtils.FIELD_DELIMITER, _))
--- End diff --

I would not define the constants globally. Some constants should be global,
but constants for specific connectors or formats should go to the respective
descriptor.
IMO, it would be better to have these keys in `CSV` or the class that 
validates the properties of a certain type.
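
Something along these lines, for illustration (the object name is only a sketch, not code from the PR; the keys are the ones from the removed lines above):

{code}
// Keys that only matter for the CSV format live next to the CSV descriptor/validator,
// while truly generic keys can stay in a shared place such as DescriptorUtils.
object CSVProperties {
  val FIELD_DELIMITER = "field-delimiter"
  val LINE_DELIMITER = "line-delimiter"
  val QUOTE_CHARACTER = "quote-character"
  val COMMENT_PREFIX = "comment-prefix"
  val IGNORE_FIRST_LINE = "ignore-first-line"
  val IGNORE_PARSE_ERRORS = "ignore-parse-errors"
}
{code}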


> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
+watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
+this
+  }
+
+  /**
+* Sets a custom watermark strategy to be used for the rowtime 
attribute.
+*/
+  def watermarkFromStrategy(strategy: WatermarkStrategy): Rowtime = {
--- End diff --

`watermarksFromStrategy()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164166552
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
--- End diff --

`periodicAscendingWatermarks()`?
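
Whatever the final name, the doc comments in the diff pin down the semantics. As a tiny standalone sketch of the watermark each built-in strategy emits for a given maximum observed timestamp (plain arithmetic, not the PR classes):

{code}
// Per the doc comments above:
//   ascending strategy:    watermark = max observed timestamp - 1
//   bounded out-of-order:  watermark = max observed timestamp - delay
def ascendingWatermark(maxTimestamp: Long): Long = maxTimestamp - 1

def boundedOutOfOrderWatermark(maxTimestamp: Long, delay: Long): Long = maxTimestamp - delay

// e.g. with maxTimestamp = 10000 and delay = 3000:
//   ascendingWatermark(10000)                == 9999
//   boundedOutOfOrderWatermark(10000, 3000)  == 7000
{code}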


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164151340
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
+*
+* E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+*/
+  def from(originFieldName: String): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
+  case Some(f) =>
+tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+lastField = None
+}
+this
+  }
+
+  /**
+* Specifies the previously defined field as a processing-time 
attribute.
+*
+* E.g. field("myString", Types.STRING).proctime()
--- End diff --

`field("procTime", Types.SQL_TIMESTAMP).proctime()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164126202
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 ---
@@ -39,6 +39,9 @@ case class SqlParserException(
 
 /**
   * General Exception for all errors during table handling.
+  *
+  * This exception indicates that an internal error occurred or the 
feature is not fully
--- End diff --

"This exception indicates that an internal error occurred or that a feature 
is not supported yet. Usually, this exception does not indicate a fault of the 
user."



---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168602
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
--- End diff --

`timestampsFromExtractor()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168331
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
+watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy which indicates the watermarks 
should be preserved from the
+* underlying DataStream API.
+*/
+  def watermarkFromDataStream(): Rowtime = {
--- End diff --

`preserveSourceWatermarks()`

`DataStream` is only an internal aspect that's not visible when using table 
sources.


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
+timestampExtractor = Some(new StreamRecordTimestamp)
+this
+  }
+
+  /**
+* Sets a custom timestamp extractor to be used for the rowtime 
attribute.
+*
+* @param extractor The [[TimestampExtractor]] to extract the rowtime 
attribute
+*  from the physical type.
+*/
+  def timestampFromExtractor(extractor: TimestampExtractor): Rowtime = {
+timestampExtractor = Some(extractor)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for ascending rowtime attributes.
+*
+* Emits a watermark of the maximum observed timestamp so far minus 1.
+* Rows that have a timestamp equal to the max timestamp are not late.
+*/
+  def watermarkPeriodicAscending(): Rowtime = {
+watermarkStrategy = Some(new AscendingTimestamps)
+this
+  }
+
+  /**
+* Sets a built-in watermark strategy for rowtime attributes which are 
out-of-order by a bounded
+* time interval.
+*
+* Emits watermarks which are the maximum observed timestamp minus the 
specified delay.
+*/
+  def watermarkPeriodicBounding(delay: Long): Rowtime = {
--- End diff --

`periodicBoundedOOOWatermarks()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168781
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
+timestampExtractor = Some(new ExistingField(fieldName))
+this
+  }
+
+  /**
+* Sets a built-in timestamp extractor that converts the assigned 
timestamp from
+* a DataStream API record into the rowtime attribute.
+*
+* Note: This extractor only works in streaming environments.
+*/
+  def timestampFromDataStream(): Rowtime = {
--- End diff --

`preserveSourceTimestamps()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164149992
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
--- End diff --

Add that fields are matched by exact name by default.


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164151474
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
+*
+* E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+*/
+  def from(originFieldName: String): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
+  case Some(f) =>
+tableSchema(f) += (DescriptorUtils.FROM -> originFieldName)
+lastField = None
+}
+this
+  }
+
+  /**
+* Specifies the previously defined field as a processing-time 
attribute.
+*
+* E.g. field("myString", Types.STRING).proctime()
+*/
+  def proctime(): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
+  case Some(f) =>
+tableSchema(f) += (DescriptorUtils.PROCTIME -> 
DescriptorUtils.TRUE)
+lastField = None
+}
+this
+  }
+
+  /**
+* Specifies the previously defined field as an event-time attribute.
+*
+* E.g. field("myString", Types.STRING).rowtime(...)
--- End diff --

`field("procTime", Types.SQL_TIMESTAMP).proctime()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164168933
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Rowtime.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.table.api.Types
+import 
org.apache.flink.table.descriptors.NormalizedProperties.{normalizeTimestampExtractor,
 normalizeWatermarkStrategy}
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+
+import scala.collection.mutable
+
+/**
+  * Rowtime descriptor for describing an event time attribute in the 
schema.
+  */
+class Rowtime extends Descriptor {
+
+  private var timestampExtractor: Option[TimestampExtractor] = None
+  private var watermarkStrategy: Option[WatermarkStrategy] = None
+
+  /**
+* Sets a built-in timestamp extractor that converts an existing 
[[Long]] or
+* [[Types.SQL_TIMESTAMP]] field into the rowtime attribute.
+*
+* @param fieldName The field to convert into a rowtime attribute.
+*/
+  def timestampFromField(fieldName: String): Rowtime = {
--- End diff --

`timestampsFromField()`


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164131675
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/CSV.scala
 ---
@@ -139,26 +139,26 @@ class CSV extends EncodingDescriptor("csv") {
   }
 
   /**
-* Internal method for encoding properties conversion.
+* Internal method for format properties conversion.
 */
-  override protected def addEncodingProperties(properties: 
NormalizedProperties): Unit = {
-fieldDelim.foreach(properties.putString("field-delimiter", _))
-lineDelim.foreach(properties.putString("line-delimiter", _))
-properties.putTableSchema("fields", encodingSchema.toIndexedSeq)
-quoteCharacter.foreach(properties.putCharacter("quote-character", _))
-commentPrefix.foreach(properties.putString("comment-prefix", _))
-isIgnoreFirstLine.foreach(properties.putBoolean("ignore-first-line", 
_))
-lenient.foreach(properties.putBoolean("ignore-parse-errors", _))
+  override protected def addFormatProperties(properties: 
NormalizedProperties): Unit = {
+
fieldDelim.foreach(properties.putString(DescriptorUtils.FIELD_DELIMITER, _))
--- End diff --

I would not define the constants globally. Some constants should be global,
but constants for specific connectors or formats should go to the respective
descriptor.
IMO, it would be better to have these keys in `CSV` or the class that 
validates the properties of a certain type.


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164122214
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/NormalizedProperties.scala
 ---
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import java.io.Serializable
+import java.lang.{Boolean => JBoolean, Double => JDouble, Integer => JInt, 
Long => JLong}
+
+import org.apache.commons.codec.binary.Base64
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.descriptors.DescriptorUtils._
+import 
org.apache.flink.table.descriptors.NormalizedProperties.normalizeTableSchema
+import org.apache.flink.table.plan.stats.ColumnStats
+import org.apache.flink.table.sources.tsextractors.{ExistingField, 
StreamRecordTimestamp, TimestampExtractor}
+import org.apache.flink.table.sources.wmstrategies.{AscendingTimestamps, 
BoundedOutOfOrderTimestamps, PreserveWatermarks, WatermarkStrategy}
+import org.apache.flink.table.typeutils.TypeStringUtils
+import org.apache.flink.util.InstantiationUtil
+import org.apache.flink.util.Preconditions.checkNotNull
+
+import scala.collection.mutable
+
+/**
+  * Utility class for having a unified string-based representation of 
Table API related classes
+  * such as [[TableSchema]], [[TypeInformation]], [[WatermarkStrategy]], 
etc.
+  */
+class NormalizedProperties(
--- End diff --

Rename to `TableSourceProperties`? `NormalizedProperties` is quite generic


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164169928
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/TableSourceFactory.scala
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util
+
+/**
+  * A factory to create a [[TableSource]]. This factory is used with 
Java's Service Provider
+  * Interfaces (SPI) for discovering. A factory is called with a set of 
normalized properties that
+  * describe the desired table source. The factory allows for matching to 
the given set of
+  * properties and creating a configured [[TableSource]] accordingly.
+  *
+  * Classes that implement this interface need to be added to the
+  * "META_INF/services/org.apache.flink.table.sources.TableSourceFactory' 
file of a JAR file in
+  * the current classpath to be found.
+  */
+trait TableSourceFactory[T] {
--- End diff --

We might want to add a method that exposes all properties of the connector 
and format that the factory supports. 
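
A rough sketch of what such a method could look like, together with a generic check against user-provided properties; the trait shape, method names and signatures are assumptions, not the PR's actual API:

{code:scala}
import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._

// hypothetical factory trait exposing the property keys it understands
trait TableSourceFactory[T] {
  def supportedProperties(): JList[String]
  def create(properties: JMap[String, String]): T
}

object FactoryValidation {
  // rejects properties that the factory does not declare as supported
  def validate(factory: TableSourceFactory[_], properties: JMap[String, String]): Unit = {
    val supported   = factory.supportedProperties().asScala.toSet
    val unsupported = properties.keySet().asScala.filterNot(supported.contains)
    if (unsupported.nonEmpty) {
      throw new IllegalArgumentException(
        s"Unsupported properties: ${unsupported.mkString(", ")}")
    }
  }
}
{code}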


---


[GitHub] flink pull request #5240: [FLINK-8240] [table] Create unified interfaces to ...

2018-01-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5240#discussion_r164150350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/Schema.scala
 ---
@@ -69,17 +72,76 @@ class Schema extends Descriptor {
 */
   def field(fieldName: String, fieldType: String): Schema = {
 if (tableSchema.contains(fieldName)) {
-  throw new IllegalArgumentException(s"Duplicate field name 
$fieldName.")
+  throw new ValidationException(s"Duplicate field name $fieldName.")
+}
+
+val fieldProperties = mutable.LinkedHashMap[String, String]()
+fieldProperties += (DescriptorUtils.TYPE -> fieldType)
+
+tableSchema += (fieldName -> fieldProperties)
+
+lastField = Some(fieldName)
+this
+  }
+
+  /**
+* Specifies the origin of the previously defined field. The origin 
field is defined by a
+* connector or format.
+*
+* E.g. field("myString", Types.STRING).from("CSV_MY_STRING")
+*/
+  def from(originFieldName: String): Schema = {
+lastField match {
+  case None => throw new ValidationException("No field defined 
previously. Use field() before.")
--- End diff --

"previously defined"


---


[jira] [Commented] (FLINK-7934) Upgrade Calcite dependency to 1.15

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341343#comment-16341343
 ] 

ASF GitHub Bot commented on FLINK-7934:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5355
  
@suez1224  Great to learn that!


> Upgrade Calcite dependency to 1.15
> --
>
> Key: FLINK-7934
> URL: https://issues.apache.org/jira/browse/FLINK-7934
> Project: Flink
>  Issue Type: Task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Umbrella issue for all related issues for Apache Calcite 1.15 release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5355: [FLINK-7934][Table & SQL API] Upgrade Flink to use Calcit...

2018-01-26 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5355
  
@suez1224  Great to learn that!


---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341313#comment-16341313
 ] 

ASF GitHub Bot commented on FLINK-8240:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5240
  
Hi Timo,

the PR looks good overall. I've made a few suggestions, mostly about renaming 
methods or extending docs. I'd also propose to add a `supportedProperties()` 
method to `TableSourceFactory` that can be used to validate whether the factory 
supports all user-provided properties of a connector or format.

What do you think?
Fabian


> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5240: [FLINK-8240] [table] Create unified interfaces to configu...

2018-01-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5240
  
Hi Timo,

the PR looks good overall. I've made a few suggestions, mostly about renaming 
methods or extending docs. I'd also propose to add a `supportedProperties()` 
method to `TableSourceFactory` that can be used to validate whether the factory 
supports all user-provided properties of a connector or format.

What do you think?
Fabian


---


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341309#comment-16341309
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5367
  
@twalthr rebased, could you please take another look? Thanks a lot.


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

2018-01-26 Thread suez1224
Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5367
  
@twalthr rebased, could you please take another look? Thanks a lot.


---


[jira] [Comment Edited] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-26 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298
 ] 

Stefan Richter edited comment on FLINK-3089 at 1/26/18 5:13 PM:


[~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB; in fact I am also considering implementing incremental snapshots for the heap backend and have some ideas for how this could be done.

For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable`, which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similarly to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as a side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. Besides, of course we can also drop all outdated entries that we encounter during the operations. In general, outdated entries could be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate the timeout with the state version that already exists on every entry in this map and currently defines the snapshot epochs.
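
To make the amortized cleanup idea above concrete, here is a rough stand-alone sketch; this is not Flink code (and not `CopyOnWriteStateTable`), and the per-entry timestamp, the bucket layout and all names are assumptions for illustration only:

{code:scala}
import scala.collection.mutable

// toy map with lazy TTL: every operation also cleans exactly one bucket
class LazyTtlMap[K, V](ttlMillis: Long, numBuckets: Int = 16) {
  private case class Entry(value: V, lastAccess: Long)
  private val buckets = Array.fill(numBuckets)(mutable.Map.empty[K, Entry])
  private var cleanupCursor = 0 // advances one bucket per operation, like incremental rehash

  private def bucketOf(key: K): Int = math.abs(key.hashCode % numBuckets)
  private def expired(e: Entry, now: Long): Boolean = now - e.lastAccess > ttlMillis

  def put(key: K, value: V): Unit = {
    val now = System.currentTimeMillis()
    buckets(bucketOf(key))(key) = Entry(value, now)
    cleanupStep(now)
  }

  def get(key: K): Option[V] = {
    val now    = System.currentTimeMillis()
    val bucket = buckets(bucketOf(key))
    val result = bucket.get(key) match {
      case Some(e) if expired(e, now) => bucket.remove(key); None // drop outdated entry on access
      case other                      => other.map(_.value)
    }
    cleanupStep(now)
    result
  }

  // side activity: scan a single bucket and drop its outdated entries,
  // amortizing the cleanup cost over regular map operations
  private def cleanupStep(now: Long): Unit = {
    val bucket = buckets(cleanupCursor)
    bucket.keys.toList.foreach { k =>
      if (expired(bucket(k), now)) bucket.remove(k)
    }
    cleanupCursor = (cleanupCursor + 1) % numBuckets
  }
}
{code}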


was (Author: srichter):
[~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I 
would not limit the feature to RocksDB, in fact I am also considering to 
implement incremental snapshot for the heap backend and have some approach how 
to this could be done.

For TTL on the heap backend, I also have some ideas how this could work for the 
async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and 
might become the only implementation eventually). For example, one idea is that 
we might go for an approach that works similar to the incremental rehash: doing 
a linear scan over the directory that removes outdated entries over time. This 
scan is performed in very small steps and driven by other operations,  e.g. a 
small fraction of the buckets (maybe just one) is cleaned up as side activity 
for every operation on the map to amortize the cleanup costs. With the linear 
nature,  at least those accesses to the bucket array are also cache conscious. 
Besides, of course we can also drop all outdated entries that we encounter 
during the operations. In general, outdated entries cound be detected by an 
attached timestamp (introducing more memory overhead per entry), or we could 
try to correlate timeout with the state version that already exists on every 
entry in this map.

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>Priority: Major
>
> In some usecases (webanalytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-26 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298
 ] 

Stefan Richter edited comment on FLINK-3089 at 1/26/18 5:12 PM:


[~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB; in fact I am also considering implementing incremental snapshots for the heap backend and have some ideas for how this could be done.

For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable`, which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similarly to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as a side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. Besides, of course we can also drop all outdated entries that we encounter during the operations. In general, outdated entries could be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate the timeout with the state version that already exists on every entry in this map.


was (Author: srichter):
[~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I 
would not limit the feature to RocksDB, in fact I am also considering to 
implement incremental snapshot for the heap backend and have some approach how 
to this could be done.

For TTL on the heap backend, I also have some ideas how this could work for the 
async variant (see `CopyOnWriteStateTable` which is the default since 1.4 and 
might become the only implementation eventually). For example, one idea is that 
we might go for an approach that works similar to the incremental rehash: doing 
a linear scan over the directory that removes outdated entries over time. This 
scan is performed in very small steps and driven by other operations,  e.g. a 
small fraction of the buckets (maybe just one) is cleaned up as side activity 
for every operation on the map to amortize the cleanup costs. With the linear 
nature,  at least those accesses to the bucket array are also cache conscious. 
In general, outdated entries cound be detected by an attached timestamp 
(introducing more memory overhead per entry), or we could try to correlate 
timeout with the state version that already exists on every entry in this map.

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>Priority: Major
>
> In some usecases (webanalytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration (State TTL)

2018-01-26 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341298#comment-16341298
 ] 

Stefan Richter commented on FLINK-3089:
---

[~phoenixjiangnan] I agree that we could start with a relaxed TTL approach. I would not limit the feature to RocksDB; in fact I am also considering implementing incremental snapshots for the heap backend and have some ideas for how this could be done.

For TTL on the heap backend, I also have some ideas how this could work for the async variant (see `CopyOnWriteStateTable`, which is the default since 1.4 and might become the only implementation eventually). For example, one idea is that we might go for an approach that works similarly to the incremental rehash: doing a linear scan over the directory that removes outdated entries over time. This scan is performed in very small steps and driven by other operations, e.g. a small fraction of the buckets (maybe just one) is cleaned up as a side activity for every operation on the map to amortize the cleanup costs. With the linear nature, at least those accesses to the bucket array are also cache conscious. In general, outdated entries could be detected by an attached timestamp (introducing more memory overhead per entry), or we could try to correlate the timeout with the state version that already exists on every entry in this map.

> State API Should Support Data Expiration (State TTL)
> 
>
> Key: FLINK-3089
> URL: https://issues.apache.org/jira/browse/FLINK-3089
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Niels Basjes
>Assignee: Bowen Li
>Priority: Major
>
> In some usecases (webanalytics) there is a need to have a state per visitor 
> on a website (i.e. keyBy(sessionid) ).
> At some point the visitor simply leaves and no longer creates new events (so 
> a special 'end of session' event will not occur).
> The only way to determine that a visitor has left is by choosing a timeout, 
> like "After 30 minutes no events we consider the visitor 'gone'".
> Only after this (chosen) timeout has expired should we discard this state.
> In the Trigger part of Windows we can set a timer and close/discard this kind 
> of information. But that introduces the buffering effect of the window (which 
> in some scenarios is unwanted).
> What I would like is to be able to set a timeout on a specific state which I 
> can update afterwards.
> This makes it possible to create a map function that assigns the right value 
> and that discards the state automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341246#comment-16341246
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user jelmerk commented on the issue:

https://github.com/apache/flink/pull/5296
  
All valid points, thanks for the review!


> Add openstack swift filesystem
> --
>
> Key: FLINK-8432
> URL: https://issues.apache.org/jira/browse/FLINK-8432
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>  Labels: features
>
> At ebay classifieds we started running our infrastructure on top of OpenStack.
> The openstack project comes with its own amazon-s3-like filesystem, known as 
> Swift. It's built for scale and optimized for durability, availability, and 
> concurrency across the entire data set. Swift is ideal for storing 
> unstructured data that can grow without bound.
> We would really like to be able to use it within flink without Hadoop 
> dependencies, as a sink or for storing savepoints etc
> I've prepared a pull request that adds support for it. It wraps the hadoop 
> support for swift in a way that is very similar to the way the s3 connector 
> works.
> You can find out more about the underlying hadoop implementation at 
> [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html]
> Pull request : [https://github.com/apache/flink/pull/5296]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341240#comment-16341240
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user jelmerk commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164158043
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
--- End diff --

Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209


> Add openstack swift filesystem
> --
>
> Key: FLINK-8432
> URL: https://issues.apache.org/jira/browse/FLINK-8432
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>  Labels: features
>
> At ebay classifieds we started running our infrastructure on top of OpenStack.
> The openstack project comes with its own amazon-s3-like filesystem, known as 
> Swift. It's built for scale and optimized for durability, availability, and 
> concurrency across the entire data set. Swift is ideal for storing 
> unstructured data that can grow without bound.
> We would really like to be able to use it within flink without Hadoop 
> dependencies, as a sink o

[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341245#comment-16341245
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user jelmerk commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164158508
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the Swift file system support.
+ */
+public class HadoopSwiftFileSystemITCase extends TestLogger {
--- End diff --

I did


> Add openstack swift filesystem
> --
>
> Key: FLINK-8432
> URL: https://issues.apache.org/jira/browse/FLINK-8432
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>  Labels: features
>
> At ebay classifieds we started running our infrastructure on top of OpenStack.
> The openstack project comes with its own amazon-s3-like filesystem, known as 
> Swift. It's built for scale and optimized for durability, availability, and 
> concurrency across the entire data set. Swift is ideal for storing 
> unstructured data that can grow without bound.
> We would really like to be able to use it within flink without Hadoop 
> dependencies, as a sink or for storing savepoints etc
> I've prepared a pull request that adds support for it. It wraps the hadoop 
> support for swift in a way that is very similar to the way the s3 connector 
> works.
> You can find out more about the underlying hadoop implementation at 
> [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html]
> Pull request : [https://github.com/apache/flink/pull/5296]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...

2018-01-26 Thread jelmerk
Github user jelmerk commented on the issue:

https://github.com/apache/flink/pull/5296
  
All valid points, thanks for the review!


---


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341241#comment-16341241
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user jelmerk commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164158160
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
+   
hadoopConfig.set("hadoop.tmp.dir", tmpDir);
+   }
+
+   // add additional config entries from 
the Flink config to the Presto Hadoop config
+   for (String key : flinkConfig.keySet()) 
{
+   if 
(key.startsWith(CONFIG_PREFIX)) {
+   String value = 
flinkConfig.getString(key, null);
+   String newKey = 
"fs.swift." + key.substring(CONFIG_PREFIX.length());
+   
hadoopConfig.set(newKey, flinkConfig.getString(key, null));
--- End diff --

Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209


> Add openstack swift filesystem
> --
>
> Ke

[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...

2018-01-26 Thread jelmerk
Github user jelmerk commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164158508
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the Swift file system support.
+ */
+public class HadoopSwiftFileSystemITCase extends TestLogger {
--- End diff --

I did


---


[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...

2018-01-26 Thread jelmerk
Github user jelmerk commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164158160
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
+   
hadoopConfig.set("hadoop.tmp.dir", tmpDir);
+   }
+
+   // add additional config entries from 
the Flink config to the Presto Hadoop config
+   for (String key : flinkConfig.keySet()) 
{
+   if 
(key.startsWith(CONFIG_PREFIX)) {
+   String value = 
flinkConfig.getString(key, null);
+   String newKey = 
"fs.swift." + key.substring(CONFIG_PREFIX.length());
+   
hadoopConfig.set(newKey, flinkConfig.getString(key, null));
--- End diff --

Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209


---


[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...

2018-01-26 Thread jelmerk
Github user jelmerk commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164158043
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
--- End diff --

Changed in 718ac46f45bc0b1a275f301255c52cec7ecdd209


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Thomas Weise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341233#comment-16341233
 ] 

Thomas Weise commented on FLINK-8516:
-

Can a PMC please add me as contributor, thanks! CC: [~StephanEwen]

 

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution may become skewed.
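
As a toy illustration of the skew described above (this is not the connector's actual assignment code; the shard id format, the hash-based assignment stand-in and the parallelism are assumptions): consecutive ids spread evenly, while ids with gaps, as they might look after merges close some shards, can land on only a few subtasks.

{code:scala}
object ShardSkewExample extends App {
  val parallelism = 4
  // simplified stand-in for hash-based round-robin assignment
  def subtaskFor(shardId: String): Int = math.abs(shardId.hashCode % parallelism)

  // sequential shard ids, as they exist before any resharding
  val sequential = (40 to 47).map(i => f"shardId-$i%012d")
  // ids with a stride of 2, e.g. when only some shards remain open after merges
  val afterReshard = Seq(40, 42, 44, 46).map(i => f"shardId-$i%012d")

  def distribution(ids: Seq[String]): Map[Int, Int] =
    ids.groupBy(subtaskFor).map { case (subtask, shards) => subtask -> shards.size }

  println(distribution(sequential))   // two shards on each of the four subtasks
  println(distribution(afterReshard)) // only two of the four subtasks receive shards
}
{code}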



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-01-26 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8516:
---
Affects Version/s: 1.5.0
   1.4.0
   1.3.2

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution may become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7095) Add proper command line parsing tool to TaskManagerRunner.main

2018-01-26 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341194#comment-16341194
 ] 

mingleizhang commented on FLINK-7095:
-

I will use Commons CLI as it is a tool that implements this functionality. 
[~till.rohrmann] If it is okay, please let me know. Thanks ~
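
For illustration, a minimal sketch of what Commons CLI based parsing could look like here; the option names and defaults are made up and are not a proposal for the final flags:

{code:scala}
import org.apache.commons.cli.{DefaultParser, HelpFormatter, Options}

object TaskManagerCliExample {
  def main(args: Array[String]): Unit = {
    // hypothetical options, only to show the Commons CLI wiring
    val options = new Options()
      .addOption("c", "configDir", true, "Directory containing the Flink configuration")
      .addOption("h", "help", false, "Print usage information")

    val cmd = new DefaultParser().parse(options, args) // throws ParseException on bad input

    if (cmd.hasOption("h")) {
      new HelpFormatter().printHelp("TaskManagerRunner", options)
      return
    }
    val configDir = cmd.getOptionValue("c", ".")
    println(s"Using configuration directory: $configDir")
  }
}
{code}

Note that GnuParser and PosixParser are deprecated in recent Commons CLI versions, so DefaultParser is the usual choice.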

> Add proper command line parsing tool to TaskManagerRunner.main
> --
>
> Key: FLINK-7095
> URL: https://issues.apache.org/jira/browse/FLINK-7095
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> We need to add a proper command line parsing tool to the entry point of the 
> {{TaskManagerRunner#main}}. At the moment, we are simply using the 
> {{ParameterTool}} as a temporary solution.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-01-26 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341134#comment-16341134
 ] 

Flavio Pompermaier commented on FLINK-8101:
---

 Hi to all,

I've drafted a first version of the Flink ES 6 connector (that is also 
compatible with ES 5.3+) that I want to discuss with the community.

There are a couple of things to review (I know I still have to properly update 
the Javadoc...):
 # How to test the connector? It seems that the embedded Node is not supported 
anymore (at least for testing the REST part)... am I wrong?
 # Is it possible to make it compatible with the base elasticsearch connector?
 # Are the HTTP requests serialized as JSON or as binary? Is it possible to 
force binary conversion?

 

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8450.

Resolution: Fixed

Fixed via 60b7b03f45aeb5a31202b014e486c40116124b30

> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture}}. Since {{AccessExecutionGraph}} 
> is non-serializable it could fail if we execute this RPC from a remote 
> system. In order to make it typesafe we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8450) Make JobMaster/DispatcherGateway#requestJob type safe

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341124#comment-16341124
 ] 

ASF GitHub Bot commented on FLINK-8450:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5309


> Make JobMaster/DispatcherGateway#requestJob type safe
> -
>
> Key: FLINK-8450
> URL: https://issues.apache.org/jira/browse/FLINK-8450
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{RestfulGateway#requestJob}} returns a 
> {{CompletableFuture}}. Since {{AccessExecutionGraph}} 
> is non-serializable it could fail if we execute this RPC from a remote 
> system. In order to make it typesafe we should change its signature to 
> {{SerializableExecutionGraph}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5311: [FLINK-8454] [flip6] Remove JobExecutionResultCach...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5311


---


[jira] [Closed] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8449.

Resolution: Fixed

Fixed via 8f9dbeca8bbb8f74bc17410b2f39903ea1f95af1

> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5312: [FLINK-8344][flip6] Add support for HA to RestClus...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5312


---


[GitHub] flink pull request #5346: [FLINK-8490] [mesos] Allow custom docker parameter...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5346


---


[jira] [Closed] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8344.

Resolution: Fixed

Fixed via ac8225fd56f16b1766724aefbd44babbe322d2ac

> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5309: [FLINK-8450] [flip6] Make JobMaster/DispatcherGate...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5309


---


[jira] [Commented] (FLINK-8449) Extend OnCompletionActions to receive AccessExecutionGraph

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341123#comment-16341123
 ] 

ASF GitHub Bot commented on FLINK-8449:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5308


> Extend OnCompletionActions to receive AccessExecutionGraph
> --
>
> Key: FLINK-8449
> URL: https://issues.apache.org/jira/browse/FLINK-8449
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{OnCompletionAction}} currently only receives the {{JobResult}} when the 
> job terminates. For further archiving of the {{ArchivedExecutionGraph}}, we 
> should change this to {{AccessExecutionGraph}}. The {{AccessExecutionGraph}} 
> contains all the information to derive the {{JobResult}} and additionally the 
> information needed for serving information about completed jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8490.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via 6969fe2fa823be7748cee002a32df02fd1cae09f

> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
> Fix For: 1.5.0
>
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphSto...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5310


---


[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341126#comment-16341126
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5310


> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in-memory 
> instances once they grow too big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user-defined time.
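
As a rough illustration of the store described above (names and signatures are made up for this sketch, not the actual Flink API):

{code:java}
import java.io.IOException;
import java.util.Collection;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

// Hypothetical sketch: completed graphs are persisted, in-memory entries are
// evicted when the store grows too big, and on-disk entries expire after a
// configurable time.
public interface CompletedJobStore {

	/** Persists the execution graph of a terminated job. */
	void put(AccessExecutionGraph executionGraph) throws IOException;

	/** Returns the stored graph for the given job, or null if unknown or expired. */
	AccessExecutionGraph get(JobID jobId);

	/** All job ids currently known to the store. */
	Collection<JobID> getStoredJobIds();
}
{code}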



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8454) Remove JobExecutionResultCache

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341125#comment-16341125
 ] 

ASF GitHub Bot commented on FLINK-8454:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5311


> Remove JobExecutionResultCache
> --
>
> Key: FLINK-8454
> URL: https://issues.apache.org/jira/browse/FLINK-8454
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> With the introduction of the {{SerializableExecutionGraphStore}} to the 
> {{Dispatcher}}, it is no longer necessary to store the {{JobResult}} in the 
> {{Dispatcher}}, because all information necessary to derive the {{JobResult}} 
> is contained in the {{SerializableExecutionGraphStore}}. In order to decrease 
> complexity, I propose to remove the {{JobExecutionResultCache}}.
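
A short sketch of the derivation this relies on; the factory method name is an assumption used for illustration, the point being that the stored graph carries enough information to build the result on demand.

{code:java}
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobResult;

public final class JobResultLookup {
	// Hypothetical helper: derive the JobResult from an archived graph instead of
	// keeping a separate result cache.
	public static JobResult deriveResult(AccessExecutionGraph graph) {
		return JobResult.createFrom(graph);
	}
}
{code}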



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8453.

Resolution: Fixed

Fixed via 8b817f0f9f0ec55f040b56f2d65c62761eac1ac1

> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in-memory 
> instances once they grow too big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user-defined time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8490) Allow custom docker parameters for docker tasks on Mesos

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341127#comment-16341127
 ] 

ASF GitHub Bot commented on FLINK-8490:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5346


> Allow custom docker parameters for docker tasks on Mesos
> 
>
> Key: FLINK-8490
> URL: https://issues.apache.org/jira/browse/FLINK-8490
> Project: Flink
>  Issue Type: Improvement
>  Components: Mesos
>Reporter: Jörg Schad
>Priority: Major
> Fix For: 1.5.0
>
>
> It would be great to pass custom parameters to Mesos when using the Docker 
> Containerizer.
> This could be similar to this spark example: 
> `spark.mesos.executor.docker.parameters privileged=true`
>  
> Originally brought up here: 
> https://stackoverflow.com/questions/48393153/passing-custom-parameters-to-docker-when-running-flink-on-mesos-marathon?noredirect=1#comment83777480_48393153



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5308: [FLINK-8449] [flip6] Extend OnCompletionActions to...

2018-01-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5308


---


[GitHub] flink issue #5312: [FLINK-8344][flip6] Add support for HA to RestClusterClie...

2018-01-26 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5312
  
Thanks man 👍 


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341128#comment-16341128
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5312


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.
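
For illustration, a minimal sketch (not the actual RestClusterClient code) of how a leader retrieval listener could track the newly elected leader's address so that pending REST requests can be retargeted:

{code:java}
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;

public class LeaderTrackingListener implements LeaderRetrievalListener {

	private final AtomicReference<String> currentLeaderAddress = new AtomicReference<>();

	@Override
	public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
		// Remember the newly elected leader; requests would be retried against it.
		currentLeaderAddress.set(leaderAddress);
	}

	@Override
	public void handleError(Exception exception) {
		// Surface retrieval errors so the client can fail pending requests.
		throw new RuntimeException(exception);
	}

	public String getCurrentLeaderAddress() {
		return currentLeaderAddress.get();
	}
}
{code}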



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8454) Remove JobExecutionResultCache

2018-01-26 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8454.

Resolution: Fixed

Fixed via a6d7f2d72d47b268c0d6ffa402a59a6349c91d95

> Remove JobExecutionResultCache
> --
>
> Key: FLINK-8454
> URL: https://issues.apache.org/jira/browse/FLINK-8454
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> With the introduction of the {{SerializableExecutionGraphStore}} to the 
> {{Dispatcher}}, it is no longer necessary to store the {{JobResult}} in the 
> {{Dispatcher}}, because all information necessary to derive the {{JobResult}} 
> is contained in the {{SerializableExecutionGraphStore}}. In order to decrease 
> complexity, I propose to remove the {{JobExecutionResultCache}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8101) Elasticsearch 6.x support

2018-01-26 Thread Flavio Pompermaier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Pompermaier reassigned FLINK-8101:
-

Assignee: Flavio Pompermaier  (was: Hai Zhou UTC+8)

> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341116#comment-16341116
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

GitHub user fpompermaier opened a pull request:

https://github.com/apache/flink/pull/5372

[FLINK-8101] [es connector] Elasticsearch 6.x (and 5.3+) Flink connector

## Purpose of the change: implementation of the Flink ES connector (5.3+)
See https://issues.apache.org/jira/browse/FLINK-8101 and 
https://issues.apache.org/jira/browse/FLINK-7386

## Brief change log
- Changed the "standard" ES connector structure, mainly because there is an 
incompatibility between TransportClient and RestClient and, from ES 5.3+, the 
embedded Node environment is not supported anymore. A running ES test cluster 
is needed to properly test the code.

## Verifying this change
- Set up an ES cluster and properly change ES_TEST_HOST, ES_TEST_PORT and 
CLUSTER_NAME in the tests (or vice versa: set up a localhost ES cluster 
listening on HTTP port 9200 with cluster name "test-cluster"; see the sketch below)
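
(Editorial illustration, not part of the PR: a minimal sketch of talking to such a local test cluster with the ES 6 RestHighLevelClient. Index and type names are made up, and the exact client API differs slightly across ES minor versions; note that the REST client only needs host and port, not the cluster name.)

{code:java}
import java.util.Collections;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class LocalEsSmokeTest {
	public static void main(String[] args) throws Exception {
		// Connect to the local test cluster described above.
		try (RestHighLevelClient client = new RestHighLevelClient(
				RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
			// Index a single document to verify the connection works.
			IndexRequest request = new IndexRequest("my-index", "my-type", "1")
					.source(Collections.singletonMap("data", "test"));
			client.index(request);
		}
	}
}
{code}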
## Does this pull request potentially affect one of the following parts:
  -  Flink ES connectors

## Documentation
  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented?  Javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fpompermaier/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5372


commit eaf878de646d90c5b821e0d3b0964fa311f8ac42
Author: Flavio Pompermaier 
Date:   2018-01-26T14:26:52Z

First draft of ES 6 connector




> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8453) Add SerializableExecutionGraphStore to Dispatcher

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341114#comment-16341114
 ] 

ASF GitHub Bot commented on FLINK-8453:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
👍 


> Add SerializableExecutionGraphStore to Dispatcher
> -
>
> Key: FLINK-8453
> URL: https://issues.apache.org/jira/browse/FLINK-8453
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{Dispatcher}} should have a {{SerializableExecutionGraphStore}} which it 
> can use to store completed jobs. This store can then be used to serve 
> historic job requests from the web UI, for example. The default 
> implementation should persist the jobs to disk and evict the in-memory 
> instances once they grow too big in order to avoid memory leaks. Additionally, 
> the store should expire elements from disk after a user-defined time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5372: [FLINK-8101] [es connector] Elasticsearch 6.x (and...

2018-01-26 Thread fpompermaier
GitHub user fpompermaier opened a pull request:

https://github.com/apache/flink/pull/5372

[FLINK-8101] [es connector] Elasticsearch 6.x (and 5.3+) Flink connector

## Purpose of the change: implementation of the Flink ES connector (5.3+)
See https://issues.apache.org/jira/browse/FLINK-8101 and 
https://issues.apache.org/jira/browse/FLINK-7386

## Brief change log
- Changed the "standard" ES connector structure, mainly because there is an 
incompatibility between TransportClient and RestClient and, from ES 5.3+, the 
embedded Node environment is not supported anymore. A running ES test cluster 
is needed to properly test the code.

## Verifying this change
- Set up an ES cluster and properly change ES_TEST_HOST, ES_TEST_PORT and 
CLUSTER_NAME in the tests (or vice versa: set up a localhost ES cluster 
listening on HTTP port 9200 with cluster name "test-cluster")
## Does this pull request potentially affect one of the following parts:
  -  Flink ES connectors

## Documentation
  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented?  Javadocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fpompermaier/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5372


commit eaf878de646d90c5b821e0d3b0964fa311f8ac42
Author: Flavio Pompermaier 
Date:   2018-01-26T14:26:52Z

First draft of ES 6 connector




---


[GitHub] flink issue #5310: [FLINK-8453] [flip6] Add ArchivedExecutionGraphStore to D...

2018-01-26 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5310
  
👍 


---


[jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341113#comment-16341113
 ] 

ASF GitHub Bot commented on FLINK-8344:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5312
  
Thanks man 👍 


> Add support for HA to RestClusterClient
> ---
>
> Key: FLINK-8344
> URL: https://issues.apache.org/jira/browse/FLINK-8344
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in 
> case of HA. We have to add functionality to reconnect to a newly elected 
> leader in case of HA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5296: [FLINK-8432] [filesystem-connector] Add support for opens...

2018-01-26 Thread etiennecarriere
Github user etiennecarriere commented on the issue:

https://github.com/apache/flink/pull/5296
  
We tested this PR with Flink 1.4 and the Swift storage offered by the French 
hoster OVH. 

It worked fine for checkpointing. 


---


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1634#comment-1634
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user etiennecarriere commented on the issue:

https://github.com/apache/flink/pull/5296
  
We tested this PR with Flink 1.4 and the Swift storage offered by the French 
hoster OVH. 

It worked fine for checkpointing. 


> Add openstack swift filesystem
> --
>
> Key: FLINK-8432
> URL: https://issues.apache.org/jira/browse/FLINK-8432
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>  Labels: features
>
> At ebay classifieds we started running our infrastructure on top of OpenStack.
> The openstack project comes with its own amazon-s3-like filesystem, known as 
> Swift. It's built for scale and optimized for durability, availability, and 
> concurrency across the entire data set. Swift is ideal for storing 
> unstructured data that can grow without bound.
> We would really like to be able to use it within flink without Hadoop 
> dependencies, as a sink or for storing savepoints etc
> I've prepared a pull request that adds support for it. It wraps the hadoop 
> support for swift in a way that is very similar to the way the s3 connector 
> works.
> You can find out more about the underlying hadoop implementation at 
> [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html]
> Pull request : [https://github.com/apache/flink/pull/5296]
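
For illustration, a hedged sketch of how such a file system would typically be used once the swift:// scheme is registered, e.g. as a checkpoint target; container and service names are made up and the URI layout follows the Hadoop Swift convention (swift://container.service/path):

{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SwiftCheckpointExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// Checkpoint every 10 seconds into a Swift container (illustrative URI).
		env.enableCheckpointing(10_000);
		env.setStateBackend(new FsStateBackend("swift://my-container.my-provider/checkpoints"));
	}
}
{code}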



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341099#comment-16341099
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164116058
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
+   
hadoopConfig.set("hadoop.tmp.dir", tmpDir);
+   }
+
+   // add additional config entries from 
the Flink config to the Presto Hadoop config
+   for (String key : flinkConfig.keySet()) 
{
+   if 
(key.startsWith(CONFIG_PREFIX)) {
+   String value = 
flinkConfig.getString(key, null);
+   String newKey = 
"fs.swift." + key.substring(CONFIG_PREFIX.length());
+   
hadoopConfig.set(newKey, flinkConfig.getString(key, null));
--- End diff --

`flinkConfig.getString(key, null)` can be replaced by `value`.


> Add openstack swift filesystem
> --
>
>   

[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164116058
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
+   
hadoopConfig.set("hadoop.tmp.dir", tmpDir);
+   }
+
+   // add additional config entries from 
the Flink config to the Presto Hadoop config
+   for (String key : flinkConfig.keySet()) 
{
+   if 
(key.startsWith(CONFIG_PREFIX)) {
+   String value = 
flinkConfig.getString(key, null);
+   String newKey = 
"fs.swift." + key.substring(CONFIG_PREFIX.length());
+   
hadoopConfig.set(newKey, flinkConfig.getString(key, null));
--- End diff --

`flinkConfig.getString(key, null)` can be replaced by `value`.


---


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341098#comment-16341098
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164119372
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the Swift file system support.
+ */
+public class HadoopSwiftFileSystemITCase extends TestLogger {
--- End diff --

I assume you run these tests locally on OpenStack, right?


> Add openstack swift filesystem
> --
>
> Key: FLINK-8432
> URL: https://issues.apache.org/jira/browse/FLINK-8432
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>  Labels: features
>
> At ebay classifieds we started running our infrastructure on top of OpenStack.
> The openstack project comes with its own amazon-s3-like filesystem, known as 
> Swift. It's built for scale and optimized for durability, availability, and 
> concurrency across the entire data set. Swift is ideal for storing 
> unstructured data that can grow without bound.
> We would really like to be able to use it within flink without Hadoop 
> dependencies, as a sink or for storing savepoints etc
> I've prepared a pull request that adds support for it. It wraps the hadoop 
> support for swift in a way that is very similar to the way the s3 connector 
> works.
> You can find out more about the underlying hadoop implementation at 
> [https://hadoop.apache.org/docs/stable/hadoop-openstack/index.html]
> Pull request : [https://github.com/apache/flink/pull/5296]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164119372
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/test/java/org/apache/flink/fs/openstackhadoop/HadoopSwiftFileSystemITCase.java
 ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for the Swift file system support.
+ */
+public class HadoopSwiftFileSystemITCase extends TestLogger {
--- End diff --

I assume you run these tests locally on OpenStack, right?


---


[GitHub] flink pull request #5296: [FLINK-8432] [filesystem-connector] Add support fo...

2018-01-26 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164116387
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
--- End diff --

Let's use `CoreOptions#TMP_DIRS` instead of using directly 
`System.getProperty("java.io.tmpdir")`. That way we will use the Flink 
configured tmp directory.
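
For illustration, a small self-contained sketch of what the suggested change amounts to (the helper class is made up; only the use of CoreOptions.TMP_DIRS is the point):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

public final class TmpDirFromFlinkConfig {
	// Take the buffer directory from Flink's configured tmp dirs
	// instead of reading java.io.tmpdir directly.
	public static String bufferDir(Configuration flinkConfig) {
		return flinkConfig.getString(CoreOptions.TMP_DIRS)
				+ "/hadoop-" + System.getProperty("user.name");
	}
}
{code}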


---


[jira] [Commented] (FLINK-8432) Add openstack swift filesystem

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341100#comment-16341100
 ] 

ASF GitHub Bot commented on FLINK-8432:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5296#discussion_r164116387
  
--- Diff: 
flink-filesystems/flink-openstack-fs-hadoop/src/main/java/org/apache/flink/fs/openstackhadoop/SwiftFileSystemFactory.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.openstackhadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the Swift file system.
+ */
+public class SwiftFileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(SwiftFileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 
'fs.swift.'. */
+   private static final String CONFIG_PREFIX = "swift.";
+
+   /** Flink's configuration object. */
+   private Configuration flinkConfig;
+
+   /** Hadoop's configuration for the file systems, lazily initialized. */
+   private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+   @Override
+   public String getScheme() {
+   return "swift";
+   }
+
+   @Override
+   public void configure(Configuration config) {
+   flinkConfig = config;
+   hadoopConfig = null;
+   }
+
+   @Override
+   public FileSystem create(URI fsUri) throws IOException {
+   LOG.debug("Creating swift file system (backed by a Hadoop 
native swift file system)");
+
+   try {
+   // -- (1) get the loaded Hadoop config (or fall back to 
one loaded from the classpath)
+
+   org.apache.hadoop.conf.Configuration hadoopConfig = 
this.hadoopConfig;
+   if (hadoopConfig == null) {
+   if (flinkConfig != null) {
+   LOG.debug("Loading Hadoop configuration 
for swift native file system");
+   hadoopConfig = 
HadoopUtils.getHadoopConfiguration(flinkConfig);
+
+   // hadoop.tmp.dir needs to be defined 
because it is used as buffer directory
+   if (hadoopConfig.get("hadoop.tmp.dir") 
== null) {
+   String tmpDir = 
System.getProperty("java.io.tmpdir") + "/" + "hadoop-" + 
System.getProperty("user.name");
--- End diff --

Let's use `CoreOptions#TMP_DIRS` instead of using directly 
`System.getProperty("java.io.tmpdir")`. That way we will use the Flink 
configured tmp directory.


> Add openstack swift filesystem
> --
>
> Key: FLINK-8432
> URL: https://issues.apache.org/jira/browse/FLINK-8432
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>  Labels: features
>
> At ebay classifieds we started running our infrastructure on top of OpenStack.
> The openstack project comes with its own amazon-s3-like filesystem, known as 
> Swift. It's built for scale and optimized for durability, availability, and 
> concurrency across the entire data set. Swift is ideal for storing 
> unstructured data that can grow without bound.

[jira] [Commented] (FLINK-8407) Setting the parallelism after a partitioning operation should be forbidden

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341053#comment-16341053
 ] 

ASF GitHub Bot commented on FLINK-8407:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Sure, I'll update the PR to make it more appropriate. Thanks for your 
review, @aljoscha.


> Setting the parallelism after a partitioning operation should be forbidden
> --
>
> Key: FLINK-8407
> URL: https://issues.apache.org/jira/browse/FLINK-8407
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Major
>
> Partitioning operations ({{shuffle}}, {{rescale}}, etc.) for a {{DataStream}} 
> create new {{DataStreams}}, which allow the users to set parallelisms for 
> them. However, the {{PartitionTransformations}} in these returned 
> {{DataStreams}} will only add virtual nodes, whose parallelisms could not be 
> specified, in the execution graph. We should forbid users to set the 
> parallelism after a partitioning operation since they won't actually work. 
> Also the corresponding documents should be updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5369: [FLINK-8407][DataStream]Setting the parallelism after a p...

2018-01-26 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5369
  
Sure, I'll update the PR to make it more appropriate. Thanks for your 
review, @aljoscha.


---


[jira] [Commented] (FLINK-8492) [FLINK-8492][table] Fix calc cost bug

2018-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341033#comment-16341033
 ] 

ASF GitHub Bot commented on FLINK-8492:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5347
  
Thanks for the fix @hequn8128!
PR is good to merge.


> [FLINK-8492][table] Fix calc cost bug
> -
>
> Key: FLINK-8492
> URL: https://issues.apache.org/jira/browse/FLINK-8492
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Considering the following test, an unsupported-operation exception will be thrown 
> because multiple calcs exist between the correlate and the TableFunctionScan.
> {code:java}
> // code placeholder
> @Test
> def testCrossJoinWithMultiFilter(): Unit = {
>   val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
>   val func0 = new TableFunc0
>   val result = t
> .join(func0('c) as('d, 'e))
> .select('c, 'd, 'e)
> .where('e > 10)
> .where('e > 20)
> .select('c, 'd)
> .toAppendStream[Row]
>   result.addSink(new StreamITCase.StringSink[Row])
>   env.execute()
>   val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
>   assertEquals(expected.sorted, StreamITCase.testResults.sorted)
> }
> {code}
> I can see two options to fix this problem:
>  # Adapt a Calcite optimizer rule to merge the consecutive calcs.
>  # Merge the multiple calcs in the correlate conversion rule.
> I prefer the second one, not only because it is easy to implement but also because 
> I think the presence or absence of an optimizer rule should not influence Flink 
> functionality. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug

2018-01-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5347
  
Thanks for the fix @hequn8128!
PR is good to merge.


---

