[jira] [Assigned] (FLINK-4303) Add CEP examples

2017-01-24 Thread Alexander Chermenin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Chermenin reassigned FLINK-4303:
--

Assignee: Alexander Chermenin

> Add CEP examples
> 
>
> Key: FLINK-4303
> URL: https://issues.apache.org/jira/browse/FLINK-4303
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.1.0
>Reporter: Timo Walther
>Assignee: Alexander Chermenin
>
> Neither CEP Java nor CEP Scala contains a runnable example, and the example 
> on the website is not runnable without additional code.
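
For reference, a minimal runnable example along the lines this issue asks for 
could look like the sketch below. It targets the 1.1-era CEP Java API; the 
TemperatureEvent type and the warning condition are made up for illustration:

```
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Map;

public class CepMonitoringExample {

	// Simple POJO event type, made up for this example.
	public static class TemperatureEvent {
		public String rackId;
		public double temperature;

		public TemperatureEvent() {}

		public TemperatureEvent(String rackId, double temperature) {
			this.rackId = rackId;
			this.temperature = temperature;
		}
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream<TemperatureEvent> input = env.fromElements(
			new TemperatureEvent("rack-1", 85.0),
			new TemperatureEvent("rack-1", 92.0),
			new TemperatureEvent("rack-2", 60.0));

		// Two consecutive readings above 80 degrees trigger a warning.
		Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("first")
			.where(new FilterFunction<TemperatureEvent>() {
				@Override
				public boolean filter(TemperatureEvent evt) throws Exception {
					return evt.temperature > 80.0;
				}
			})
			.next("second")
			.where(new FilterFunction<TemperatureEvent>() {
				@Override
				public boolean filter(TemperatureEvent evt) throws Exception {
					return evt.temperature > 80.0;
				}
			});

		PatternStream<TemperatureEvent> patternStream = CEP.pattern(input, warningPattern);

		patternStream.select(new PatternSelectFunction<TemperatureEvent, String>() {
			@Override
			public String select(Map<String, TemperatureEvent> match) throws Exception {
				return "Temperature warning for " + match.get("first").rackId;
			}
		}).print();

		env.execute("CEP example");
	}
}
```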



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #3031: [FLINK-4616] Added functionality through which watermarks...

2017-01-24 Thread MayerRoman
Github user MayerRoman commented on the issue:

https://github.com/apache/flink/pull/3031
  
Thank you Tzu-Li Tai.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5564) User Defined Aggregates

2017-01-24 Thread Shaoxuan Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaoxuan Wang updated FLINK-5564:
-
Description: 
User-defined aggregates would be a great addition to the Table API / SQL.
The current aggregate interface is not well suited for external users. This 
issue proposes to redesign the aggregates so that we can expose a better 
external UDAGG interface to users. The detailed design proposal can be found 
here: 
https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit

Motivation:
1. The current aggregate interface is not intuitive for users. One needs to 
know the design details of the intermediate Row buffer before implementing an 
Aggregate. Seven functions are needed even for a simple Count aggregate.
2. Another limitation of the current aggregate function is that it can only be 
applied to a single column, while many scenarios require an aggregate function 
that takes multiple columns as input.
3. “Retraction” is neither considered nor covered by the current Aggregate.
4. A local/global aggregate query plan optimization would also be valuable, as 
it is a promising way to improve UDAGG performance in some scenarios.

Proposed Changes:
1. Implement an aggregate dataStream API (Done by 
[FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582])
2. Update all the existing aggregates to use the new aggregate dataStream API
3. Provide a better User-Defined Aggregate interface
4. Add retraction support
5. Add local/global aggregate

  was:
User-defined aggregates would be a great addition to the Table API / SQL.
The current aggregate interface is not well suited for external users. This 
issue proposes to redesign the aggregates so that we can expose a better 
external UDAGG interface to users. The detailed design proposal can be found 
here: 
https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit

Motivation:
1. The current aggregate interface is not intuitive for users. One needs to 
know the design details of the intermediate Row buffer before implementing an 
Aggregate. Seven functions are needed even for a simple Count aggregate.
2. Another limitation of the current aggregate function is that it can only be 
applied to a single column, while many scenarios require an aggregate function 
that takes multiple columns as input.
3. “Retraction” is neither considered nor covered by the current Aggregate.
4. A local/global aggregate query plan optimization would also be valuable, as 
it is a promising way to improve UDAGG performance in some scenarios.

Proposed Changes:
1. Implement an aggregate dataStream API (Done by 
[Atlassian|http://atlassian.com])
2. Update all the existing aggregates to use the new aggregate dataStream API
3. Provide a better User-Defined Aggregate interface
4. Add retraction support
5. Add local/global aggregate


> User Defined Aggregates
> ---
>
> Key: FLINK-5564
> URL: https://issues.apache.org/jira/browse/FLINK-5564
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> User-defined aggregates would be a great addition to the Table API / SQL.
> The current aggregate interface is not well suited for external users. This 
> issue proposes to redesign the aggregates so that we can expose a better 
> external UDAGG interface to users. The detailed design proposal can be 
> found here: 
> https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit
> Motivation:
> 1. The current aggregate interface is not intuitive for users. One needs to 
> know the design details of the intermediate Row buffer before implementing 
> an Aggregate. Seven functions are needed even for a simple Count aggregate.
> 2. Another limitation of the current aggregate function is that it can only 
> be applied to a single column, while many scenarios require an aggregate 
> function that takes multiple columns as input.
> 3. “Retraction” is neither considered nor covered by the current Aggregate.
> 4. A local/global aggregate query plan optimization would also be valuable, 
> as it is a promising way to improve UDAGG performance in some scenarios.
> Proposed Changes:
> 1. Implement an aggregate dataStream API (Done by 
> [FLINK-5582|https://issues.apache.org/jira/browse/FLINK-5582])
> 2. Update all the existing aggregates to use the new aggregate dataStream API
> 3. Provide a better User-Defined Aggregate interface
> 4. Add retraction support
> 5. Add local/global aggregate
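
To make the motivation concrete, the accumulator-style interface that the 
design document points toward could look roughly like the sketch below. The 
names and signatures are illustrative assumptions, not the final API:

```
// Hypothetical UDAGG interface: T is the result type, ACC the accumulator type.
abstract class AggregateFunction<T, ACC> {

	// Creates and initializes the accumulator for one aggregation.
	public abstract ACC createAccumulator();

	// Processes one input row; can accept multiple columns (motivation 2).
	public abstract void accumulate(ACC accumulator, Object... input);

	// Removes a previously accumulated input again (motivation 3, retraction).
	public void retract(ACC accumulator, Object... input) {
		throw new UnsupportedOperationException("retraction not supported");
	}

	// Computes the final result from the accumulator.
	public abstract T getValue(ACC accumulator);
}

// A Count aggregate then needs only a handful of methods instead of seven:
class CountAccumulator {
	long count;
}

class CountAggFunction extends AggregateFunction<Long, CountAccumulator> {

	@Override
	public CountAccumulator createAccumulator() {
		return new CountAccumulator();
	}

	@Override
	public void accumulate(CountAccumulator acc, Object... input) {
		acc.count++;
	}

	@Override
	public void retract(CountAccumulator acc, Object... input) {
		acc.count--;
	}

	@Override
	public Long getValue(CountAccumulator acc) {
		return acc.count;
	}
}
```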



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5564) User Defined Aggregates

2017-01-24 Thread Shaoxuan Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shaoxuan Wang updated FLINK-5564:
-
Description: 
User-defined aggregates would be a great addition to the Table API / SQL.
The current aggregate interface is not well suited for external users. This 
issue proposes to redesign the aggregates so that we can expose a better 
external UDAGG interface to users. The detailed design proposal can be found 
here: 
https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit

Motivation:
1. The current aggregate interface is not intuitive for users. One needs to 
know the design details of the intermediate Row buffer before implementing an 
Aggregate. Seven functions are needed even for a simple Count aggregate.
2. Another limitation of the current aggregate function is that it can only be 
applied to a single column, while many scenarios require an aggregate function 
that takes multiple columns as input.
3. “Retraction” is neither considered nor covered by the current Aggregate.
4. A local/global aggregate query plan optimization would also be valuable, as 
it is a promising way to improve UDAGG performance in some scenarios.

Proposed Changes:
1. Implement an aggregate dataStream API (Done by 
[Atlassian|http://atlassian.com])
2. Update all the existing aggregates to use the new aggregate dataStream API
3. Provide a better User-Defined Aggregate interface
4. Add retraction support
5. Add local/global aggregate

  was:
User-defined aggregates would be a great addition to the Table API / SQL.
The current aggregate interface is not well suited for external users. This 
issue proposes to redesign the aggregates so that we can expose a better 
external UDAGG interface to users. The detailed design proposal can be found 
here: 
https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit

Motivation:
1. The current aggregate interface is not intuitive for users. One needs to 
know the design details of the intermediate Row buffer before implementing an 
Aggregate. Seven functions are needed even for a simple Count aggregate.
2. Another limitation of the current aggregate function is that it can only be 
applied to a single column, while many scenarios require an aggregate function 
that takes multiple columns as input.
3. “Retraction” is neither considered nor covered by the current Aggregate.
4. A local/global aggregate query plan optimization would also be valuable, as 
it is a promising way to improve UDAGG performance in some scenarios.

Proposed Changes:
1. Implement an aggregate dataStream API
2. Update all the existing aggregates to use the new aggregate dataStream API
3. Provide a better User-Defined Aggregate interface
4. Add retraction support
5. Add local/global aggregate


> User Defined Aggregates
> ---
>
> Key: FLINK-5564
> URL: https://issues.apache.org/jira/browse/FLINK-5564
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>
> User-defined aggregates would be a great addition to the Table API / SQL.
> The current aggregate interface is not well suited for external users. This 
> issue proposes to redesign the aggregates so that we can expose a better 
> external UDAGG interface to users. The detailed design proposal can be 
> found here: 
> https://docs.google.com/document/d/19JXK8jLIi8IqV9yf7hOs_Oz67yXOypY7Uh5gIOK2r-U/edit
> Motivation:
> 1. The current aggregate interface is not intuitive for users. One needs to 
> know the design details of the intermediate Row buffer before implementing 
> an Aggregate. Seven functions are needed even for a simple Count aggregate.
> 2. Another limitation of the current aggregate function is that it can only 
> be applied to a single column, while many scenarios require an aggregate 
> function that takes multiple columns as input.
> 3. “Retraction” is neither considered nor covered by the current Aggregate.
> 4. A local/global aggregate query plan optimization would also be valuable, 
> as it is a promising way to improve UDAGG performance in some scenarios.
> Proposed Changes:
> 1. Implement an aggregate dataStream API (Done by 
> [Atlassian|http://atlassian.com])
> 2. Update all the existing aggregates to use the new aggregate dataStream API
> 3. Provide a better User-Defined Aggregate interface
> 4. Add retraction support
> 5. Add local/global aggregate



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-5569) Migrate current table registration to in-memory catalog

2017-01-24 Thread zhangjing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing resolved FLINK-5569.
--
Resolution: Duplicate

> Migrate current table registration to in-memory catalog
> ---
>
> Key: FLINK-5569
> URL: https://issues.apache.org/jira/browse/FLINK-5569
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: zhangjing
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5568) Introduce interface for catalog, and provide an in-memory implementation, migrate current table registration to in-memory catalog

2017-01-24 Thread zhangjing (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangjing updated FLINK-5568:
-
Summary: Introduce interface for catalog, and provide an in-memory 
implementation, migrate current table registration to in-memory catalog  (was: 
Introduce interface for catalog, and provide an in-memory implementation)

> Introduce interface for catalog, and provide an in-memory implementation, 
> migrate current table registration to in-memory catalog
> -
>
> Key: FLINK-5568
> URL: https://issues.apache.org/jira/browse/FLINK-5568
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: zhangjing
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837269#comment-15837269
 ] 

ASF GitHub Bot commented on FLINK-5635:
---

GitHub user jgrier opened a pull request:

https://github.com/apache/flink/pull/3205

[FLINK-5635] Improve Docker tooling to make it easier to build images and 
launch Flink via Docker tools

Improvements to the Flink-on-Docker experience.

This PR depends on this other one:  
https://github.com/apache/flink/pull/3204

- Modifying the Dockerfile to allow building from a local flink-dist as 
well as from release URLs
- Making the image name configurable so it's easier for users to build and 
publish their own images
- Logging to stdout rather than just to files
- Adding scripts to deploy seamlessly on Docker Swarm without host 
networking
- Updating the Docker Compose scripts to work correctly for local deployment
- Generally parameterizing things so these Docker scripts are more broadly 
useful and self-documenting
- Providing options so that you can deploy multiple Flink services with 
unique names on Docker Swarm
- All of this should also work with the new Docker "stack" functionality.

Example usage:

```
# Build a Docker image from your local flink build
./build.sh --from-local-dist --image-name "dataartisans/flink"
```

```
# Build a Docker image from an official release
./build.sh --from-release --flink-version 1.2.0 --hadoop-version 2.7 
--scala-version 2.11 --image-name "dataartisans/flink"
```
```
# Run with Docker Compose
docker-compose up -d
docker-compose scale taskmanager=4
```
```
# Run on Docker Swarm
./create-docker-swarm-service.sh --image-name "dataartisans/flink" 
my-flink-service my-flink-port
```
```
# Shut down
./remove-docker-swarm-service.sh --image-name "dataartisans/flink" 
my-flink-service
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jgrier/flink release-1.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3205


commit d380e98b397c770ee4b7eb9e4abb2387e7672d4c
Author: Jamie Grier 
Date:   2017-01-21T00:12:17Z

Improvements for Docker on Flink experience.

Modifying Dockerfile to build from local flink-dist as well as release URLs.
Logging to stdout.
Adding scripts to deploy seamlessly on Docker Swarm.
Updating Docker Compose scripts to work correctly.
Parameterizing things so these Docker scripts are more generally useful.

commit 962290ab66d993cac4c4e233a017feb8a6eb1cc5
Author: Jamie Grier 
Date:   2017-01-21T00:14:27Z

Adding flag to flink-daemon.sh and related scripts so that we can actually 
log to the console -- which is better for Docker environments.

commit 66c2133ede0ed9914baba2fe7b93f1b9d585ede2
Author: Jamie Grier 
Date:   2017-01-25T05:45:23Z

Adding Apache license to new files.




> Improve Docker tooling to make it easier to build images and launch Flink via 
> Docker tools
> --
>
> Key: FLINK-5635
> URL: https://issues.apache.org/jira/browse/FLINK-5635
> Project: Flink
>  Issue Type: Improvement
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> This is a bit of a catch-all ticket for general improvements to the Flink on 
> Docker experience.
> Things to improve:
>   - Make it possible to build a Docker image from your own flink-dist 
> directory as well as official releases.
>   - Make it possible to override the image name so a user can more easily 
> publish these images to their Docker repository
>   - Provide scripts that show how to properly run on Docker Swarm or similar 
> environments with overlay networking (Kubernetes) without using host 
> networking.
>   - Log to stdout rather than to files.
>   - Work properly with docker-compose for local deployment as well as 
> production deployments (Swarm/k8s)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3205: [FLINK-5635] Improve Docker tooling to make it eas...

2017-01-24 Thread jgrier
GitHub user jgrier opened a pull request:

https://github.com/apache/flink/pull/3205

[FLINK-5635] Improve Docker tooling to make it easier to build images and 
launch Flink via Docker tools

Improvements to the Flink-on-Docker experience.

This PR depends on this other one:  
https://github.com/apache/flink/pull/3204

- Modifying the Dockerfile to allow building from a local flink-dist as 
well as from release URLs
- Making the image name configurable so it's easier for users to build and 
publish their own images
- Logging to stdout rather than just to files
- Adding scripts to deploy seamlessly on Docker Swarm without host 
networking
- Updating the Docker Compose scripts to work correctly for local deployment
- Generally parameterizing things so these Docker scripts are more broadly 
useful and self-documenting
- Providing options so that you can deploy multiple Flink services with 
unique names on Docker Swarm
- All of this should also work with the new Docker "stack" functionality.

Example usage:

```
# Build a Docker image from your local flink build
./build.sh --from-local-dist --image-name "dataartisans/flink"
```

```
# Build a Docker image from an official release
./build.sh --from-release --flink-version 1.2.0 --hadoop-version 2.7 
--scala-version 2.11 --image-name "dataartisans/flink"
```
```
# Run with Docker Compose
docker-compose up -d
docker-compose scale taskmanager=4
```
```
# Run on Docker Swarm
./create-docker-swarm-service.sh --image-name "dataartisans/flink" 
my-flink-service my-flink-port
```
```
# Shut down
./remove-docker-swarm-service.sh --image-name "dataartisans/flink" 
my-flink-service
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jgrier/flink release-1.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3205.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3205


commit d380e98b397c770ee4b7eb9e4abb2387e7672d4c
Author: Jamie Grier 
Date:   2017-01-21T00:12:17Z

Improvements for Docker on Flink experience.

Modifying Dockerfile to build from local flink-dist as well as release URLs.
Logging to stdout.
Adding scripts to deploy seamlessly on Docker Swarm.
Updating Docker Compose scripts to work correctly.
Parameterizing things so these Docker scripts are more generally useful.

commit 962290ab66d993cac4c4e233a017feb8a6eb1cc5
Author: Jamie Grier 
Date:   2017-01-21T00:14:27Z

Adding flag to flink-daemon.sh and related scripts so that we can actually 
log to the console -- which is better for Docker environments.

commit 66c2133ede0ed9914baba2fe7b93f1b9d585ede2
Author: Jamie Grier 
Date:   2017-01-25T05:45:23Z

Adding Apache license to new files.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5635) Improve Docker tooling to make it easier to build images and launch Flink via Docker tools

2017-01-24 Thread Jamie Grier (JIRA)
Jamie Grier created FLINK-5635:
--

 Summary: Improve Docker tooling to make it easier to build images 
and launch Flink via Docker tools
 Key: FLINK-5635
 URL: https://issues.apache.org/jira/browse/FLINK-5635
 Project: Flink
  Issue Type: Improvement
  Components: Docker
Affects Versions: 1.2.0
Reporter: Jamie Grier
 Fix For: 1.2.0


This is a bit of a catch-all ticket for general improvements to the Flink on 
Docker experience.

Things to improve:
  - Make it possible to build a Docker image from your own flink-dist directory 
as well as official releases.
  - Make it possible to override the image name so a user can more easily 
publish these images to their Docker repository
  - Provide scripts that show how to properly run on Docker Swarm or similar 
environments with overlay networking (Kubernetes) without using host networking.
  - Log to stdout rather than to files.
  - Work properly with docker-compose for local deployment as well as 
production deployments (Swarm/k8s)




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-01-24 Thread Jamie Grier (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jamie Grier updated FLINK-5634:
---
Description: 
Flink always redirects stdout to a file.  While often convenient, this isn't 
always what people want.  The most obvious case of this is a Docker deployment.

It should be possible to have Flink log to stdout.

Here is a PR for this:  https://github.com/apache/flink/pull/3204


  was:
Flink always redirects stdout to a file.  While often convenient, this isn't 
always what people want.  The most obvious case of this is a Docker deployment.

It should be possible to have Flink log to stdout.


> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> Flink always redirects stdout to a file.  While often convenient, this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.
> Here is a PR for this:  https://github.com/apache/flink/pull/3204



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5634) Flink should not always redirect stdout to a file.

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837250#comment-15837250
 ] 

ASF GitHub Bot commented on FLINK-5634:
---

GitHub user jgrier opened a pull request:

https://github.com/apache/flink/pull/3204

[FLINK-5634] Flink should not always redirect stdout to a file. 

Adding a flag to flink-daemon.sh and the related scripts so that we can 
actually log to the console -- which is better for Docker environments.  Also 
modifying the Docker entrypoint to use this.

There is a follow-on PR with many more changes to the Docker scripts and 
tooling based on this one.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jgrier/flink no-stdout-redirect

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3204.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3204


commit 5f6a9671816a16b810990a10310f994898e9ba41
Author: Jamie Grier 
Date:   2017-01-21T00:14:27Z

Adding flag to flink-daemon.sh and related scripts so that we can actually 
log to the console -- which is better for Docker environments.




> Flink should not always redirect stdout to a file.
> --
>
> Key: FLINK-5634
> URL: https://issues.apache.org/jira/browse/FLINK-5634
> Project: Flink
>  Issue Type: Bug
>  Components: Docker
>Affects Versions: 1.2.0
>Reporter: Jamie Grier
> Fix For: 1.2.0
>
>
> Flink always redirects stdout to a file.  While often convenient, this isn't 
> always what people want.  The most obvious case of this is a Docker 
> deployment.
> It should be possible to have Flink log to stdout.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-01-24 Thread Giuliano Caliari (JIRA)
Giuliano Caliari created FLINK-5633:
---

 Summary: ClassCastException: X cannot be cast to X when 
re-submitting a job.
 Key: FLINK-5633
 URL: https://issues.apache.org/jira/browse/FLINK-5633
 Project: Flink
  Issue Type: Bug
  Components: Job-Submission, YARN
Affects Versions: 1.1.4
Reporter: Giuliano Caliari
Priority: Minor


I’m running a job on my local cluster. The first time I submit the job 
everything works, but whenever I cancel and re-submit the same job it fails 
with:
{quote}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
	at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
	at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
	at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
	at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
	at 

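For context on the symptom in the title: a ClassCastException where a class 
"cannot be cast to" itself almost always means the two sides were loaded by 
different classloaders, which can happen when user code is reloaded on job 
re-submission. A self-contained sketch of the effect (the jar path and class 
name are hypothetical):

```
import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderCastDemo {

	public static void main(String[] args) throws Exception {
		// Hypothetical jar containing com.example.Foo.
		URL jar = new URL("file:///tmp/foo.jar");

		// Two independent classloaders with no shared parent for this class.
		ClassLoader cl1 = new URLClassLoader(new URL[]{jar}, null);
		ClassLoader cl2 = new URLClassLoader(new URL[]{jar}, null);

		Class<?> fooFromCl1 = cl1.loadClass("com.example.Foo");
		Class<?> fooFromCl2 = cl2.loadClass("com.example.Foo");

		// Same fully qualified name, but different Class objects:
		System.out.println(fooFromCl1 == fooFromCl2); // false

		Object instance = fooFromCl1.getDeclaredConstructor().newInstance();

		// Fails with "com.example.Foo cannot be cast to com.example.Foo":
		fooFromCl2.cast(instance);
	}
}
```
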
[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97710068
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+	// A Map with key as column family.
+	private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+		new HashMap<>();
+
+	// Allowed types. This may change.
+	// TODO : Check if the Date type should be the one in java.util or the one in java.sql
+	private static Class<?>[] CLASS_TYPES = {
+		Integer.class, Short.class, Float.class, Long.class, String.class,
+		Byte.class, Boolean.class, Double.class, BigInteger.class,
+		BigDecimal.class, Date.class
+	};
+	private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+	public void addColumns(String family, String qualifier, TypeInformation<?> type) {
--- End diff --

Yes. We can use `TypeExtractor.getForClass(Class)` to extract the 
corresponding TypeInformation for internal use.
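
For example, a minimal usage of that utility (this is Flink's standard 
TypeExtractor API; the surrounding class is just scaffolding):

```
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

class TypeExtractorDemo {
	public static void main(String[] args) {
		// Derive the TypeInformation from a plain Class, as suggested above.
		TypeInformation<Integer> intType = TypeExtractor.getForClass(Integer.class);
		System.out.println(intType);
	}
}
```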


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837203#comment-15837203
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97710068
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+	// A Map with key as column family.
+	private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+		new HashMap<>();
+
+	// Allowed types. This may change.
+	// TODO : Check if the Date type should be the one in java.util or the one in java.sql
+	private static Class<?>[] CLASS_TYPES = {
+		Integer.class, Short.class, Float.class, Long.class, String.class,
+		Byte.class, Boolean.class, Double.class, BigInteger.class,
+		BigDecimal.class, Date.class
+	};
+	private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+	public void addColumns(String family, String qualifier, TypeInformation<?> type) {
--- End diff --

Yes. We can use `TypeExtractor.getForClass(Class)` to extract the 
corresponding TypeInformation for internal use.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837201#comment-15837201
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709590
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -19,99 +19,113 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
  */
-public class HBaseTableSourceInputFormat extends RichInputFormat implements ResultTypeQueryable {
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
private String tableName;
-   private TypeInformation[] fieldTypeInfos;
-   private String[] fieldNames;
-   private transient Table table;
-   private transient Scan scan;
private transient Connection conn;
-   private ResultScanner resultScanner = null;
-
-   private byte[] lastRow;
-   private int scannedRows;
-   private boolean endReached = false;
-   private org.apache.hadoop.conf.Configuration conf;
-   private static final String COLON = ":";
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
 
-   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
-   this.conf = conf;
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
-   this.fieldNames = fieldNames;
-   this.fieldTypeInfos = fieldTypeInfos;
+   this.conf = conf;
+   this.schema = schema;
}
 
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBaseConfiguration");
connectToTable();
if(table != null) {
-   scan = createScanner();
+   scan = getScanner();
}
}
 
-   private Scan createScanner() {
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
Scan scan = new Scan();
-   for(String field : fieldNames) {
+	Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
// select only the fields in the 'selectedFields'
-   String[] famCol = field.split(COLON);
- 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837197#comment-15837197
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709503
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
 * Creates a table source that helps to scan data from an HBase table
 *
 * Note : the colNames are specified along with a familyName and they are
 * separated by a ':'.
 * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
 	private Configuration conf;
 	private String tableName;
-	private byte[] rowKey;
-	private String[] colNames;
-	private TypeInformation<?>[] colTypes;
+	private HBaseTableSchema schema;
+	private String[] famNames;
 
-	public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey,
-			String[] colNames, TypeInformation<?>[] colTypes) {
+	public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) {
 		this.conf = conf;
 		this.tableName = Preconditions.checkNotNull(tableName, "Table name");
-		this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-		this.colNames = Preconditions.checkNotNull(colNames, "Field names");
-		this.colTypes = Preconditions.checkNotNull(colTypes, "Field types");
+		this.schema = Preconditions.checkNotNull(schema, "Schema");
+		Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+		famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
--- End diff --

Fine. Will do it.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837195#comment-15837195
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709482
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+	// A Map with key as column family.
+	private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+		new HashMap<>();
+
+	// Allowed types. This may change.
+	// TODO : Check if the Date type should be the one in java.util or the one in java.sql
+	private static Class<?>[] CLASS_TYPES = {
+		Integer.class, Short.class, Float.class, Long.class, String.class,
+		Byte.class, Boolean.class, Double.class, BigInteger.class,
+		BigDecimal.class, Date.class
+	};
+	private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+	public void addColumns(String family, String qualifier, TypeInformation<?> type) {
+		Preconditions.checkNotNull(family, "family name");
+		Preconditions.checkNotNull(qualifier, "qualifier name");
+		Preconditions.checkNotNull(type, "type name");
+		List<Pair<String, TypeInformation<?>>> list = this.familyMap.get(family);
+		if (list == null) {
+			list = new ArrayList<>();
+		}
+		boolean found = false;
+		for (Class<?> classType : CLASS_TYPES) {
+			if (classType == type.getTypeClass()) {
+				found = true;
+				break;
+			}
+		}
+		if (!found) {
+			// by default it will be byte[] type only
+			type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
--- End diff --

Ok. Got it.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837200#comment-15837200
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709565
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+	@Override
+	public void configure(Configuration parameters) {
+		LOG.info("Initializing HBaseConfiguration");
+		connectToTable();
+		if (table != null) {
+			scan = getScanner();
+		}
+	}
+
+	@Override
+	protected Scan getScanner() {
+		// TODO : Pass 'rowkey'. For this we need FilterableTableSource
+		Scan scan = new Scan();
+		Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+		for (String family : familyMap.keySet()) {
+			// select only the fields in the 'selectedFields'
+			List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+			for (Pair<String, TypeInformation<?>> pair : colDetails) {
+				scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+			}
+		}
+		return scan;
+	}
+
+	@Override
+	public String getTableName() {
+		return tableName;
+	}
+
+	@Override
+	protected Row mapResultToTuple(Result res) {
+		List<Object> values = new ArrayList<>();
+		int i = 0;
+		Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+		Row[] rows = new Row[familyMap.size()];
+		for (String family : familyMap.keySet()) {
+			List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+			for (Pair<String, TypeInformation<?>> pair : colDetails) {
+ 

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837198#comment-15837198
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709518
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
 * Creates a table source that helps to scan data from an HBase table
 *
 * Note : the colNames are specified along with a familyName and they are
 * separated by a ':'.
 * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
 	private Configuration conf;
 	private String tableName;
-	private byte[] rowKey;
-	private String[] colNames;
-	private TypeInformation<?>[] colTypes;
+	private HBaseTableSchema schema;
+	private String[] famNames;
 
-	public HBaseTableSource(Configuration conf, String tableName, byte[] rowKey,
-			String[] colNames, TypeInformation<?>[] colTypes) {
+	public HBaseTableSource(Configuration conf, String tableName, HBaseTableSchema schema) {
 		this.conf = conf;
 		this.tableName = Preconditions.checkNotNull(tableName, "Table name");
-		this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-		this.colNames = Preconditions.checkNotNull(colNames, "Field names");
-		this.colTypes = Preconditions.checkNotNull(colTypes, "Field types");
+		this.schema = Preconditions.checkNotNull(schema, "Schema");
+		Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+		famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
}
 
@Override
	public TypeInformation<Row> getReturnType() {
-   return new RowTypeInfo(colTypes);
-   }
-
-   @Override
-   public DataSet getDataSet(ExecutionEnvironment execEnv) {
-   return execEnv.createInput(new 
HBaseTableSourceInputFormat(conf, tableName, colNames, colTypes), 
getReturnType());
-   }
+   // split the fieldNames
+   Map famMap = schema.getFamilyMap();
 
-   @Override
-   public ProjectableTableSource projectFields(int[] fields) {
-   String[] newColNames = new String[fields.length];
-   TypeInformation[] newColTypes =  new 
TypeInformation[fields.length];
+	List<String> qualNames = new ArrayList<>();
--- End diff --

I will look into this.


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709565
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   byte[] value = 
res.getValue(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   if(value != null) {
+   values.add(schema.deserialize(value, 
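
The truncated mapResultToTuple above builds one inner Row per column family and then nests those into an outer Row. A minimal sketch of that composition, assuming deserialize yields the typed cell values (the method and parameter names here are illustrative, not code from the PR):

    import org.apache.flink.types.Row;

    import java.util.List;

    // Sketch: one inner Row per family, nested into an outer Row.
    public static Row assemble(List<List<Object>> valuesPerFamily) {
        Row outer = new Row(valuesPerFamily.size());
        for (int i = 0; i < valuesPerFamily.size(); i++) {
            List<Object> values = valuesPerFamily.get(i);
            Row inner = new Row(values.size());
            for (int pos = 0; pos < values.size(); pos++) {
                inner.setField(pos, values.get(pos)); // deserialized cell values
            }
            outer.setField(i, inner);
        }
        return outer;
    }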

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709590
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -19,99 +19,113 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
  */
-public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
private String tableName;
-   private TypeInformation[] fieldTypeInfos;
-   private String[] fieldNames;
-   private transient Table table;
-   private transient Scan scan;
private transient Connection conn;
-   private ResultScanner resultScanner = null;
-
-   private byte[] lastRow;
-   private int scannedRows;
-   private boolean endReached = false;
-   private org.apache.hadoop.conf.Configuration conf;
-   private static final String COLON = ":";
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
 
-   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
-   this.conf = conf;
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
-   this.fieldNames = fieldNames;
-   this.fieldTypeInfos = fieldTypeInfos;
+   this.conf = conf;
+   this.schema = schema;
}
 
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBaseConfiguration");
connectToTable();
if(table != null) {
-   scan = createScanner();
+   scan = getScanner();
}
}
 
-   private Scan createScanner() {
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
Scan scan = new Scan();
-   for(String field : fieldNames) {
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
 // select only the fields in the 'selectedFields'
-   String[] famCol = field.split(COLON);
-   scan.addColumn(Bytes.toBytes(famCol[0]), Bytes.toBytes(famCol[1]));
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+  

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709469
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
--- End diff --

Ok. Let me check that. So if we pass `Class` there, we could wrap it with the corresponding `TypeInformation` for our internal usage?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
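
For reference, this is how a plain `Class` could be wrapped into the corresponding `TypeInformation`, along the lines ramkrish86 describes. A minimal sketch only: the `Class`-based `addColumn` overload is hypothetical, while `TypeExtractor.getForClass` is Flink's standard utility for deriving type information from a class:

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class HBaseTableSchemaSketch {
        // Hypothetical Class-based overload: derive the TypeInformation
        // internally instead of asking the caller for it.
        public void addColumn(String family, String qualifier, Class<?> clazz) {
            TypeInformation<?> type = TypeExtractor.getForClass(clazz);
            // ... delegate to the existing TypeInformation-based bookkeeping ...
        }
    }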


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837194#comment-15837194
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709469
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
--- End diff --

Ok. Let me check that. So if we pass `Class` there, we could wrap it with the corresponding `TypeInformation` for our internal usage?


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709482
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List<Pair<String, TypeInformation<?>>> list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList();
+   }
+   boolean found = false;
+   for(Class classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
--- End diff --

Ok. Got it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
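
The fallback acknowledged here works as follows: any type outside CLASS_TYPES is silently replaced by the byte[] type info. A standalone sketch of that check, pulled out of the diff above for clarity (the helper name `normalize` is illustrative, not from the PR):

    import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    // Unsupported types fall back to byte[], mirroring the reviewed addColumns logic.
    static TypeInformation<?> normalize(TypeInformation<?> type, Class<?>[] allowed) {
        for (Class<?> classType : allowed) {
            if (classType == type.getTypeClass()) {
                return type; // type is on the whitelist, keep it
            }
        }
        return BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
    }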


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709518
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
-   TypeInformation[] 
colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, 
HBaseTableSchema schema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field 
names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field 
types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
}
 
@Override
public TypeInformation<Row> getReturnType() {
-   return new RowTypeInfo(colTypes);
-   }
-
-   @Override
-   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-   return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, colNames, colTypes), getReturnType());
-   }
+   // split the fieldNames
+   Map<String, List<Pair<String, TypeInformation<?>>>> famMap = schema.getFamilyMap();
 
-   @Override
-   public ProjectableTableSource<Row> projectFields(int[] fields) {
-   String[] newColNames = new String[fields.length];
-   TypeInformation[] newColTypes = new TypeInformation[fields.length];
+   List<String> qualNames = new ArrayList();
--- End diff --

I will look into this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
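
Since the TODO above asks whether ProjectableTableSource should come back, here is one way projectFields could be reinstated on top of HBaseTableSchema. A sketch only, assuming the projection indices refer to the flattened (family, qualifier) order that getReturnType() produces; `newSchema` is an illustrative name, not code from the PR:

    // Inside HBaseTableSource (sketch): keep only the selected columns by
    // rebuilding a reduced HBaseTableSchema.
    public HBaseTableSource projectFields(int[] fields) {
        HBaseTableSchema newSchema = new HBaseTableSchema();
        int i = 0;
        for (String family : schema.getFamilyMap().keySet()) {
            for (Pair<String, TypeInformation<?>> pair : schema.getFamilyMap().get(family)) {
                for (int field : fields) {
                    if (field == i) {
                        // addColumns(family, qualifier, type), as defined in this PR
                        newSchema.addColumns(family, pair.getFirst(), pair.getSecond());
                    }
                }
                i++;
            }
        }
        return new HBaseTableSource(conf, tableName, newSchema);
    }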


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709503
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are 
seperated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier 
name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
-   TypeInformation[] 
colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, 
HBaseTableSchema schema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field 
names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field 
types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
--- End diff --

Fine. Will do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97709488
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List<Pair<String, TypeInformation<?>>> list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList();
+   }
+   boolean found = false;
+   for(Class classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+   }
+   list.add(new Pair(qualifier, type));
--- End diff --

Ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
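
Taken together, the API under review would be used roughly like this. A sketch only: the table, family, and qualifier names are made up, `tableEnv` is an assumed BatchTableEnvironment, and the constructor and addColumns signatures are the ones shown in this PR:

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    HBaseTableSchema schema = new HBaseTableSchema();
    schema.addColumns("cf1", "q1", BasicTypeInfo.INT_TYPE_INFO);    // int column
    schema.addColumns("cf1", "q2", BasicTypeInfo.STRING_TYPE_INFO); // string column
    HBaseTableSource hSrc = new HBaseTableSource(conf, "testTable", schema);
    tableEnv.registerTableSource("hTable", hSrc);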


[jira] [Commented] (FLINK-4185) Reflecting rename from Tachyon to Alluxio

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837129#comment-15837129
 ] 

ASF GitHub Bot commented on FLINK-4185:
---

Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/
  
By the way, the logos need to be updated on the homepage

URL: https://flink.apache.org/img/features/ecosystem_logos.png

![img][1]

  [1]: https://flink.apache.org/img/features/ecosystem_logos.png


> Reflecting rename from Tachyon to Alluxio
> -
>
> Key: FLINK-4185
> URL: https://issues.apache.org/jira/browse/FLINK-4185
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.0.3
>Reporter: Jiri Simsa
>Priority: Trivial
> Fix For: 1.1.0
>
>
> The Tachyon project has been renamed to Alluxio earlier this year. The goal 
> of this issue is to reflect this in the Flink documentation.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5632) Typo in StreamGraph

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837126#comment-15837126
 ] 

ASF GitHub Bot commented on FLINK-5632:
---

GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/3203

[FLINK-5632] [streaming] Typo in StreamGraph



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-5632

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3203


commit 0301cd26234514a8af8214d65c537d07575285d2
Author: Tony Wei 
Date:   2017-01-25T03:36:54Z

[FLINK-5632] Typo in StreamGraph




> Typo in StreamGraph
> ---
>
> Key: FLINK-5632
> URL: https://issues.apache.org/jira/browse/FLINK-5632
> Project: Flink
>  Issue Type: Improvement
>  Components: Java API
>Reporter: Wei-Che Wei
>Assignee: Wei-Che Wei
>Priority: Trivial
>
> Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2222: [FLINK-4185] Reflecting rename from Tachyon to Alluxio.

2017-01-24 Thread cricket007
Github user cricket007 commented on the issue:

https://github.com/apache/flink/pull/
  
By the way, the logos need to be updated on the homepage

URL: https://flink.apache.org/img/features/ecosystem_logos.png

![img][1]

  [1]: https://flink.apache.org/img/features/ecosystem_logos.png


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3203: [FLINK-5632] [streaming] Typo in StreamGraph

2017-01-24 Thread tony810430
GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/3203

[FLINK-5632] [streaming] Typo in StreamGraph



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-5632

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3203


commit 0301cd26234514a8af8214d65c537d07575285d2
Author: Tony Wei 
Date:   2017-01-25T03:36:54Z

[FLINK-5632] Typo in StreamGraph




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5632) Typo in StreamGraph

2017-01-24 Thread Wei-Che Wei (JIRA)
Wei-Che Wei created FLINK-5632:
--

 Summary: Typo in StreamGraph
 Key: FLINK-5632
 URL: https://issues.apache.org/jira/browse/FLINK-5632
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Reporter: Wei-Che Wei
Assignee: Wei-Che Wei
Priority: Trivial


Typo in StreamGraph field : virtuaPartitionNodes -> virtualPartitionNodes



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97702519
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   byte[] value = 
res.getValue(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   if(value != null) {
+   values.add(schema.deserialize(value, 
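
The `// TODO : Pass 'rowkey'` note in getScanner() above points at the missing FilterableTableSource integration. A sketch of what it could look like once rowkey bounds are pushed down by the planner; startKey and endKey are hypothetical values, while setStartRow and setStopRow are the standard HBase client calls for range scans:

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    // Restrict the scan to a rowkey range instead of a full table scan.
    Scan scan = new Scan();
    scan.setStartRow(Bytes.toBytes("startKey")); // inclusive lower bound
    scan.setStopRow(Bytes.toBytes("endKey"));    // exclusive upper bound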

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837107#comment-15837107
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97702519
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837102#comment-15837102
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97702388
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97702388
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for(Pair<String, TypeInformation<?>> pair : colDetails) {
+   byte[] value = 
res.getValue(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   if(value != null) {
+   values.add(schema.deserialize(value, 

[jira] [Assigned] (FLINK-4499) Introduce findbugs maven plugin

2017-01-24 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu reassigned FLINK-4499:
-

Assignee: Suneel Marthi

> Introduce findbugs maven plugin
> ---
>
> Key: FLINK-4499
> URL: https://issues.apache.org/jira/browse/FLINK-4499
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: Suneel Marthi
>
> As suggested by Stephan in FLINK-4482, this issue is to add 
> findbugs-maven-plugin into the build process so that we can detect lack of 
> proper locking and other defects automatically.
> We can begin with small set of rules.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837092#comment-15837092
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97701594
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -19,99 +19,113 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
  */
-public class HBaseTableSourceInputFormat extends RichInputFormat<Row, TableInputSplit> implements ResultTypeQueryable<Row> {
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
private String tableName;
-   private TypeInformation[] fieldTypeInfos;
-   private String[] fieldNames;
-   private transient Table table;
-   private transient Scan scan;
private transient Connection conn;
-   private ResultScanner resultScanner = null;
-
-   private byte[] lastRow;
-   private int scannedRows;
-   private boolean endReached = false;
-   private org.apache.hadoop.conf.Configuration conf;
-   private static final String COLON = ":";
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
 
-   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
-   this.conf = conf;
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
-   this.fieldNames = fieldNames;
-   this.fieldTypeInfos = fieldTypeInfos;
+   this.conf = conf;
+   this.schema = schema;
}
 
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBaseConfiguration");
connectToTable();
if(table != null) {
-   scan = createScanner();
+   scan = getScanner();
}
}
 
-   private Scan createScanner() {
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
Scan scan = new Scan();
-   for(String field : fieldNames) {
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
// select only the fields in the 'selectedFields'
-   String[] famCol = field.split(COLON);
-   

[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837094#comment-15837094
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97696231
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap =
+   new HashMap();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, 
String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, 
BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
--- End diff --

addColumns -> addColumn ?

I'm not sure whether we should use `TypeInformation` or `Class` here, because `TypeInformation` indicates that we use the Flink serialization framework to serialize primitives, but that doesn't happen here. Maybe `Class` is a simpler and better choice?


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837097#comment-15837097
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97698410
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = new HashMap<>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List<Pair<String, TypeInformation<?>>> list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList<>();
+   }
+   boolean found = false;
+   for (Class<?> classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+   }
+   list.add(new Pair(qualifier, type));
--- End diff --

add a `<>` to avoid warning. `list.add(new Pair<>(qualifier, type));`


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837093#comment-15837093
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97696842
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = new HashMap<>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List<Pair<String, TypeInformation<?>>> list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList<>();
+   }
+   boolean found = false;
+   for (Class<?> classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
--- End diff --

I think we should throw an exception here to tell users that this class type 
is not supported and that they should use `byte[].class` instead. Otherwise, 
users will assume the type works, select the value without deserialization, 
and get an unexpected result.
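
For example, roughly (the exception type and message wording are just an illustration):

```
// Hypothetical sketch: fail fast instead of silently falling back to byte[].
if (!found) {
	throw new IllegalArgumentException(
		"Unsupported type " + type.getTypeClass().getName()
			+ "; declare the column as byte[].class and deserialize the value yourself.");
}
```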


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837096#comment-15837096
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97699488
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are separated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
-   TypeInformation[] 
colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, 
HBaseTableSchema schema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field 
names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field 
types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
--- End diff --

I would like to move this code into `getReturnType()`; the schema may change 
after the construction of `HBaseTableSource`.
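
Roughly along these lines (a sketch built on the `getFamilyMap()` shape above; illustrative, not the PR's code):

```
// Hypothetical sketch: recompute the return type from the current schema
// on every call, so later schema changes are still reflected.
@Override
public TypeInformation<Row> getReturnType() {
	Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
	String[] famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
	TypeInformation<?>[] famTypes = new TypeInformation<?>[famNames.length];
	for (int i = 0; i < famNames.length; i++) {
		List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(famNames[i]);
		TypeInformation<?>[] qualTypes = new TypeInformation<?>[colDetails.size()];
		for (int j = 0; j < colDetails.size(); j++) {
			qualTypes[j] = colDetails.get(j).getSecond();
		}
		famTypes[i] = new RowTypeInfo(qualTypes); // one nested row per family
	}
	return new RowTypeInfo(famTypes);
}
```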


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4534) Lack of synchronization in BucketingSink#restoreState()

2017-01-24 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15506583#comment-15506583
 ] 

Ted Yu edited comment on FLINK-4534 at 1/25/17 3:10 AM:


Feel free to work on this.

Thanks, Liwei.


was (Author: yuzhih...@gmail.com):
Feel free to work on this.

Thanks, Liwei .

> Lack of synchronization in BucketingSink#restoreState()
> ---
>
> Key: FLINK-4534
> URL: https://issues.apache.org/jira/browse/FLINK-4534
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> Iteration over state.bucketStates is protected by synchronization in other 
> methods, except for the following in restoreState():
> {code}
> for (BucketState bucketState : state.bucketStates.values()) {
> {code}
> and following in close():
> {code}
> for (Map.Entry<String, BucketState> entry : state.bucketStates.entrySet()) {
>   closeCurrentPartFile(entry.getValue());
> {code}
> w.r.t. bucketState.pendingFilesPerCheckpoint, there is a similar issue 
> starting at line 752:
> {code}
>   Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
>   LOG.debug("Moving pending files to final location on restore.");
>   for (Long pastCheckpointId : pastCheckpointIds) {
> {code}
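
A minimal sketch of the kind of guard this suggests, assuming the other methods synchronize on {{state.bucketStates}} (illustrative only, not an actual patch):
{code}
// Hypothetical sketch: protect the iteration with the same lock.
synchronized (state.bucketStates) {
  for (BucketState bucketState : state.bucketStates.values()) {
    // ... per-bucket restore logic ...
  }
}
{code}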



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837095#comment-15837095
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97700808
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are separated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
-   TypeInformation[] 
colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, 
HBaseTableSchema schema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field 
names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field 
types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
}
 
@Override
	public TypeInformation<Row> getReturnType() {
-   return new RowTypeInfo(colTypes);
-   }
-
-   @Override
-   public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
-   return execEnv.createInput(new HBaseTableSourceInputFormat(conf, tableName, colNames, colTypes), getReturnType());
-   }
+   // split the fieldNames
+   Map<String, List<Pair<String, TypeInformation<?>>>> famMap = schema.getFamilyMap();
 
-   @Override
-   public ProjectableTableSource<Row> projectFields(int[] fields) {
-   String[] newColNames = new String[fields.length];
-   TypeInformation<?>[] newColTypes = new TypeInformation<?>[fields.length];
+   List<String> qualNames = new ArrayList<>();
--- End diff --

We can move the code that creates the `typeInfos` into `HBaseTableSchema`, as a 
method named `TypeInformation[] getColumnTypes()`.

And likewise the code that creates the family names, as `String[] getFamilyNames()`.

This would reduce the redundant code.
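
For illustration, such helpers might look roughly like this (a sketch against the `familyMap` shape above, not actual PR code):

```
// Hypothetical helpers on HBaseTableSchema (illustrative only).
public String[] getFamilyNames() {
	return this.familyMap.keySet().toArray(new String[this.familyMap.size()]);
}

public TypeInformation<?>[] getColumnTypes() {
	List<TypeInformation<?>> types = new ArrayList<>();
	for (List<Pair<String, TypeInformation<?>>> colDetails : this.familyMap.values()) {
		for (Pair<String, TypeInformation<?>> pair : colDetails) {
			types.add(pair.getSecond());
		}
	}
	return types.toArray(new TypeInformation<?>[types.size()]);
}
```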


> Add HBaseTableSource
> 
>
> Key: FLINK-2168
> URL: https://issues.apache.org/jira/browse/FLINK-2168
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 0.9
>Reporter: Fabian Hueske
>Assignee: ramkrishna.s.vasudevan
>Priority: Minor
>  Labels: starter
>
> Add a {{HBaseTableSource}} to read data from a HBase table. The 
> {{HBaseTableSource}} should implement the {{ProjectableTableSource}} 
> (FLINK-3848) and {{FilterableTableSource}} (FLINK-3849) interfaces.
> The implementation can be based on Flink's {{TableInputFormat}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-2168) Add HBaseTableSource

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837091#comment-15837091
 ] 

ASF GitHub Bot commented on FLINK-2168:
---

Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97701834
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for (Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList<>();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for (Pair<String, TypeInformation<?>> pair : colDetails) {
+

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97701834
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
+ */
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
+   private String tableName;
+   private transient Connection conn;
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
+
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
+   this.tableName = tableName;
+   this.conf = conf;
+   this.schema = schema;
+   }
+
+   @Override
+   public void configure(Configuration parameters) {
+   LOG.info("Initializing HBaseConfiguration");
+   connectToTable();
+   if(table != null) {
+   scan = getScanner();
+   }
+   }
+
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
+   Scan scan = new Scan();
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
+   // select only the fields in the 'selectedFields'
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for (Pair<String, TypeInformation<?>> pair : colDetails) {
+   scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   }
+   }
+   return scan;
+   }
+
+   @Override
+   public String getTableName() {
+   return tableName;
+   }
+
+   @Override
+   protected Row mapResultToTuple(Result res) {
+   List<Object> values = new ArrayList<>();
+   int i = 0;
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   Row[] rows = new Row[familyMap.size()];
+   for(String family : familyMap.keySet()) {
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for (Pair<String, TypeInformation<?>> pair : colDetails) {
+   byte[] value = res.getValue(Bytes.toBytes(family), Bytes.toBytes(pair.getFirst()));
+   if(value != null) {
+   values.add(schema.deserialize(value, 

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97701594
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSourceInputFormat.java
 ---
@@ -19,99 +19,113 @@
 package org.apache.flink.addons.hbase;
 
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
-import org.apache.flink.api.common.io.RichInputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.types.Row;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.List;
+import java.util.Map;
 
 /**
  * {@link InputFormat} subclass that wraps the access for HTables. Returns 
the result as {@link Row}
  */
-public class HBaseTableSourceInputFormat extends RichInputFormat implements ResultTypeQueryable {
+public class HBaseTableSourceInputFormat extends TableInputFormat<Row> implements ResultTypeQueryable<Row> {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(HBaseTableSourceInputFormat.class);
private String tableName;
-   private TypeInformation[] fieldTypeInfos;
-   private String[] fieldNames;
-   private transient Table table;
-   private transient Scan scan;
private transient Connection conn;
-   private ResultScanner resultScanner = null;
-
-   private byte[] lastRow;
-   private int scannedRows;
-   private boolean endReached = false;
-   private org.apache.hadoop.conf.Configuration conf;
-   private static final String COLON = ":";
+   private transient org.apache.hadoop.conf.Configuration conf;
+   private HBaseTableSchema schema;
 
-   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, String[] fieldNames, TypeInformation[] fieldTypeInfos) {
-   this.conf = conf;
+   public HBaseTableSourceInputFormat(org.apache.hadoop.conf.Configuration 
conf, String tableName, HBaseTableSchema schema) {
this.tableName = tableName;
-   this.fieldNames = fieldNames;
-   this.fieldTypeInfos = fieldTypeInfos;
+   this.conf = conf;
+   this.schema = schema;
}
 
@Override
public void configure(Configuration parameters) {
LOG.info("Initializing HBaseConfiguration");
connectToTable();
if(table != null) {
-   scan = createScanner();
+   scan = getScanner();
}
}
 
-   private Scan createScanner() {
+   @Override
+   protected Scan getScanner() {
+   // TODO : Pass 'rowkey'. For this we need FilterableTableSource
Scan scan = new Scan();
-   for(String field : fieldNames) {
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   for(String family : familyMap.keySet()) {
// select only the fields in the 'selectedFields'
-   String[] famCol = field.split(COLON);
-   scan.addColumn(Bytes.toBytes(famCol[0]), 
Bytes.toBytes(famCol[1]));
+   List<Pair<String, TypeInformation<?>>> colDetails = familyMap.get(family);
+   for (Pair<String, TypeInformation<?>> pair : colDetails) {
+ 

[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97698410
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = new HashMap<>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
+   Preconditions.checkNotNull(family, "family name");
+   Preconditions.checkNotNull(family, "qualifier name");
+   Preconditions.checkNotNull(type, "type name");
+   List<Pair<String, TypeInformation<?>>> list = this.familyMap.get(family);
+   if (list == null) {
+   list = new ArrayList<>();
+   }
+   boolean found = false;
+   for (Class<?> classType : CLASS_TYPES) {
+   if(classType == type.getTypeClass()) {
+   found = true;
+   break;
+   }
+   }
+   if(!found) {
+   // by default it will be byte[] type only
+   type = BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO;
+   }
+   list.add(new Pair(qualifier, type));
--- End diff --

add a `<>` to avoid warning. `list.add(new Pair<>(qualifier, type));`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97696231
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSchema.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.addons.hbase;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Date;
+
+/**
+ * Helps to specify an HBase Table's schema
+ */
+public class HBaseTableSchema implements Serializable {
+
+   // A Map with key as column family.
+   private final Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = new HashMap<>();
+
+   // Allowed types. This may change.
+   // TODO : Check if the Date type should be the one in java.util or the 
one in java.sql
+   private static Class<?>[] CLASS_TYPES = {
+   Integer.class, Short.class, Float.class, Long.class, String.class, Byte.class, Boolean.class, Double.class, BigInteger.class, BigDecimal.class, Date.class
+   };
+   private static byte[] EMPTY_BYTE_ARRAY = new byte[0];
+   public void addColumns(String family, String qualifier, TypeInformation<?> type) {
--- End diff --

addColumns -> addColumn ?

I'm not sure whether we should use `TypeInformation` or `Class` here, because `TypeInformation` suggests that the Flink serialization framework is used to serialize the primitives, but that doesn't happen here. Maybe `Class` is the simpler and better choice?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3149: FLINK-2168 Add HBaseTableSource

2017-01-24 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3149#discussion_r97699488
  
--- Diff: 
flink-connectors/flink-hbase/src/main/java/org/apache/flink/addons/hbase/HBaseTableSource.java
 ---
@@ -22,54 +22,63 @@
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sources.BatchTableSource;
-import org.apache.flink.table.sources.ProjectableTableSource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Creates a table source that helps to scan data from an hbase table
  *
  * Note : the colNames are specified along with a familyName and they are separated by a ':'
  * For eg, cf1:q1 - where cf1 is the familyName and q1 is the qualifier name
  */
-public class HBaseTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
+// TODO : Implement ProjectableTableSource?
+public class HBaseTableSource implements BatchTableSource<Row> {
 
private Configuration conf;
private String tableName;
-   private byte[] rowKey;
-   private String[] colNames;
-   private TypeInformation[] colTypes;
+   private HBaseTableSchema schema;
+   private String[] famNames;
 
-   public HBaseTableSource(Configuration conf, String tableName, byte[] 
rowKey, String[] colNames,
-   TypeInformation[] 
colTypes) {
+   public HBaseTableSource(Configuration conf, String tableName, 
HBaseTableSchema schema) {
this.conf = conf;
this.tableName = Preconditions.checkNotNull(tableName, "Table  
name");
-   this.rowKey = Preconditions.checkNotNull(rowKey, "Rowkey");
-   this.colNames = Preconditions.checkNotNull(colNames, "Field 
names");
-   this.colTypes = Preconditions.checkNotNull(colTypes, "Field 
types");
+   this.schema = Preconditions.checkNotNull(schema, "Schema");
+   Map<String, List<Pair<String, TypeInformation<?>>>> familyMap = schema.getFamilyMap();
+   famNames = familyMap.keySet().toArray(new String[familyMap.size()]);
--- End diff --

I would like to move this code into `getReturnType()`; the schema may change 
after the construction of `HBaseTableSource`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5631) [yarn] Support downloading additional jars from non-HDFS paths

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15837036#comment-15837036
 ] 

ASF GitHub Bot commented on FLINK-5631:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3202

[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS…

This PR enables Flink to support downloading additional jars from non-HDFS 
paths (e.g., S3, viewfs). More details are available in the jira.

There are two changes in this PR:

  * Refactoring the code for localization into a common function
  * Use `Path.getFileSystem()` to obtain the FileSystem object instead of always using the default FileSystem constructed from the YARN config (see the sketch below).
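
A rough sketch of the second point against the Hadoop `FileSystem` API (the path and variable names are made up for illustration, and the s3 scheme assumes a matching filesystem implementation on the classpath):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsFromPathSketch {
	public static void main(String[] args) throws Exception {
		Configuration yarnConfig = new Configuration();

		// Before (sketch): one FileSystem for everything, derived from the YARN config.
		FileSystem defaultFs = FileSystem.get(yarnConfig);
		System.out.println(defaultFs.getUri());

		// After (sketch): each path resolves its own filesystem by scheme,
		// so s3://, viewfs:// or http-style paths localize correctly.
		Path remoteJar = new Path("s3://my-bucket/libs/extra.jar"); // hypothetical path
		FileSystem fs = remoteJar.getFileSystem(yarnConfig);
		System.out.println(fs.getUri()); // prints the scheme-specific filesystem URI
	}
}
```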


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3202


commit f7d24c5a0559ca71cd49962c9dc0b4ab81997783
Author: Haohui Mai 
Date:   2017-01-25T02:00:08Z

[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.




> [yarn] Support downloading additional jars from non-HDFS paths
> --
>
> Key: FLINK-5631
> URL: https://issues.apache.org/jira/browse/FLINK-5631
> Project: Flink
>  Issue Type: Bug
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> Currently the {{YarnResourceManager}} and {{YarnApplicationMasterRunner}} 
> always register the additional jars using the YARN filesystem object. This is 
> problematic as the paths might require another filesystem.
> To support localizing from non-HDFS paths (e.g., s3, http or viewfs), the 
> cleaner approach is to get the filesystem object from the path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #3202: [FLINK-5631] [yarn] Support downloading additional...

2017-01-24 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3202

[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS…

This PR enables Flink to support downloading additional jars from non-HDFS 
paths (e.g., S3, viewfs). More details are available in the jira.

There are two changes in this PR:

  * Refactoring the code for localization into a common function
  * Use `Path.getFileSystem()` to obtain the FileSystem object instead of 
always using the default FileSystem constructed using YARN config.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5631

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3202.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3202


commit f7d24c5a0559ca71cd49962c9dc0b4ab81997783
Author: Haohui Mai 
Date:   2017-01-25T02:00:08Z

[FLINK-5631] [yarn] Support downloading additional jars from non-HDFS paths.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5537) Config option: SSL

2017-01-24 Thread Vijay Srinivasaraghavan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836984#comment-15836984
 ] 

Vijay Srinivasaraghavan commented on FLINK-5537:


PR created for the issue - 
https://github.com/mesosphere/dcos-flink-service/pull/15

> Config option: SSL
> --
>
> Key: FLINK-5537
> URL: https://issues.apache.org/jira/browse/FLINK-5537
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5631) [yarn] Support downloading additional jars from non-HDFS paths

2017-01-24 Thread Haohui Mai (JIRA)
Haohui Mai created FLINK-5631:
-

 Summary: [yarn] Support downloading additional jars from non-HDFS 
paths
 Key: FLINK-5631
 URL: https://issues.apache.org/jira/browse/FLINK-5631
 Project: Flink
  Issue Type: Bug
Reporter: Haohui Mai
Assignee: Haohui Mai


Currently the {{YarnResourceManager}} and {{YarnApplicationMasterRunner}} 
always register the additional jars using the YARN filesystem object. This is 
problematic as the paths might require another filesystem.

To support localizing from non-HDFS paths (e.g., s3, http or viewfs), the 
cleaner approach is to get the filesystem object from the path.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836734#comment-15836734
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user ch33hau commented on the issue:

https://github.com/apache/flink/pull/2761
  
@fhueske it is ok, and thanks for the review =)
I can't fix them right now, but I should be able to start working on your comments in a day.


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lack behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2761: [FLINK-3551] [examples] Sync Scala Streaming Examples

2017-01-24 Thread ch33hau
Github user ch33hau commented on the issue:

https://github.com/apache/flink/pull/2761
  
@fhueske it is ok, and thanks for the review =)
I can't fix them right now, but I should be able to start working on your comments in a day.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97205993
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
--- End diff --

Scaladocs are commented with 
```
/**
 *
 */
```
instead of 
```
/**
  *
  */
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97205961
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
--- End diff --

The use of HTML is discouraged in Scaladoc. Instead use wiki markup 
(markdown) whenever possible.
See [Scaladoc guidelines](http://docs.scala-lang.org/style/scaladoc.html).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836689#comment-15836689
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206069
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * <p>
+  * This example shows how to use:
+  * <ul>
+  * <li>streaming iterations,</li>
+  * <li>buffer timeout to enhance latency,</li>
+  * <li>directed outputs.</li>
+  * </ul>
+  * <p>
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
--- End diff --

I find it helpful to add the type of the DataStream.


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lack behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97658391
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * An example of grouped stream windowing into sliding time windows.
+  * This example uses [[RichParallelSourceFunction]] to generate a list of key-value pairs.
+  */
+object GroupedProcessingTimeWindowExample {
+
+  def main(args: Array[String]): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(4)
+
+val stream = env.addSource(new RichParallelSourceFunction[(Long, 
Long)]() {
--- End diff --

Move the source function to a separate class? It "hides" the important 
aspects of the example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97657755
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * <p>
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * <p>
+  * This example shows how to use:
+  * <ul>
+  * <li>Connected streams</li>
+  * <li>CoFunctions</li>
+  * <li>Tuple data types</li>
+  * </ul>
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
+
+override def run(ctx: SourceContext[Int]) = {
+  Thread.sleep(15)
+  while (counter < 50) {
+ctx.collect(getNewData)
+  }
+}
+
+def getNewData = {
+  

[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206069
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * streaming iterations,
+  * buffer timeout to enhance latency,
+  * directed outputs.
+  * 
+  * 
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
--- End diff --

I find it helpful to add the type of the DataStream.
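
For illustration, a minimal self-contained sketch of such an annotation (the
object name and the `(Int, Int)` element type are placeholders inferred from
the surrounding example):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object TypedStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // the explicit annotation documents the element type for readers
    val inputStream: DataStream[(Int, Int)] = env.fromElements((1, 1), (2, 3))

    inputStream.print()
    env.execute("Typed stream sketch")
  }
}
```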


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207859
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: TwitterExample [--output ]
+  * [--twitter-source.consumerKey 
+  * --twitter-source.consumerSecret 
+  * --twitter-source.token 
+  * --twitter-source.tokenSecret ]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
+  // selecting English tweets and splitting to (word, 1)
+  .flatMap { value: String =>
+  val jsonParser = new ObjectMapper()
--- End diff --

Creating a new `ObjectMapper()` in each function call is quite expensive
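
A sketch of one way to avoid that, using a `RichFlatMapFunction` whose
`open()` method creates the parser once per parallel task (the class name is
made up; the parsing mirrors the quoted diff):

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.codehaus.jackson.JsonNode
import org.codehaus.jackson.map.ObjectMapper

class SelectEnglishAndTokenize extends RichFlatMapFunction[String, (String, Int)] {

  // one parser per parallel task instance, not one per record
  @transient private var jsonParser: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    jsonParser = new ObjectMapper()
  }

  override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
    val jsonNode = jsonParser.readValue(value, classOf[JsonNode])
    val isEnglish = jsonNode.has("user") &&
      jsonNode.get("user").get("lang").asText() == "en"
    if (isEnglish && jsonNode.has("text")) {
      // emit (word, 1) pairs for the downstream count
      jsonNode.get("text").asText().split("\\W+")
        .filter(_.nonEmpty)
        .foreach(token => out.collect((token, 1)))
    }
  }
}
```

The call site then becomes `.flatMap(new SelectEnglishAndTokenize)`.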


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836698#comment-15836698
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206112
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * streaming iterations,
+  * buffer timeout to enhance latency,
+  * directed outputs.
+  * 
+  * 
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
+if (params.has("input")) {
+  env.readTextFile(params.get("input")).map { value: String =>
--- End diff --

Please add a comment describing what the map function does
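
For example, a sketch of the documented logic as a small helper (the
`parsePair` name is made up; the body is the parsing code from this diff):

```scala
// Parses a line of the form "(3,5)" into a pair of integers:
// strip the surrounding parentheses, split on the comma, parse both parts.
def parsePair(value: String): (Int, Int) = {
  val record = value.substring(1, value.length - 1)
  val splitted = record.split(",")
  (Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
}
```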


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836701#comment-15836701
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97657374
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
  

[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836684#comment-15836684
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97205993
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
--- End diff --

Scaladoc comments are written with
```
/**
 *
 */
```
instead of 
```
/**
  *
  */
```


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836686#comment-15836686
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207663
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
--- End diff --

Scaladoc style


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836692#comment-15836692
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206987
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
--- End diff --

please add type


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836683#comment-15836683
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97205961
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
--- End diff --

The use of HTML is discouraged in Scaladoc. Instead use wiki markup 
(markdown) whenever possible.
See [Scaladoc guidelines](http://docs.scala-lang.org/style/scaladoc.html).
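
A rough sketch of the same header using wiki syntax instead of HTML tags:

```scala
/**
 * Example illustrating iterations in Flink streaming.
 *
 * The program sums up random numbers and counts the additions it performs to
 * reach a specific threshold in an iterative streaming fashion.
 *
 * This example shows how to use:
 *
 *  - streaming iterations,
 *  - buffer timeout to enhance latency,
 *  - directed outputs.
 */
```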


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836693#comment-15836693
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207859
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: TwitterExample [--output ]
+  * [--twitter-source.consumerKey 
+  * --twitter-source.consumerSecret 
+  * --twitter-source.token 
+  * --twitter-source.tokenSecret ]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
+  // selecting English tweets and splitting to (word, 1)
+  .flatMap { value: String =>
+  val jsonParser = new ObjectMapper()
--- End diff --

Creating a new `ObjectMapper()` in each function call is quite expensive


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala 

[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836695#comment-15836695
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207844
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: TwitterExample [--output ]
+  * [--twitter-source.consumerKey 
+  * --twitter-source.consumerSecret 
+  * --twitter-source.token 
+  * --twitter-source.tokenSecret ]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
+  // selecting English tweets and splitting to (word, 1)
+  .flatMap { value: String =>
--- End diff --

I think this complex function should be moved into a separate class.


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This 

[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836690#comment-15836690
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97654855
  
--- Diff: 
flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/iteration/IterateExampleITCase.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.iteration;
+
+import org.apache.flink.streaming.scala.examples.iteration.IterateExample;
+import 
org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IterateExampleITCase extends StreamingProgramTestBase {
--- End diff --

ITCases extending `StreamingProgramTestBase` are very expensive because they
internally start a Flink minicluster, which takes a significant amount of
time, usually much more than the actual test.
The class `StreamingMultipleProgramsTestBase` allows reusing the minicluster
across several tests.
I would suggest porting all existing example tests (Java and Scala) into a
single ITCase that extends `StreamingMultipleProgramsTestBase`. This should
reduce Flink's build time.
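
A rough sketch of the suggested setup, assuming the test-utils class
`StreamingMultipleProgramsTestBase` is on the classpath (class and method
names are made up; result verification is omitted):

```scala
import org.apache.flink.streaming.scala.examples.iteration.IterateExample
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test

// all @Test methods run against one minicluster started by the base class
class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testIterateExample(): Unit = {
    // executes the example on the shared minicluster with default input
    IterateExample.main(Array.empty[String])
  }
}
```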



> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836699#comment-15836699
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207217
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
--- End diff --

I agree with @thvasilo. We should copy the code of the Java job. 

Otherwise, this example just demonstrates how to use `connect()` and 
`CoMapFunction`. 
For that we would not need custom sources and window aggregation.
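
Roughly what the Java job's `Predictor` could look like ported to Scala (a
sketch only; the dummy prediction simply returns 0, as in the skeleton):

```scala
import org.apache.flink.streaming.api.functions.co.CoMapFunction

// keeps the latest partial model and uses it to "predict" on new data
class Predictor extends CoMapFunction[Int, Array[Double], Int] {

  private var partialModel: Array[Double] = Array(1.0)

  // new data element: predict with the current model
  override def map1(value: Int): Int = predict(partialModel, value)

  // new partial model: store it for subsequent predictions
  override def map2(model: Array[Double]): Int = {
    partialModel = model
    1 // marker value for the model stream
  }

  // dummy prediction, as in the skeleton
  private def predict(model: Array[Double], value: Int): Int = 0
}
```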


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836696#comment-15836696
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97650398
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.examples.java.wordcount.util.WordCountData
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+/**
+  * Implements a windowed version of the streaming "WordCount" program.
--- End diff --

Scaladoc style


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836702#comment-15836702
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97651600
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.examples.java.wordcount.util.WordCountData
+import org.apache.flink.streaming.api.scala._
+
+/**
+  * Implements the "WordCount" program that computes a simple word 
occurrence
--- End diff --

Scaladoc style


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836694#comment-15836694
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97658665
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * An example of grouped stream windowing in session windows with session 
timeout of 3 msec.
+  * A source fetches elements with key, timestamp, and count.
+  */
+object SessionWindowing {
+
+  def main(args: Array[String]): Unit = {
+
+val params = ParameterTool.fromArgs(args)
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+env.getConfig.setGlobalJobParameters(params)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+
+val fileOutput = params.has("output")
+
+val input = List(
+  ("a", 1L, 1),
+  ("b", 1L, 1),
+  ("b", 3L, 1),
+  ("b", 5L, 1),
+  ("c", 6L, 1),
+  // We expect to detect the session "a" earlier than this point (the 
old
+  // functionality can only detect here when the next starts)
+  ("a", 10L, 1),
+  // We expect to detect session "b" and "c" at this point as well
+  ("c", 11L, 1)
+)
+
+val source = env.addSource(new SourceFunction[(String, Long, Int)]() {
+
+  override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
+input.foreach(value => {
+  ctx.collectWithTimestamp(value, value._2)
+  ctx.emitWatermark(new Watermark(value._2 - 1))
+  if (!fileOutput) {
--- End diff --

I'd remove this condition. Not sure how much value the printed values add 
to the example.


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836691#comment-15836691
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97658391
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * An example of grouped stream windowing into sliding time windows.
+  * This example uses [[RichParallelSourceFunction]] to generate a list of 
key-value pair.
+  */
+object GroupedProcessingTimeWindowExample {
+
+  def main(args: Array[String]): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(4)
+
+val stream = env.addSource(new RichParallelSourceFunction[(Long, 
Long)]() {
--- End diff --

Move the source function to a separate class? It "hides" the important 
aspects of the example.
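
For instance, a sketch of the generator as a named class (name and key space
are placeholders):

```scala
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// data generator pulled out of main() so the windowing logic stands out
class KeyValueSource extends RichParallelSourceFunction[(Long, Long)] {

  @volatile private var running = true

  override def run(ctx: SourceContext[(Long, Long)]): Unit = {
    var counter = 0L
    while (running) {
      // emit (key, value) pairs over a small, repeating key space
      ctx.collect((counter % 100, 1L))
      counter += 1
      Thread.sleep(1) // throttle the sketch's emission rate
    }
  }

  override def cancel(): Unit = running = false
}
```

`main()` would then only contain `env.addSource(new KeyValueSource)` plus the
windowing logic.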


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836685#comment-15836685
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206812
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
--- End diff --

Please verify the Scaladoc style.


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lag behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836700#comment-15836700
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206373
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * streaming iterations,
+  * buffer timeout to enhance latency,
+  * directed outputs.
+  * 
+  * 
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
+if (params.has("input")) {
+  env.readTextFile(params.get("input")).map { value: String =>
+val record = value.substring(1, value.length - 1)
+val splitted = record.split(",")
+(Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
+  }
+} else {
+  println("Executing Iterate example with default input data set.")
+  println("Use --input to specify file input.")
+  env.addSource(new RandomFibonacciSource)
+}
+
+def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < 
Bound
+
+// create an iterative data stream from the input with 5 second timeout
+val numbers = inputStream
+  // Map the inputs so that the next Fibonacci numbers can be 
calculated
+  // while preserving the original input tuple
+  // A counter is attached to the tuple and incremented in every 
iteration step
+  .map(value => (value._1, value._2, value._1, value._2, 0))
+  .iterate(
+(iteration: DataStream[(Int, Int, Int, Int, Int)]) => {
+  // calculates the next Fibonacci number and increment the counter
+  val step = iteration.map(value =>
+(value._1, value._2, value._4, value._3 + value._4, value._5 + 
1))
+  // testing which tuple needs to be iterated again
+  val feedback = step.filter(value => withinBound(value._3, 
value._4))
+  // get the input pairs that have the greatest iteration counter
--- End diff --

Please check this comment. I do not see a sliding window or how the 
greatest iteration counter is identified.
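
If the comment is indeed stale, a wording that matches the code could look like
this sketch (the complement filter producing the output stream is an assumption
about the lines that follow the quoted hunk):

```
// tuples whose Fibonacci components reached the bound leave the
// iteration here and form the output stream
val output = step.filter(value => !withinBound((value._3, value._4)))
```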


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects 

[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836688#comment-15836688
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207774
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: TwitterExample [--output <path>]
+  * [--twitter-source.consumerKey <key>
+  * --twitter-source.consumerSecret <secret>
+  * --twitter-source.token <token>
+  * --twitter-source.tokenSecret <tokenSecret>]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
--- End diff --

add DataStream type
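
A sketch of the requested annotation; the element type `(String, Int)` is an
assumption based on the "(word, 1)" comment further down in the diff:

```
import org.apache.flink.streaming.api.scala._

object TypedStreamSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // spelling out the DataStream type on the val documents the element type
    val tweets: DataStream[(String, Int)] = env
      .fromElements("hello flink", "hello streams")
      .flatMap(_.split(" "))
      .map { (_, 1) }
    tweets.print()
    env.execute("typed stream sketch")
  }
}
```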


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lack behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836697#comment-15836697
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206996
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
--- End diff --

please add type to DataStreams


> Sync Scala and Java Streaming Examples
> --
>
> Key: FLINK-3551
> URL: https://issues.apache.org/jira/browse/FLINK-3551
> Project: Flink
>  Issue Type: Sub-task
>  Components: Examples
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Lim Chee Hau
>
> The Scala Examples lack behind the Java Examples



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3551) Sync Scala and Java Streaming Examples

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15836687#comment-15836687
 ] 

ASF GitHub Bot commented on FLINK-3551:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97657755
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
  

[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206812
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
--- End diff --

please verify Scaladoc style.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207774
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: TwitterExample [--output <path>]
+  * [--twitter-source.consumerKey <key>
+  * --twitter-source.consumerSecret <secret>
+  * --twitter-source.token <token>
+  * --twitter-source.tokenSecret <tokenSecret>]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
--- End diff --

add DataStream type


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207663
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
--- End diff --

Scaladoc style


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97654855
  
--- Diff: 
flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/exampleScalaPrograms/iteration/IterateExampleITCase.java
 ---
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.test.exampleScalaPrograms.iteration;
+
+import org.apache.flink.streaming.scala.examples.iteration.IterateExample;
+import 
org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+public class IterateExampleITCase extends StreamingProgramTestBase {
--- End diff --

ITCases extending `StreamingProgramTestBase` are very expensive because 
they internally start a Flink minicluster, which takes a significant amount of 
time, usually much more than the actual test.
The class `StreamingMultipleProgramsTestBase` allows reusing the 
minicluster across several tests.
I would suggest porting all existing example tests (Java and Scala) into a 
single ITCase that extends `StreamingMultipleProgramsTestBase`. This should 
reduce Flink's build time.
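
A minimal sketch of that consolidation; the class and method names are
illustrative, and the output comparison done by the existing ITCases is
omitted:

```
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test

// all @Test methods in this class share one Flink minicluster, which the
// test base starts once, instead of once per ITCase
class StreamingExamplesITCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testIterateExample(): Unit = {
    // run the Scala example with its default input; assertions omitted
    org.apache.flink.streaming.scala.examples.iteration.IterateExample.main(Array.empty)
  }

  @Test
  def testTwitterExample(): Unit = {
    org.apache.flink.streaming.scala.examples.twitter.TwitterExample.main(Array.empty)
  }
}
```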



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207844
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.twitter
+
+import java.util.StringTokenizer
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.connectors.twitter.TwitterSource
+import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
+import org.codehaus.jackson.JsonNode
+import org.codehaus.jackson.map.ObjectMapper
+
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Implements the "TwitterStream" program that computes a most used word
+  * occurrence over JSON objects in a streaming fashion.
+  * 
+  * The input is a Tweet stream from a TwitterSource.
+  * 
+  * 
+  * Usage: TwitterExample [--output <path>]
+  * [--twitter-source.consumerKey <key>
+  * --twitter-source.consumerSecret <secret>
+  * --twitter-source.token <token>
+  * --twitter-source.tokenSecret <tokenSecret>]
+  * 
+  *
+  * If no parameters are provided, the program is run with default data 
from
+  * {@link TwitterExampleData}.
+  * 
+  * 
+  * This example shows how to:
+  * 
+  * acquire external data,
+  * use in-line defined functions,
+  * handle flattened stream inputs.
+  * 
+  */
+object TwitterExample {
+
+  def main(args: Array[String]): Unit = {
+
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+println("Usage: TwitterExample [--output ] " +
+  "[--twitter-source.consumerKey  " +
+  "--twitter-source.consumerSecret  " +
+  "--twitter-source.token  " +
+  "--twitter-source.tokenSecret ]")
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+env.setParallelism(params.getInt("parallelism", 1))
+
+// get input data
+val streamSource =
+if (params.has(TwitterSource.CONSUMER_KEY) &&
+  params.has(TwitterSource.CONSUMER_SECRET) &&
+  params.has(TwitterSource.TOKEN) &&
+  params.has(TwitterSource.TOKEN_SECRET)
+) {
+  env.addSource(new TwitterSource(params.getProperties))
+} else {
+  print("Executing TwitterStream example with default props.")
+  print("Use --twitter-source.consumerKey  
--twitter-source.consumerSecret  " +
+"--twitter-source.token  " +
+"--twitter-source.tokenSecret  specify the 
authentication info."
+  )
+  // get default test text data
+  env.fromElements(TwitterExampleData.TEXTS: _*)
+}
+
+val tweets = streamSource
+  // selecting English tweets and splitting to (word, 1)
+  .flatMap { value: String =>
--- End diff --

I think this complex function should be moved into a separate class.
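
One way to do that, sketched after the Java example's
`SelectEnglishAndTokenizeFlatMap` (the Jackson calls and field names are
assumptions based on the imports in this file):

```
import java.util.StringTokenizer

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector
import org.codehaus.jackson.JsonNode
import org.codehaus.jackson.map.ObjectMapper

/** Deserializes a tweet, keeps English ones and emits (word, 1) pairs. */
class SelectEnglishAndTokenizeFlatMap extends FlatMapFunction[String, (String, Int)] {

  // lazy, so the non-serializable ObjectMapper is created on the worker
  private lazy val jsonParser = new ObjectMapper()

  override def flatMap(value: String, out: Collector[(String, Int)]): Unit = {
    val jsonNode: JsonNode = jsonParser.readValue(value, classOf[JsonNode])
    val user = jsonNode.get("user")
    val isEnglish = user != null && user.get("lang") != null &&
      user.get("lang").getTextValue == "en"
    val text = jsonNode.get("text")
    if (isEnglish && text != null) {
      // split the tweet text into lower-case words and emit (word, 1)
      val tokenizer = new StringTokenizer(text.getTextValue)
      while (tokenizer.hasMoreTokens) {
        val token = tokenizer.nextToken().trim.toLowerCase
        if (token.nonEmpty) out.collect((token, 1))
      }
    }
  }
}
```

The call site in the example would then shrink to
`streamSource.flatMap(new SelectEnglishAndTokenizeFlatMap)`.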


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97657374
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
+  (_: Array[Double]) => 1
+)
+
+// emit result
+if (params.has("output")) {
+  prediction.writeAsText(params.get("output"))
+} else {
+  println("Printing result to stdout. Use --output to specify output 
path.")
+  prediction.print()
+}
+
+// execute program
+env.execute("Streaming Incremental Learning")
+  }
+
+  // 
*
+  // USER FUNCTIONS
+  // 
*
+
+  /**
+* Feeds new data for newData. By default it is implemented as 
constantly
+* emitting the Integer 1 in a loop.
+*/
+  private class FiniteNewDataSource extends SourceFunction[Int] {
+var counter: Int = 0
+
+override def run(ctx: SourceContext[Int]) = {
+  Thread.sleep(15)
--- End diff --

can be simplified to 
```
Thread.sleep(15)
(0 until 50).foreach{ _ =>
  Thread.sleep(5)
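  ctx.collect(1)  // hypothetical continuation; the archived message is truncated
}
```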
  

[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97651600
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.wordcount
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.examples.java.wordcount.util.WordCountData
+import org.apache.flink.streaming.api.scala._
+
+/**
+  * Implements the "WordCount" program that computes a simple word 
occurrence
--- End diff --

Scaladoc style


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206987
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
--- End diff --

please add type


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97207217
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
+val newData = env.addSource(new FiniteNewDataSource)
+
+val model = trainingData
+  .assignTimestampsAndWatermarks(new LinearTimestamp)
+  .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+  .apply(new PartialModelBuilder)
+
+// use partial model for newData
+val prediction = newData.connect(model).map(
+  (_: Int) => 0,
--- End diff --

I agree with @thvasilo. We should copy the code of the Java job. 

Otherwise, this example just demonstrates how to use `connect()` and 
`CoMapFunction`. 
For that we would not need custom sources and window aggregation.
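
For context, the Java skeleton's predictor is a `CoMapFunction` that stores the
latest partial model from one input and applies it to the other; a Scala sketch
of that shape (the prediction logic is a placeholder, as in the Java skeleton):

```
import org.apache.flink.streaming.api.functions.co.CoMapFunction

class Predictor extends CoMapFunction[Int, Array[Double], Int] {

  private var partialModel: Array[Double] = Array(1.0)

  // predict a new data point with the current partial model
  override def map1(value: Int): Int = {
    // a real implementation would consult partialModel here;
    // the skeleton just emits a dummy prediction
    0
  }

  // refresh the model whenever a new window result arrives
  override def map2(model: Array[Double]): Int = {
    partialModel = model
    1
  }
}
```

The two inline lambdas would then become
`newData.connect(model).map(new Predictor)`.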


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206373
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  *  The program sums up random numbers and counts additions
+  * it performs to reach a specific threshold in an iterative streaming 
fashion. 
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * streaming iterations,
+  * buffer timeout to enhance latency,
+  * directed outputs.
+  * 
+  * 
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// obtain execution environment and set setBufferTimeout to 1 to enable
+// continuous flushing of the output buffers (lowest latency)
+val env = 
StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+// make parameters available in the web interface
+env.getConfig.setGlobalJobParameters(params)
+
+// create input stream of integer pairs
+val inputStream =
+if (params.has("input")) {
+  env.readTextFile(params.get("input")).map { value: String =>
+val record = value.substring(1, value.length - 1)
+val splitted = record.split(",")
+(Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
+  }
+} else {
+  println("Executing Iterate example with default input data set.")
+  println("Use --input to specify file input.")
+  env.addSource(new RandomFibonacciSource)
+}
+
+def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < 
Bound
+
+// create an iterative data stream from the input with 5 second timeout
+val numbers = inputStream
+  // Map the inputs so that the next Fibonacci numbers can be 
calculated
+  // while preserving the original input tuple
+  // A counter is attached to the tuple and incremented in every 
iteration step
+  .map(value => (value._1, value._2, value._1, value._2, 0))
+  .iterate(
+(iteration: DataStream[(Int, Int, Int, Int, Int)]) => {
+  // calculates the next Fibonacci number and increment the counter
+  val step = iteration.map(value =>
+(value._1, value._2, value._4, value._3 + value._4, value._5 + 
1))
+  // testing which tuple needs to be iterated again
+  val feedback = step.filter(value => withinBound(value._3, 
value._4))
+  // get the input pairs that have the greatest iteration counter
--- End diff --

Please check this comment. I do not see a sliding window or how the 
greatest iteration counter is identified.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97658665
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
 ---
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+  * An example of grouped stream windowing in session windows with session 
timeout of 3 msec.
+  * A source fetches elements with key, timestamp, and count.
+  */
+object SessionWindowing {
+
+  def main(args: Array[String]): Unit = {
+
+val params = ParameterTool.fromArgs(args)
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+env.getConfig.setGlobalJobParameters(params)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+
+val fileOutput = params.has("output")
+
+val input = List(
+  ("a", 1L, 1),
+  ("b", 1L, 1),
+  ("b", 3L, 1),
+  ("b", 5L, 1),
+  ("c", 6L, 1),
+  // We expect to detect the session "a" earlier than this point (the 
old
+  // functionality can only detect here when the next starts)
+  ("a", 10L, 1),
+  // We expect to detect session "b" and "c" at this point as well
+  ("c", 11L, 1)
+)
+
+val source = env.addSource(new SourceFunction[(String, Long, Int)]() {
+
+  override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
+input.foreach(value => {
+  ctx.collectWithTimestamp(value, value._2)
+  ctx.emitWatermark(new Watermark(value._2 - 1))
+  if (!fileOutput) {
--- End diff --

I'd remove this condition. Not sure how much value the printed values add 
to the example.
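
With the condition dropped, the quoted run method reduces to:

```
override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
  input.foreach { value =>
    ctx.collectWithTimestamp(value, value._2)
    ctx.emitWatermark(new Watermark(value._2 - 1))
  }
}
```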


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206996
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/ml/IncrementalLearningSkeleton.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.ml
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.scala.function.AllWindowFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * Skeleton for incremental machine learning algorithm consisting of a
+  * pre-computed model, which gets updated for the new inputs and new 
input data
+  * for which the job provides predictions.
+  *
+  * 
+  * This may serve as a base of a number of algorithms, e.g. updating an
+  * incremental Alternating Least Squares model while also providing the
+  * predictions.
+  *
+  * 
+  * This example shows how to use:
+  * 
+  * Connected streams
+  * CoFunctions
+  * Tuple data types
+  * 
+  */
+object IncrementalLearningSkeleton {
+
+  // 
*
+  // PROGRAM
+  // 
*
+
+  def main(args: Array[String]): Unit = {
+// Checking input parameters
+val params = ParameterTool.fromArgs(args)
+
+// set up the execution environment
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+// build new model on every second of new data
+val trainingData = env.addSource(new FiniteTrainingDataSource)
--- End diff --

please add type to DataStreams


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2761: [FLINK-3551] [examples] Sync Scala Streaming Examp...

2017-01-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2761#discussion_r97206112
  
--- Diff: 
flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.iteration
+
+import java.util.Random
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+
+/**
+  * Example illustrating iterations in Flink streaming.
+  * <p> The program sums up random numbers and counts the additions
+  * it performs to reach a specific threshold in an iterative streaming fashion.
+  * </p>
+  *
+  * <p>
+  * This example shows how to use:
+  * <ul>
+  *   <li>streaming iterations,</li>
+  *   <li>buffer timeout to enhance latency,</li>
+  *   <li>directed outputs.</li>
+  * </ul>
+  * </p>
+  */
+object IterateExample {
+
+  private val Bound = 100
+
+  def main(args: Array[String]): Unit = {
+    // Checking input parameters
+    val params = ParameterTool.fromArgs(args)
+
+    // obtain execution environment and set setBufferTimeout to 1 to enable
+    // continuous flushing of the output buffers (lowest latency)
+    val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
+
+    // make parameters available in the web interface
+    env.getConfig.setGlobalJobParameters(params)
+
+    // create input stream of integer pairs
+    val inputStream =
+      if (params.has("input")) {
+        env.readTextFile(params.get("input")).map { value: String =>
--- End diff ---

Please add a comment describing what the map function does
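
For illustration, a commented sketch of the mapper, assuming each text line encodes a pair as "(left,right)" (the real input format is not visible in this excerpt):

    // parse a text line of the form "(left,right)" into a pair of Ints
    env.readTextFile(params.get("input")).map { value: String =>
      val record = value.substring(1, value.length - 1) // drop the parentheses
      val fields = record.split(",")
      (fields(0).toInt, fields(1).toInt)
    }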


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #:

2017-01-24 Thread mindprince
Github user mindprince commented on the pull request:


https://github.com/apache/flink/commit/6342d6db1de5f38a921732e35abd83e6c5b9305a#commitcomment-20600038
  
In flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java on line 130:
@StephanEwen This would set allowedLateness for processing-time windows as well. Do we want that?
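
For reference, the fix tracked in FLINK-5247 makes allowedLateness() a no-op for non-event-time windows. A minimal self-contained Scala sketch of that semantics (illustrative only, not the actual AllWindowedStream code):

    // lateness is honored only when the window assigner works on event time;
    // for processing-time windows the requested value is ignored (no-op)
    def effectiveLateness(isEventTime: Boolean, requestedMillis: Long): Long =
      if (isEventTime) requestedMillis else 0L

    effectiveLateness(isEventTime = false, requestedMillis = 5000L) // yields 0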


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2929: [FLINK-5247] Fix incorrect check in allowedLateness() met...

2017-01-24 Thread mindprince
Github user mindprince commented on the issue:

https://github.com/apache/flink/pull/2929
  
Merged in https://github.com/apache/flink/commit/87af84194911eb1e0c3b3a894bb3f04b628fbf11 and https://github.com/apache/flink/commit/4697b97a0101cf04b43c4a6e4887adba10b4a69a


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2929: [FLINK-5247] Fix incorrect check in allowedLatenes...

2017-01-24 Thread mindprince
Github user mindprince closed the pull request at:

https://github.com/apache/flink/pull/2929


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5247) Fix incorrect check in allowedLateness() method. Make it a no-op for non-event time windows.

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836655#comment-15836655 ]

ASF GitHub Bot commented on FLINK-5247:
---

Github user mindprince closed the pull request at:

https://github.com/apache/flink/pull/2929


> Fix incorrect check in allowedLateness() method. Make it a no-op for 
> non-event time windows.
> 
>
> Key: FLINK-5247
> URL: https://issues.apache.org/jira/browse/FLINK-5247
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0, 1.1.1, 1.1.2, 1.1.3
>Reporter: Rohit Agarwal
> Fix For: 1.2.0, 1.3.0
>
>
> Related to FLINK-3714 and FLINK-4239



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5247) Fix incorrect check in allowedLateness() method. Make it a no-op for non-event time windows.

2017-01-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-5247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15836654#comment-15836654 ]

ASF GitHub Bot commented on FLINK-5247:
---

Github user mindprince commented on the issue:

https://github.com/apache/flink/pull/2929
  
Merged in https://github.com/apache/flink/commit/87af84194911eb1e0c3b3a894bb3f04b628fbf11 and https://github.com/apache/flink/commit/4697b97a0101cf04b43c4a6e4887adba10b4a69a


> Fix incorrect check in allowedLateness() method. Make it a no-op for 
> non-event time windows.
> 
>
> Key: FLINK-5247
> URL: https://issues.apache.org/jira/browse/FLINK-5247
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.1.0, 1.1.1, 1.1.2, 1.1.3
>Reporter: Rohit Agarwal
> Fix For: 1.2.0, 1.3.0
>
>
> Related to FLINK-3714 and FLINK-4239



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

