[jira] [Commented] (FLINK-2810) Warn user if bc not installed

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943704#comment-14943704
 ] 

ASF GitHub Bot commented on FLINK-2810:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1228#discussion_r41172866
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -73,6 +73,13 @@ if [[ $STARTSTOP == "start" ]]; then
 TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
 TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
 else
+# Bash only performs integer arithmetic so floating point computation is performed using bc
+BC_PATH=`command -v bc`
+if [[ $? -eq 1 || ! -f $BC_PATH ]]; then
--- End diff --

That should be sufficient. Thanks.


> Warn user if bc not installed
> -
>
> Key: FLINK-2810
> URL: https://issues.apache.org/jira/browse/FLINK-2810
> Project: Flink
>  Issue Type: Improvement
>  Components: Command-line client
>Affects Versions: 0.10
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 0.10
>
>
> taskmanager.sh will print the following message when starting the cluster if 
> bc is not installed and off-heap memory is enabled and configured as a ratio. 
> The script should first check that bc is installed and otherwise print a 
> specific message.
> {noformat}
> [ERROR] Configured TaskManager managed memory fraction is not a valid value. 
> Please set 'taskmanager.memory.fraction' in flink-conf.yaml
> {noformat}
> Examples of distributions where bc is not installed by default are the 
> Debian images for Google Compute Engine.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...

2015-10-05 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145617179
  
I think alphabetically is better.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...

2015-10-05 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145620411
  
I was thinking more about the actual pixel length, but achieving that is 
proving to be kind of hard.
Alphabetically should be okay, I think.




[jira] [Commented] (FLINK-2811) Add page with configuration overview

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943780#comment-14943780
 ] 

ASF GitHub Bot commented on FLINK-2811:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145620411
  
I was thinking more about the actual pixel length, but achieving that is 
proving to be kind of hard.
Alphabetically should be okay, I think.


> Add page with configuration overview
> 
>
> Key: FLINK-2811
> URL: https://issues.apache.org/jira/browse/FLINK-2811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> The old web interface contained a page to view the configuration of the 
> JobManager.
> This issue is about adding the page again.





[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable

2015-10-05 Thread Chengxuan Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943852#comment-14943852
 ] 

Chengxuan Wang commented on FLINK-2066:
---

Hi Maximilian, do I need to make the change based on 0.10? I think I made the 
change based on 0.9.

> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>Priority: Blocker
>  Labels: starter
> Fix For: 0.10
>
>
> Flink allows specifying a delay between execution retries. This helps let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system-wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.
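The per-job retry delay discussed in this issue can be sketched generically. The class and method names below are illustrative only (this is not Flink's {{ExecutionConfig}} API): a task is retried up to a maximum count, with a configurable sleep between attempts so transient external failures have time to clear.

```java
import java.util.concurrent.Callable;

// Generic sketch of "delay between execution retries" (not Flink's API):
// retry a task up to maxRetries times, sleeping a configurable delay
// between attempts so transient external failures can clear.
public class RetryWithDelay {

    static <T> T run(Callable<T> task, int maxRetries, long delayMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxRetries) {
                    Thread.sleep(delayMillis); // let the failure cause clear
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds on the third attempt.
        String result = run(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient");
            return "ok";
        }, 3, 10);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Making the delay a per-job parameter, as the issue proposes, amounts to passing `delayMillis` from the job's configuration instead of a system-wide setting.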





[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943860#comment-14943860
 ] 

ASF GitHub Bot commented on FLINK-2797:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-145635600
  
@uce, I have pushed a commit with my fix for disabling eager execution in 
detached mode. Please have a look. If it's okay, I can go ahead with adding 
docs for it.


> CLI: Missing option to submit jobs in detached mode
> ---
>
> Key: FLINK-2797
> URL: https://issues.apache.org/jira/browse/FLINK-2797
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> Jobs can only be submitted in detached mode using YARN but not on a 
> standalone installation. This has been requested by users who want to submit 
> a job, get the job id, and later query its status.





[jira] [Commented] (FLINK-2767) Add support Scala 2.11 to Scala shell

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943701#comment-14943701
 ] 

ASF GitHub Bot commented on FLINK-2767:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1197#issuecomment-145606437
  
It occurred due to a jline library version mismatch between Scala 2.10 and 
2.11. Since Scala 2.11.5, we don't need the jline dependency; the Scala REPL 
uses its own jline library. So I upgraded the Scala minor version to 2.11.7 
and moved jline to the Scala 2.10-only dependencies list.


> Add support Scala 2.11 to Scala shell
> -
>
> Key: FLINK-2767
> URL: https://issues.apache.org/jira/browse/FLINK-2767
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 0.10
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>
> Since FLINK-2200 is resolved, the Flink community provides JARs for Scala 
> 2.11. But currently, there is no Scala shell for Scala 2.11. If we add 
> Scala 2.11 support to the Scala shell, users on Scala 2.11 could use Flink 
> easily.





[jira] [Commented] (FLINK-2741) Use single log statement in TestLogger

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943816#comment-14943816
 ] 

ASF GitHub Bot commented on FLINK-2741:
---

Github user rerngvit commented on the pull request:

https://github.com/apache/flink/pull/1221#issuecomment-145629889
  
Thanks all for suggestions.


> Use single log statement in TestLogger
> --
>
> Key: FLINK-2741
> URL: https://issues.apache.org/jira/browse/FLINK-2741
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: rerngvit yanggratoke
>Priority: Trivial
>
> {{TestLogger}} prints log statements before and after tests. Currently this 
> is done via multiple {{log.info}} statements. Sometimes this leads to 
> interleaved output with failure stack traces.
> I would like to change it to a single statement with new lines:
> {code}
> 17:30:31,887 ERROR A  - -
> 17:30:31,891 INFO  B  - Shutting down remote daemon.
> 17:30:31,895 ERROR A  - Test testJobManagerCleanUp(A) failed with:
> ...
> 17:30:31,909 ERROR A  - =
> {code}
> to
> {code}
> 17:30:31,891 INFO  B  - Shutting down remote daemon.
> 17:30:31,887 ERROR A  -
> -
> Test testJobManagerCleanUp(A) failed with:
> ...
> =
> {code}
> Any opinions? Does this improve readability?
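The single-statement idea can be sketched in plain Java. The class and method names below are illustrative, not Flink's actual {{TestLogger}}: the point is that building the whole banner into one string before logging lets the framework emit it atomically, so concurrent log lines cannot interleave with it.

```java
// Sketch of the idea in FLINK-2741: emit one log statement instead of many,
// so other threads' output cannot interleave with the test failure banner.
// Class and method names are illustrative, not Flink's actual TestLogger.
public class SingleStatementBanner {

    /** Builds the whole failure banner as a single string. */
    static String failureBanner(String testName, Throwable error) {
        StringBuilder sb = new StringBuilder();
        sb.append('\n').append("-".repeat(60)).append('\n')
          .append("Test ").append(testName).append(" failed with:\n")
          .append(error).append('\n')
          .append("=".repeat(60));
        return sb.toString();
    }

    public static void main(String[] args) {
        // A real logger call would then be a single: log.error(banner);
        String banner = failureBanner("testJobManagerCleanUp(A)",
                new IllegalStateException("boom"));
        System.out.println(banner);
    }
}
```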





[GitHub] flink pull request: [FLINK-2741] - Use single log statement in Tes...

2015-10-05 Thread rerngvit
Github user rerngvit commented on the pull request:

https://github.com/apache/flink/pull/1221#issuecomment-145629889
  
Thanks all for suggestions.




[jira] [Commented] (FLINK-2811) Add page with configuration overview

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943848#comment-14943848
 ] 

ASF GitHub Bot commented on FLINK-2811:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145633502
  
Yes. You're right. I'd misunderstood Stephan's comment. The goal is to 
group similar keys together, and sorting alphabetically will definitely take 
care of that.
On a separate note, does it make sense to have this type of structure:
*Job Manager*
jobmanager.*

*Akka*
akka.*

*Recovery*
ha.*

and so on. Of course, if no keys are available under a group, it won't be 
displayed at all. Or we can just leave it at sorted alphabetically. Your call.
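The grouping proposed above can be sketched with a few lines of Java. This is not the dashboard's actual code; it only illustrates the idea of bucketing configuration keys by their first dot-separated segment and sorting alphabetically within each group, so empty groups never appear (a group is created only when a key falls into it).

```java
import java.util.*;

// Illustrative sketch: bucket configuration keys by their first
// dot-separated segment, sorted alphabetically within each group.
public class ConfigGrouping {

    static SortedMap<String, SortedSet<String>> group(Collection<String> keys) {
        SortedMap<String, SortedSet<String>> groups = new TreeMap<>();
        for (String key : keys) {
            int dot = key.indexOf('.');
            String prefix = dot < 0 ? key : key.substring(0, dot);
            // Groups are created lazily, so empty groups never show up.
            groups.computeIfAbsent(prefix, p -> new TreeSet<>()).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
            "jobmanager.rpc.port", "akka.ask.timeout", "jobmanager.heap.mb");
        // {akka=[akka.ask.timeout], jobmanager=[jobmanager.heap.mb, jobmanager.rpc.port]}
        System.out.println(group(keys));
    }
}
```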


> Add page with configuration overview
> 
>
> Key: FLINK-2811
> URL: https://issues.apache.org/jira/browse/FLINK-2811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> The old web interface contained a page to view the configuration of the 
> JobManager.
> This issue is about adding the page again.





[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...

2015-10-05 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145633502
  
Yes. You're right. I'd misunderstood Stephan's comment. The goal is to 
group similar keys together, and sorting alphabetically will definitely take 
care of that.
On a separate note, does it make sense to have this type of structure:
*Job Manager*
jobmanager.*

*Akka*
akka.*

*Recovery*
ha.*

and so on. Of course, if no keys are available under a group, it won't be 
displayed at all. Or we can just leave it at sorted alphabetically. Your call.




[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...

2015-10-05 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-145635600
  
@uce, I have pushed a commit with my fix for disabling eager execution in 
detached mode. Please have a look. If it's okay, I can go ahead with adding 
docs for it.




[GitHub] flink pull request: [FLINK-2810] Warn user if bc not installed

2015-10-05 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1228#discussion_r41172866
  
--- Diff: flink-dist/src/main/flink-bin/bin/taskmanager.sh ---
@@ -73,6 +73,13 @@ if [[ $STARTSTOP == "start" ]]; then
 TM_OFFHEAP_SIZE=${FLINK_TM_MEM_MANAGED_SIZE}
 TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
 else
+# Bash only performs integer arithmetic so floating point computation is performed using bc
+BC_PATH=`command -v bc`
+if [[ $? -eq 1 || ! -f $BC_PATH ]]; then
--- End diff --

That should be sufficient. Thanks.




[jira] [Commented] (FLINK-2811) Add page with configuration overview

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943761#comment-14943761
 ] 

ASF GitHub Bot commented on FLINK-2811:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145617179
  
I think alphabetically is better.


> Add page with configuration overview
> 
>
> Key: FLINK-2811
> URL: https://issues.apache.org/jira/browse/FLINK-2811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> The old web interface contained a page to view the configuration of the 
> JobManager.
> This issue is about adding the page again.





[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...

2015-10-05 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145630566
  
I see. But pixel length or number of characters does not make it easier to 
navigate when you want to look up a specific key, e.g. `jobmanager.X.Y`.




[jira] [Commented] (FLINK-2811) Add page with configuration overview

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943822#comment-14943822
 ] 

ASF GitHub Bot commented on FLINK-2811:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145630566
  
I see. But pixel length or number of characters does not make it easier to 
navigate when you want to look up a specific key, e.g. `jobmanager.X.Y`.


> Add page with configuration overview
> 
>
> Key: FLINK-2811
> URL: https://issues.apache.org/jira/browse/FLINK-2811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> The old web interface contained a page to view the configuration of the 
> JobManager.
> This issue is about adding the page again.





[GitHub] flink pull request: [FLINK-2767] [scala shell] Add Scala 2.11 supp...

2015-10-05 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1197#issuecomment-145606437
  
It occurred due to a jline library version mismatch between Scala 2.10 and 
2.11. Since Scala 2.11.5, we don't need the jline dependency; the Scala REPL 
uses its own jline library. So I upgraded the Scala minor version to 2.11.7 
and moved jline to the Scala 2.10-only dependencies list.




[GitHub] flink pull request: [FLINK-2815] [REFACTOR] Remove Pact from class...

2015-10-05 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1218#issuecomment-145740343
  
Updated based on review.




[jira] [Commented] (FLINK-2815) [REFACTOR] Remove Pact from class and file names since it is no longer valid reference

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944491#comment-14944491
 ] 

ASF GitHub Bot commented on FLINK-2815:
---

Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/1218#issuecomment-145740343
  
Updated based on review.


> [REFACTOR] Remove Pact from class and file names since it is no longer valid 
> reference
> --
>
> Key: FLINK-2815
> URL: https://issues.apache.org/jira/browse/FLINK-2815
> Project: Flink
>  Issue Type: Task
>Reporter: Henry Saputra
>Assignee: Henry Saputra
>Priority: Minor
>
> Remove the word Pact from class and file names in Apache Flink.
> Pact was the name used in Stratosphere times to refer to the concept of 
> distributed datasets (similar to Flink's DataSet).
> It was used when Pact and Nephele were still separate concepts.
> As part of the 0.10 cleanup effort, let's remove the Pact names to avoid 
> confusion.





[GitHub] flink pull request: [FLINK-2066][core] Add delay between execution...

2015-10-05 Thread WangCHX
Github user WangCHX commented on the pull request:

https://github.com/apache/flink/pull/1223#issuecomment-145740461
  
Thank you very much.




[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944495#comment-14944495
 ] 

ASF GitHub Bot commented on FLINK-2066:
---

Github user WangCHX commented on the pull request:

https://github.com/apache/flink/pull/1223#issuecomment-145740461
  
Thank you very much.


> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>Priority: Blocker
>  Labels: starter
> Fix For: 0.10
>
>
> Flink allows specifying a delay between execution retries. This helps let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system-wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.





[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944008#comment-14944008
 ] 

ASF GitHub Bot commented on FLINK-2283:
---

Github user mbalassi commented on the pull request:

https://github.com/apache/flink/pull/1155#issuecomment-145665200
  
@StephanEwen: Thanks for the heads-up.

Now I have addressed the comments and would like to merge this tomorrow if 
there are no objections.


> Make grouped reduce/fold/aggregations stateful using Partitioned state
> --
>
> Key: FLINK-2283
> URL: https://issues.apache.org/jira/browse/FLINK-2283
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Gyula Fora
>Assignee: Márton Balassi
>Priority: Minor
>
> Currently the inner state of the grouped aggregations is not persisted as an 
> operator state. 
> These operators should be reimplemented to use the newly introduced 
> partitioned state abstractions which will make them fault tolerant and 
> scalable for the future.
> A suggested implementation would be to use a stateful mapper to implement the 
> desired behaviour.
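The "stateful mapper" suggestion can be sketched in plain Java. This deliberately ignores Flink's actual partitioned-state API; the names are illustrative. The point is that the running reduce result per key lives in an explicit map, which a state backend could then checkpoint.

```java
import java.util.*;
import java.util.function.BinaryOperator;

// Minimal sketch of the suggested approach: a mapper that keeps the running
// reduce result per key. The per-key state is held in an explicit map that
// a partitioned state backend could checkpoint. Not Flink's actual API.
public class StatefulReduceMapper<K, V> {

    private final Map<K, V> state = new HashMap<>();  // per-key running result
    private final BinaryOperator<V> reducer;

    StatefulReduceMapper(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    /** Folds the value into the key's state and returns the updated aggregate. */
    V map(K key, V value) {
        return state.merge(key, value, reducer);
    }

    public static void main(String[] args) {
        StatefulReduceMapper<String, Integer> sum =
            new StatefulReduceMapper<>(Integer::sum);
        sum.map("a", 1);
        System.out.println(sum.map("a", 2)); // running sum for key "a" -> 3
    }
}
```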





[jira] [Created] (FLINK-2820) Configuration not passed to JobGraphGenerator

2015-10-05 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-2820:
-

 Summary: Configuration not passed to JobGraphGenerator
 Key: FLINK-2820
 URL: https://issues.apache.org/jira/browse/FLINK-2820
 Project: Flink
  Issue Type: Bug
  Components: Command-line client
Affects Versions: master
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


This was previously reported as FLINK-2625 (commit 
8a84937215ea575fa94a00d11c2517902d252756). The Client class was concurrently 
refactored with FLINK-2097 (commit 71bf2f570861daae53b24bfcf1d06aedb85311b9).





[GitHub] flink pull request: [FLINK-2283] [streaming] Make grouped reduce/f...

2015-10-05 Thread mbalassi
Github user mbalassi commented on the pull request:

https://github.com/apache/flink/pull/1155#issuecomment-145665200
  
@StephanEwen: Thanks for the heads-up.

Now I have addressed the comments and would like to merge this tomorrow if 
there are no objections.




[jira] [Assigned] (FLINK-2737) KeyedDataStream should not be a subclass of DataStream

2015-10-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-2737:
---

Assignee: Aljoscha Krettek

> KeyedDataStream should not be a subclass of DataStream
> --
>
> Key: FLINK-2737
> URL: https://issues.apache.org/jira/browse/FLINK-2737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>






[GitHub] flink pull request: [FLINK-2786] Remove Spargel from source code a...

2015-10-05 Thread vasia
GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/1229

[FLINK-2786] Remove Spargel from source code and update docs.

I also ported 2 Spargel tests that we hadn't copied over to Gelly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink flink-2786

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1229.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1229


commit 4391e8abf121f5d8ba6b8bdcd7d9a811c4d67806
Author: vasia 
Date:   2015-10-05T18:08:55Z

[FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; 
Remove Beta badge from Gelly






[jira] [Created] (FLINK-2819) Add Windowed Join/CoGroup Operator Based on Tagged Union

2015-10-05 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-2819:
---

 Summary: Add Windowed Join/CoGroup Operator Based on Tagged Union
 Key: FLINK-2819
 URL: https://issues.apache.org/jira/browse/FLINK-2819
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 0.10


This will add a Join/CoGroup operation that reuses the new windowing code. The 
implementation should be similar to how a join can be implemented on MapReduce: 
using tags for the two input sides and then pulling them apart again in the 
reduce operation.
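The tagged-union idea can be sketched in plain Java, independent of Flink's streaming API (the names below are illustrative): wrap elements of both inputs with a side tag, union them, group by key, and then pull the two sides apart again to emit joined pairs.

```java
import java.util.*;

// Sketch of a tagged-union join (not Flink's implementation): elements of
// both inputs are wrapped with a side tag, unioned, grouped by key, and
// split back into the two sides to produce joined pairs.
public class TaggedUnionJoin {

    // One element of the unioned stream: a key, a value, and a tag that
    // remembers which input it came from.
    static final class Tagged {
        final String key; final String value; final boolean left;
        Tagged(String key, String value, boolean left) {
            this.key = key; this.value = value; this.left = left;
        }
    }

    /** Joins two keyed inputs by tagging, unioning, and re-splitting. */
    static List<String> join(Map<String, String> left, Map<String, String> right) {
        // 1. Tag both sides and union them into a single collection.
        List<Tagged> union = new ArrayList<>();
        left.forEach((k, v) -> union.add(new Tagged(k, v, true)));
        right.forEach((k, v) -> union.add(new Tagged(k, v, false)));

        // 2. Group by key (in a streaming job this would be per window).
        Map<String, List<Tagged>> byKey = new TreeMap<>();
        for (Tagged t : union) {
            byKey.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t);
        }

        // 3. In each group, pull the sides apart and emit the cross product.
        List<String> joined = new ArrayList<>();
        for (List<Tagged> group : byKey.values()) {
            for (Tagged l : group) {
                if (!l.left) continue;
                for (Tagged r : group) {
                    if (!r.left) joined.add(l.value + "|" + r.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> left = Map.of("k1", "L1", "k2", "L2");
        Map<String, String> right = Map.of("k1", "R1");
        System.out.println(join(left, right)); // only k1 appears on both sides
    }
}
```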





[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942997#comment-14942997
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41115679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * Each counter creates a ZNode:
+ * 
+ * +O /flink/checkpoint-counter/job-id 1 [persistent]
+ * .
+ * .
+ * .
+ * +O /flink/checkpoint-counter/job-id N [persistent]
+ * 
+ *
+ * The checkpoint IDs are required to be ascending (per job). In order to guarantee this in case
+ * of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
+ */
+public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** Path of the shared count */
+   private final String counterPath;
+
+   /** Curator recipe for shared counts */
+   private final SharedCount sharedCount;
+
+   /** Connection state listener to monitor the client connection */
+   private final SharedCountConnectionStateListener connStateListener =
+   new SharedCountConnectionStateListener();
+
+   /**
+* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
+*
+* @param client  Curator ZooKeeper client
+* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
+* @throws Exception
+*/
+   public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception {
+   this.client = checkNotNull(client, "Curator client");
+   this.counterPath = checkNotNull(counterPath, "Counter path");
+   this.sharedCount = new SharedCount(client, counterPath, 1);
+   }
+
+   @Override
+   public void start() throws Exception {
+   sharedCount.start();
+   client.getConnectionStateListenable().addListener(connStateListener);
+   }
+
+   @Override
+   public void stop() throws Exception {
+   sharedCount.close();
+   client.getConnectionStateListenable().removeListener(connStateListener);
+
+   LOG.info("Removing {} from ZooKeeper", counterPath);
+   client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+   }
+
+   @Override
+   public long getAndIncrement() throws Exception {
+   while (true) {
+   ConnectionState connState = connStateListener.getLastState();
+
+   if (connState != null) {
+   throw new IllegalStateException("Connection state: " + connState);
+   }
+
+   VersionedValue current = sharedCount.getVersionedValue();
+
+   Integer 

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41115679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
 ---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * Each counter creates a ZNode:
+ * 
+ * +O /flink/checkpoint-counter/job-id 1 [persistent]
+ * .
+ * .
+ * .
+ * +O /flink/checkpoint-counter/job-id N [persistent]
+ * 
+ *
+ * The checkpoint IDs are required to be ascending (per job). In order to guarantee this in case
+ * of job manager failures, we use ZooKeeper to maintain a shared counter across job manager instances.
+ */
+public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
+
+   private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** Path of the shared count */
+   private final String counterPath;
+
+   /** Curator recipe for shared counts */
+   private final SharedCount sharedCount;
+
+   /** Connection state listener to monitor the client connection */
+   private final SharedCountConnectionStateListener connStateListener =
+   new SharedCountConnectionStateListener();
+
+   /**
+* Creates a {@link ZooKeeperCheckpointIDCounter} instance.
+*
+* @param client  Curator ZooKeeper client
+* @param counterPath ZooKeeper path for the counter. It's sufficient to have a path per-job.
+* @throws Exception
+*/
+   public ZooKeeperCheckpointIDCounter(CuratorFramework client, String counterPath) throws Exception {
+   this.client = checkNotNull(client, "Curator client");
+   this.counterPath = checkNotNull(counterPath, "Counter path");
+   this.sharedCount = new SharedCount(client, counterPath, 1);
+   }
+
+   @Override
+   public void start() throws Exception {
+   sharedCount.start();
+   client.getConnectionStateListenable().addListener(connStateListener);
+   }
+
+   @Override
+   public void stop() throws Exception {
+   sharedCount.close();
+   client.getConnectionStateListenable().removeListener(connStateListener);
+
+   LOG.info("Removing {} from ZooKeeper", counterPath);
+   client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+   }
+
+   @Override
+   public long getAndIncrement() throws Exception {
+   while (true) {
+   ConnectionState connState = connStateListener.getLastState();
+
+   if (connState != null) {
+   throw new IllegalStateException("Connection state: " + connState);
+   }
+
+   VersionedValue<Integer> current = sharedCount.getVersionedValue();
+
+   Integer newCount = current.getValue() + 1;
+
+   if (sharedCount.trySetCount(current, newCount)) {
+   return current.getValue();
+   }
+   }
+   }
+
+   
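The optimistic-concurrency pattern in `getAndIncrement()` above can be sketched in isolation. The following is a hypothetical in-memory stand-in for Curator's versioned `SharedCount` (class and method names invented for illustration; the real recipe goes through ZooKeeper versions, not `AtomicReference`):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for Curator's SharedCount: a value plus a version,
// where an update only succeeds if the caller saw the latest state.
class VersionedCounter {
    // value (index 0) and version (index 1) packed together so updates are atomic
    private final AtomicReference<int[]> state = new AtomicReference<>(new int[]{1, 0});

    int[] getVersionedValue() {
        return state.get();
    }

    // Mirrors SharedCount#trySetCount: fails if another writer raced us.
    boolean trySetCount(int[] expected, int newValue) {
        return state.compareAndSet(expected, new int[]{newValue, expected[1] + 1});
    }

    // Same shape as the getAndIncrement() loop quoted above: retry until
    // our compare-and-set of value+1 wins, then return the pre-increment value.
    long getAndIncrement() {
        while (true) {
            int[] current = getVersionedValue();
            if (trySetCount(current, current[0] + 1)) {
                return current[0];
            }
        }
    }
}
```

Under concurrent writers, losing a `trySetCount` race simply re-reads and retries, which is why the quoted code needs no locks.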

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41115736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
 ---
@@ -19,29 +19,28 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
  * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
  * and that is considered completed.
  */
-public class SuccessfulCheckpoint {
-   
-   private static final Logger LOG = 
LoggerFactory.getLogger(SuccessfulCheckpoint.class);
-   
+public class SuccessfulCheckpoint implements Serializable {
+
+   private static final long serialVersionUID = -8360248179615702014L;
+
private final JobID job;

private final long checkpointID;

private final long timestamp;

-   private final List states;
-
+   private final ArrayList states;
--- End diff --

Because of serializability. SuccessfulCheckpoint has been changed to be 
Serializable. I figured this is OK, because it is an "internal" interface and 
the only usage creates an instance with ArrayList.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942998#comment-14942998
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41115736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java
 ---
@@ -19,29 +19,28 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
  * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
  * and that is considered completed.
  */
-public class SuccessfulCheckpoint {
-   
-   private static final Logger LOG = 
LoggerFactory.getLogger(SuccessfulCheckpoint.class);
-   
+public class SuccessfulCheckpoint implements Serializable {
+
+   private static final long serialVersionUID = -8360248179615702014L;
+
private final JobID job;

private final long checkpointID;

private final long timestamp;

-   private final List states;
-
+   private final ArrayList states;
--- End diff --

Because of serializability. SuccessfulCheckpoint has been changed to be 
Serializable. I figured this is OK, because it is an "internal" interface and 
the only usage creates an instance with ArrayList.
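The point about the concrete `ArrayList` field type is that `ArrayList` is itself `Serializable`, while a field declared as `List` could hold a non-serializable implementation. A toy illustration (not Flink's class; names hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Toy version of the pattern: declaring the field as ArrayList (which is
// Serializable) rather than List prevents a non-serializable List
// implementation from sneaking in and breaking Java serialization.
class CheckpointLike implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ArrayList<String> states;

    CheckpointLike(ArrayList<String> states) {
        this.states = states;
    }

    ArrayList<String> getStates() {
        return states;
    }

    // Round-trips the object through Java serialization, as a new
    // JobManager leader would when recovering checkpoint metadata.
    static CheckpointLike roundTrip(CheckpointLike cp) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(cp);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (CheckpointLike) in.readObject();
        }
    }
}
```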


> Recover running jobs on JobManager failure
> --
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager looses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>+- job id i
> +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to backup the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [~trohrm...@apache.org]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. For some snapshots of time it is possible to have a new 
> leader granted leadership, before the old one has been revoked its leadership.
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> 
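The invariant proposed in the description (a reference exists under currentJobs IFF the job must be recovered by a newly elected leader) can be modeled with a small in-memory sketch. This is a hypothetical illustration of the bookkeeping, not the ZooKeeper-backed implementation; in the real design the value would be a state-handle reference, and the race discussed above is exactly what the shared-lock recipe would guard:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the proposed /flink/currentJobs invariant:
// an entry exists IFF the job graph must be recovered by a new leader.
class CurrentJobsModel {
    private final Map<String, String> currentJobs = new ConcurrentHashMap<>();

    // Leader receives a job: persist the state-handle reference before running it.
    void onJobSubmitted(String jobId, String stateHandleRef) {
        currentJobs.put(jobId, stateHandleRef);
    }

    // Job reached a terminal state: remove the reference so a new leader
    // does not resubmit a finished job.
    void onJobFinished(String jobId) {
        currentJobs.remove(jobId);
    }

    // A newly elected leader recovers exactly the jobs still referenced.
    Map<String, String> jobsToRecover() {
        return new ConcurrentHashMap<>(currentJobs);
    }
}
```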

[GitHub] flink pull request: [FLINK-2741] - Use single log statement in Tes...

2015-10-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1221#discussion_r41115826
  
--- Diff: flink-core/src/test/java/org/apache/flink/util/TestLogger.java ---
@@ -37,23 +37,26 @@
 
@Override
public void starting(Description description) {
-   
log.info("");
-   log.info("Test {} is running.", description);
-   
log.info("");
+   
log.info(""
--- End diff --

I would add a new line here before the line as well. Then we have everything 
together with the same indentation.




[jira] [Commented] (FLINK-2741) Use single log statement in TestLogger

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2741?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14942999#comment-14942999
 ] 

ASF GitHub Bot commented on FLINK-2741:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1221#discussion_r41115826
  
--- Diff: flink-core/src/test/java/org/apache/flink/util/TestLogger.java ---
@@ -37,23 +37,26 @@
 
@Override
public void starting(Description description) {
-   
log.info("");
-   log.info("Test {} is running.", description);
-   
log.info("");
+   
log.info(""
--- End diff --

I would add a new line here before the line as well. Then we have everything 
together with the same indentation.


> Use single log statement in TestLogger
> --
>
> Key: FLINK-2741
> URL: https://issues.apache.org/jira/browse/FLINK-2741
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: rerngvit yanggratoke
>Priority: Trivial
>
> {{TestLogger}} prints log statements before and after tests. Currently this 
> is done via multiple {{log.info}} statements. Sometimes this leads to 
> interleaved output with failure stack traces.
> I would like to change it to a single statements with new lines:
> {code}
> 17:30:31,887 ERROR A  - -
> 17:30:31,891 INFO  B  - Shutting down remote daemon.
> 17:30:31,895 ERROR A  - Test testJobManagerCleanUp(A) failed with:
> ...
> 17:30:31,909 ERROR A  - =
> {code}
> to
> {code}
> 17:30:31,891 INFO  B  - Shutting down remote daemon.
> 17:30:31,887 ERROR A  -
> -
> Test testJobManagerCleanUp(A) failed with:
> ...
> =
> {code}
> Any opinions? Does this improve readability?
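The proposed change boils down to assembling the banner in one string so a single `log.info` call emits it atomically. A minimal sketch (class and method names hypothetical, not the actual TestLogger code):

```java
// Builds the test banner as one string so that a single log.info() call
// emits it atomically, instead of three calls that can interleave with
// stack traces and other log output.
class TestBanner {
    private static final String HORIZONTAL_LINE =
        "================================================================================";

    static String starting(String description) {
        return String.format("%n%s%nTest %s is running.%n%s",
            HORIZONTAL_LINE, description, HORIZONTAL_LINE);
    }
}
```

Usage would be `log.info(TestBanner.starting(description))` in place of the three separate statements.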





[GitHub] flink pull request: Flink 1745

2015-10-05 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-145449146
  
Thanks for this impressive PR. A minor comment: could you edit the title of 
the PR to include more details than the issue ID (we can't do it because the 
ASF is managing the github account). It's hard to track the PRs otherwise. I 
would just go with the JIRA title: `[FLINK-1745] Add exact k-nearest-neighbours 
algorithm to machine learning library`




[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943004#comment-14943004
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1220#issuecomment-145449146
  
Thanks for this impressive PR. A minor comment: could you edit the title of 
the PR to include more details than the issue ID (we can't do it because the 
ASF is managing the github account). It's hard to track the PRs otherwise. I 
would just go with the JIRA title: `[FLINK-1745] Add exact k-nearest-neighbours 
algorithm to machine learning library`


> Add exact k-nearest-neighbours algorithm to machine learning library
> 
>
> Key: FLINK-1745
> URL: https://issues.apache.org/jira/browse/FLINK-1745
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Till Rohrmann
>Assignee: Daniel Blazevski
>  Labels: ML, Starter
>
> Even though the k-nearest-neighbours (kNN) [1,2] algorithm is quite trivial 
> it is still used as a mean to classify data and to do regression. This issue 
> focuses on the implementation of an exact kNN (H-BNLJ, H-BRJ) algorithm as 
> proposed in [2].
> Could be a starter task.
> Resources:
> [1] [http://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm]
> [2] [https://www.cs.utah.edu/~lifeifei/papers/mrknnj.pdf]
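For context, the "exact" part of exact kNN is brute force per query point; the cited H-BNLJ/H-BRJ schemes distribute this computation over blocks of the two data sets. A tiny single-machine sketch in plain Java (not the Flink ML API):

```java
import java.util.Arrays;
import java.util.Comparator;

// Brute-force exact kNN: score every training point by squared Euclidean
// distance to the query and keep the k closest indices.
class BruteForceKnn {
    static double squaredDistance(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Returns the indices of the k nearest training points to the query.
    static int[] kNearest(double[][] training, double[] query, int k) {
        Integer[] idx = new Integer[training.length];
        for (int i = 0; i < idx.length; i++) {
            idx[i] = i;
        }
        Arrays.sort(idx, Comparator.comparingDouble(i -> squaredDistance(training[i], query)));
        int[] result = new int[k];
        for (int i = 0; i < k; i++) {
            result[i] = idx[i];
        }
        return result;
    }
}
```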





[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145450115
  
I think TMs are only kept alive if their containers have been properly 
started. If the AM happens to die while the TM containers are being started up, I 
think they will be terminated as well. Another question: how did you kill the 
AM, and what do you mean by "[...] restarting properly. But I think that's not 
the expected behavior"? 




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943008#comment-14943008
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145450115
  
I think TMs are only kept alive if their containers have been properly 
started. If the AM happens to die while the TM containers are being started up, I 
think they will be terminated as well. Another question: how did you kill the 
AM, and what do you mean by "[...] restarting properly. But I think that's not 
the expected behavior"? 


> Add high availability support for Yarn
> --
>
> Key: FLINK-2790
> URL: https://issues.apache.org/jira/browse/FLINK-2790
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
> Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn 
> restart a failed application master in a new container. For that, we set the 
> number of application retries to something greater than 1. 
> From version 2.4.0 onwards, it is possible to reuse already started 
> containers for the TaskManagers, thus, avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval in which 
> the number of application attempts have to be exceeded in order to fail the 
> job. This will prevent long running jobs from eventually depleting all 
> available application attempts.





[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943014#comment-14943014
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user uce commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145451351
  
Looks like a lot of work to figure out the different version behaviours. 
Good job and thanks for the clear explanation. :)

I guess Robert meant with "not restarting properly" that the TMs were 
restarted as well.

How does the way you kill the AM affect recovery?

I will try this out later today.


> Add high availability support for Yarn
> --
>
> Key: FLINK-2790
> URL: https://issues.apache.org/jira/browse/FLINK-2790
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
> Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn 
> restart a failed application master in a new container. For that, we set the 
> number of application retries to something greater than 1. 
> From version 2.4.0 onwards, it is possible to reuse already started 
> containers for the TaskManagers, thus, avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval in which 
> the number of application attempts have to be exceeded in order to fail the 
> job. This will prevent long running jobs from eventually depleting all 
> available application attempts.





[GitHub] flink pull request: [FLINK-2790] [yarn] [ha] Adds high availabilit...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145456411
  
I was just curious whether he killed them gracefully with a `PoisonPill` or 
via killing the JVM process.




[jira] [Commented] (FLINK-2790) Add high availability support for Yarn

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943029#comment-14943029
 ] 

ASF GitHub Bot commented on FLINK-2790:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1213#issuecomment-145456411
  
I was just curious whether he killed them gracefully with a `PoisonPill` or 
via killing the JVM process.


> Add high availability support for Yarn
> --
>
> Key: FLINK-2790
> URL: https://issues.apache.org/jira/browse/FLINK-2790
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Reporter: Till Rohrmann
> Fix For: 0.10
>
>
> Add master high availability support for Yarn. The idea is to let Yarn 
> restart a failed application master in a new container. For that, we set the 
> number of application retries to something greater than 1. 
> From version 2.4.0 onwards, it is possible to reuse already started 
> containers for the TaskManagers, thus, avoiding unnecessary restart delays.
> From version 2.6.0 onwards, it is possible to specify an interval in which 
> the number of application attempts have to be exceeded in order to fail the 
> job. This will prevent long running jobs from eventually depleting all 
> available application attempts.





[GitHub] flink pull request: [FLINK-2811][web-dashboard]Add page with Confi...

2015-10-05 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145458067
  
On the user end, a URL '/config' makes more sense IMO. On the server end, 
`config` is already mapped to something else, so I chose `/jmconfig` there.
If you think `/jobManagerConfig` is better, however, let me know. I will 
push a commit for that. 
On a related note, '/jobmanager/config' might be a better choice of URL. 
Are there any job manager statistics which should be made available under a 
`/jobmanager` URL? Running jobs and completed jobs are already there.




[jira] [Commented] (FLINK-2811) Add page with configuration overview

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943030#comment-14943030
 ] 

ASF GitHub Bot commented on FLINK-2811:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1219#issuecomment-145458067
  
On the user end, a URL '/config' makes more sense IMO. On the server end, 
`config` is already mapped to something else, so I chose `/jmconfig` there.
If you think `/jobManagerConfig` is better, however, let me know. I will 
push a commit for that. 
On a related note, '/jobmanager/config' might be a better choice of URL. 
Are there any job manager statistics which should be made available under a 
`/jobmanager` URL? Running jobs and completed jobs are already there.


> Add page with configuration overview
> 
>
> Key: FLINK-2811
> URL: https://issues.apache.org/jira/browse/FLINK-2811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Robert Metzger
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> The old web interface contained a page to view the configuration of the 
> JobManager.
> This issue is about adding the page again.





[GitHub] flink pull request: [FLINK-2806] [scala-api] Add a TypeInformation...

2015-10-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1212#issuecomment-145462515
  
Looks good to me. @tillrohrmann can you have a quick look? You recently did 
some work on the TypeInformation and TypeSerializers.




[GitHub] flink pull request: [FLINK-2642] [table] Scala Table API crashes w...

2015-10-05 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1209#issuecomment-145462685
  
@twalthr Please go ahead and merge it. :smile: Then we can close the issue.




[jira] [Commented] (FLINK-2730) Add CPU/Network utilization graphs to new web dashboard

2015-10-05 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943067#comment-14943067
 ] 

Sachin Goel commented on FLINK-2730:


Of course the charts are updated in real time. Here's an example: 
http://jsfiddle.net/sachingoel0101/4je935Lu/

> Add CPU/Network utilization graphs to new web dashboard
> ---
>
> Key: FLINK-2730
> URL: https://issues.apache.org/jira/browse/FLINK-2730
> Project: Flink
>  Issue Type: Sub-task
>  Components: Webfrontend
>Affects Versions: 0.10
>Reporter: Stephan Ewen
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> The charts rendered in the previous dashboard should be added to the new web 
> dashboard.





[jira] [Updated] (FLINK-2550) Rework DataStream API

2015-10-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2550?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-2550:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-2674

> Rework DataStream API
> -
>
> Key: FLINK-2550
> URL: https://issues.apache.org/jira/browse/FLINK-2550
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 0.10
>
>
> After discussions on the mailing list we arrived at a consensus to rework the 
> streaming API to make it more fool-proof and easier to use. The resulting 
> design document is available here: 
> https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams





[jira] [Commented] (FLINK-2642) Scala Table API crashes when executing word count example

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943063#comment-14943063
 ] 

ASF GitHub Bot commented on FLINK-2642:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1209#issuecomment-145462685
  
@twalthr Please go ahead and merge it. :smile: Then we can close the issue.


> Scala Table API crashes when executing word count example
> -
>
> Key: FLINK-2642
> URL: https://issues.apache.org/jira/browse/FLINK-2642
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
> Environment: current master (0.10)
>Reporter: Jonas Traub
>Assignee: Timo Walther
>
> I tried to run the examples provided in the documentation of Flink's Table 
> API. Unfortunately, the Scala word count example provided in the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/libs/table.html]
>  doesn't work and does not give a meaningful exception.
> (Other examples work fine)
> Here my code:
> {code:java}
> package org.apache.flink.examples.scala
> import org.apache.flink.api.scala._
> import org.apache.flink.api.scala.table._
> object WordCount {
>   def main(args: Array[String]): Unit = {
> // set up execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> case class WC(word: String, count: Int)
> val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 
> 1))
> val expr = input.toTable
> val result = expr.groupBy('word).select('word, 'count.sum as 
> 'count).toDataSet[WC]
> result.print()
>   }
> }
> {code}
> Here the thrown exception:
> {code}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:414)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>   at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:104)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The user defined 'open(Configuration)' method 
> in class org.apache.flink.api.table.runtime.ExpressionSelectFunction caused 
> an exception: null
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.openUserCode(RegularPactTask.java:1368)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.openTask(ChainedMapDriver.java:47)
>   at 
> org.apache.flink.runtime.operators.RegularPactTask.openChainedTasks(RegularPactTask.java:1408)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:142)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.api.table.codegen.IndentStringContext$$anonfun$j$2.apply(Indenter.scala:30)
>   at 
> org.apache.flink.api.table.codegen.IndentStringContext$$anonfun$j$2.apply(Indenter.scala:23)
>   at 
> 

[jira] [Commented] (FLINK-2806) No TypeInfo for Scala's Nothing type

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943062#comment-14943062
 ] 

ASF GitHub Bot commented on FLINK-2806:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1212#issuecomment-145462515
  
Looks good to me. @tillrohrmann can you have a quick look? You recently did 
some work on the TypeInformation and TypeSerializers.


> No TypeInfo for Scala's Nothing type
> 
>
> Key: FLINK-2806
> URL: https://issues.apache.org/jira/browse/FLINK-2806
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>
> When writing some generic code, I encountered a situation where I needed a 
> TypeInformation[Nothing]. Two problems prevent me from getting it:
> 1. TypeInformationGen.mkTypeInfo doesn't return a real 
> TypeInformation[Nothing]. (It actually returns a casted null in that case.)
> 2. The line
> implicit def createTypeInformation[T]: TypeInformation[T] = macro 
> TypeUtils.createTypeInfo[T]
> does not fire in some situations when it should, when T = Nothing. (I guess 
> this is a compiler bug.)
> I will open a PR shortly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Stream API Refactoring

2015-10-05 Thread ktzoumas
Github user ktzoumas commented on a diff in the pull request:

https://github.com/apache/flink/pull/1215#discussion_r41125302
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 ---
@@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream dataStream, 
long maxWaitTime) {
/**
 * Changes the feedback type of the iteration and allows the user to 
apply
 * co-transformations on the input and feedback stream, as in a
-* {@link ConnectedDataStream}.
+* {@link ConnectedStreams}.
 *
 * 
 * For type safety the user needs to define the feedback type
 * 
 * @param feedbackTypeString
 *String describing the type information of the feedback 
stream.
-* @return A {@link ConnectedIterativeDataStream}.
+* @return A {@link ConnectedIterativeDataStreams}.
 */
-   public  ConnectedIterativeDataStream withFeedbackType(String 
feedbackTypeString) {
+   public  ConnectedIterativeDataStreams withFeedbackType(String 
feedbackTypeString) {
return withFeedbackType(TypeInfoParser. 
parse(feedbackTypeString));
}
 
/**
 * Changes the feedback type of the iteration and allows the user to 
apply
 * co-transformations on the input and feedback stream, as in a
-* {@link ConnectedDataStream}.
+* {@link ConnectedStreams}.
 *
 * 
 * For type safety the user needs to define the feedback type
 * 
 * @param feedbackTypeClass
 *Class of the elements in the feedback stream.
-* @return A {@link ConnectedIterativeDataStream}.
+* @return A {@link ConnectedIterativeDataStreams}.
 */
-   public  ConnectedIterativeDataStream withFeedbackType(Class 
feedbackTypeClass) {
+   public  ConnectedIterativeDataStreams 
withFeedbackType(Class feedbackTypeClass) {
return 
withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
--- End diff --

Why ConnectedIterativeDataStreams and not ConnectedIterativeStreams 
(following the naming of the other classes)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2806] [scala-api] Add a TypeInformation...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1212#discussion_r41125339
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+
+class ScalaNothingTypeInfo extends TypeInformation[Nothing] {
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def getArity: Int = 0
+  override def getTotalFields: Int = 0
+  override def getTypeClass: Class[Nothing] = classOf[Nothing]
+  override def isKeyType: Boolean = false
+
+  override def createSerializer(config: ExecutionConfig): 
TypeSerializer[Nothing] =
+(new NothingSerializer).asInstanceOf[TypeSerializer[Nothing]]
+
+  override def hashCode(): Int = 42
--- End diff --

Could we use `classOf[ScalaNothingTypeInfo].hashCode` here? Otherwise, all 
objects we create end up in the bucket with hash code `42`.
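
The bucketing concern can be sketched in plain Java (class names here are hypothetical stand-ins, not Flink's actual types): a hard-coded `hashCode()` of `42` is stable, but every instance of every class that picks the same constant lands in the same hash bucket, whereas a class-derived hash code is equally stable within a JVM run and avoids that shared bucket.

```java
import java.util.HashMap;
import java.util.Map;

public class HashCodeSketch {
    // Mirrors the reviewed code: a constant hash code for all instances.
    static final class ConstantHash {
        @Override public int hashCode() { return 42; }
    }
    // Mirrors the reviewer's suggestion: derive the hash from the class object.
    static final class ClassHash {
        @Override public int hashCode() { return ClassHash.class.hashCode(); }
    }

    public static void main(String[] args) {
        Map<Object, String> buckets = new HashMap<>();
        buckets.put(new ConstantHash(), "a");
        buckets.put(new ConstantHash(), "b");
        // Identity equals keeps both entries, but they collide in one bucket.
        System.out.println(buckets.size());                // 2
        System.out.println(new ConstantHash().hashCode()); // always 42
        // The class-derived hash is just as deterministic per JVM run:
        System.out.println(new ClassHash().hashCode() == ClassHash.class.hashCode()); // true
    }
}
```

Either choice satisfies the `hashCode`/`equals` contract; the difference only shows up when many distinct `TypeInformation` implementations are used together as hash-map keys.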




[GitHub] flink pull request: [FLINK-2806] [scala-api] Add a TypeInformation...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1212#issuecomment-145476115
  
LGTM. Only one minor comment.




[jira] [Commented] (FLINK-2806) No TypeInfo for Scala's Nothing type

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943126#comment-14943126
 ] 

ASF GitHub Bot commented on FLINK-2806:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1212#discussion_r41125339
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+
+class ScalaNothingTypeInfo extends TypeInformation[Nothing] {
+
+  override def isBasicType: Boolean = false
+  override def isTupleType: Boolean = false
+  override def getArity: Int = 0
+  override def getTotalFields: Int = 0
+  override def getTypeClass: Class[Nothing] = classOf[Nothing]
+  override def isKeyType: Boolean = false
+
+  override def createSerializer(config: ExecutionConfig): 
TypeSerializer[Nothing] =
+(new NothingSerializer).asInstanceOf[TypeSerializer[Nothing]]
+
+  override def hashCode(): Int = 42
--- End diff --

Could we use `classOf[ScalaNothingTypeInfo].hashCode` here? Otherwise, all 
objects we create end up in the bucket with hash code `42`.


> No TypeInfo for Scala's Nothing type
> 
>
> Key: FLINK-2806
> URL: https://issues.apache.org/jira/browse/FLINK-2806
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>
> When writing some generic code, I encountered a situation where I needed a 
> TypeInformation[Nothing]. Two problems prevent me from getting it:
> 1. TypeInformationGen.mkTypeInfo doesn't return a real 
> TypeInformation[Nothing]. (It actually returns a casted null in that case.)
> 2. The line
> implicit def createTypeInformation[T]: TypeInformation[T] = macro 
> TypeUtils.createTypeInfo[T]
> does not fire in some situations when it should, when T = Nothing. (I guess 
> this is a compiler bug.)
> I will open a PR shortly.





[GitHub] flink pull request: Stream API Refactoring

2015-10-05 Thread ktzoumas
Github user ktzoumas commented on a diff in the pull request:

https://github.com/apache/flink/pull/1215#discussion_r41125467
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -87,7 +207,7 @@ public GroupedDataStream(DataStream dataStream, 
KeySelector keySelect
TypeInformation outType = 
TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
Utils.getCallLocationName(), true);
 
-   return transform("Grouped Fold", outType, new 
StreamGroupedFold(clean(folder),
+   return transform("Grouped Fold", outType, new 
StreamGroupedFold<>(clean(folder),
keySelector, initialValue));
--- End diff --

"Grouped Fold" or simply "Fold"?




[GitHub] flink pull request: Stream API Refactoring

2015-10-05 Thread ktzoumas
Github user ktzoumas commented on a diff in the pull request:

https://github.com/apache/flink/pull/1215#discussion_r41125422
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 ---
@@ -24,49 +24,169 @@
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
-import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import 
org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import 
org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import 
org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.time.EventTime;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 
 /**
- * A GroupedDataStream represents a {@link DataStream} which has been
- * partitioned by the given {@link KeySelector}. Operators like {@link 
#reduce},
- * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
- * get additional functionality by the grouping.
+ * A {@code KeyedStream} represents a {@link DataStream} on which operator 
state is
+ * partitioned by key using a provided {@link KeySelector}. Typical 
operations supported by a
+ * {@code DataStream} are also possible on a {@code KeyedStream}, with the 
exception of
+ * partitioning methods such as shuffle, forward and keyBy.
  *
- * @param  The type of the elements in the Grouped Stream.
+ * 
+ * Reduce-style operations, such as {@link #reduce}, {@link #sum} and 
{@link #fold} work on elements
+ * that have the same key.
+ *
+ * @param  The type of the elements in the Keyed Stream.
  * @param  The type of the key in the Keyed Stream.
  */
-public class GroupedDataStream extends KeyedDataStream {
+public class KeyedStream extends DataStream {
+   
+   protected final KeySelector keySelector;
+
+   /**
+* Creates a new {@link KeyedStream} using the given {@link KeySelector}
+* to partition operator state by key.
+* 
+* @param dataStream
+*Base stream of data
+* @param keySelector
+*Function for determining state partitions
+*/
+   public KeyedStream(DataStream dataStream, KeySelector 
keySelector) {
+   super(dataStream.getExecutionEnvironment(), new 
PartitionTransformation<>(dataStream.getTransformation(), new 
HashPartitioner<>(keySelector)));
+   this.keySelector = keySelector;
+   }
+
+   
+   public KeySelector getKeySelector() {
+   return this.keySelector;
+   }
+
+   
+   @Override
+   protected DataStream setConnectionType(StreamPartitioner 
partitioner) {
+   throw new UnsupportedOperationException("Cannot override 
partitioning for KeyedStream.");
+   }
+
+   
+   @Override
+   public  SingleOutputStreamOperator transform(String 
operatorName,
+   TypeInformation outTypeInfo, 
OneInputStreamOperator operator) {
+
+   SingleOutputStreamOperator returnStream = 
super.transform(operatorName, outTypeInfo,operator);
+
+   ((OneInputTransformation) 
returnStream.getTransformation()).setStateKeySelector(keySelector);
+   return returnStream;
+   }
+
+   
+   
+   @Override
+   public DataStreamSink addSink(SinkFunction sinkFunction) {
+   DataStreamSink result = 

[jira] [Commented] (FLINK-2806) No TypeInfo for Scala's Nothing type

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943127#comment-14943127
 ] 

ASF GitHub Bot commented on FLINK-2806:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/1212#issuecomment-145476115
  
LGTM. Only one minor comment.


> No TypeInfo for Scala's Nothing type
> 
>
> Key: FLINK-2806
> URL: https://issues.apache.org/jira/browse/FLINK-2806
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>
> When writing some generic code, I encountered a situation where I needed a 
> TypeInformation[Nothing]. Two problems prevent me from getting it:
> 1. TypeInformationGen.mkTypeInfo doesn't return a real 
> TypeInformation[Nothing]. (It actually returns a casted null in that case.)
> 2. The line
> implicit def createTypeInformation[T]: TypeInformation[T] = macro 
> TypeUtils.createTypeInfo[T]
> does not fire in some situations when it should, when T = Nothing. (I guess 
> this is a compiler bug.)
> I will open a PR shortly.





[GitHub] flink pull request: Stream API Refactoring

2015-10-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/1215#discussion_r41125469
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
 ---
@@ -81,56 +81,56 @@ protected IterativeDataStream(DataStream dataStream, 
long maxWaitTime) {
/**
 * Changes the feedback type of the iteration and allows the user to 
apply
 * co-transformations on the input and feedback stream, as in a
-* {@link ConnectedDataStream}.
+* {@link ConnectedStreams}.
 *
 * 
 * For type safety the user needs to define the feedback type
 * 
 * @param feedbackTypeString
 *String describing the type information of the feedback 
stream.
-* @return A {@link ConnectedIterativeDataStream}.
+* @return A {@link ConnectedIterativeDataStreams}.
 */
-   public  ConnectedIterativeDataStream withFeedbackType(String 
feedbackTypeString) {
+   public  ConnectedIterativeDataStreams withFeedbackType(String 
feedbackTypeString) {
return withFeedbackType(TypeInfoParser. 
parse(feedbackTypeString));
}
 
/**
 * Changes the feedback type of the iteration and allows the user to 
apply
 * co-transformations on the input and feedback stream, as in a
-* {@link ConnectedDataStream}.
+* {@link ConnectedStreams}.
 *
 * 
 * For type safety the user needs to define the feedback type
 * 
 * @param feedbackTypeClass
 *Class of the elements in the feedback stream.
-* @return A {@link ConnectedIterativeDataStream}.
+* @return A {@link ConnectedIterativeDataStreams}.
 */
-   public  ConnectedIterativeDataStream withFeedbackType(Class 
feedbackTypeClass) {
+   public  ConnectedIterativeDataStreams 
withFeedbackType(Class feedbackTypeClass) {
return 
withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
--- End diff --

You're right, I'll also rename those.




[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943132#comment-14943132
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41126140
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -871,7 +898,13 @@ public Object call() throws Exception {
}
}, 
executionContext);
} else {
-   restart();
+   future(new 
Callable() {
--- End diff --

Good catch :-)


> Recover running jobs on JobManager failure
> --
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager loses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>+- job id i
> +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to backup the runtime state handles of 
> checkpoints in a similar manner.
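> The indirection described above can be sketched with two in-memory maps 
> standing in for the state backend and ZooKeeper (all names here are 
> simplified stand-ins, not Flink's actual interfaces): the bulky serialized 
> JobGraph lives in the backend, while ZooKeeper only holds a small handle 
> reference under /flink/currentJobs/jobId.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class JobGraphIndirectionSketch {
    // Stand-ins: state backend stores large blobs, ZooKeeper stores tiny refs.
    static final Map<String, byte[]> stateBackend = new HashMap<>();
    static final Map<String, String> zooKeeper = new HashMap<>();

    static void persistJobGraph(String jobId, byte[] serializedJobGraph) {
        String handle = "backend://" + UUID.randomUUID();      // state handle reference
        stateBackend.put(handle, serializedJobGraph);          // the MBs live here
        zooKeeper.put("/flink/currentJobs/" + jobId, handle);  // only bytes live here
    }

    static byte[] recoverJobGraph(String jobId) {
        // A newly elected leader resolves the ZooKeeper reference, then
        // fetches the actual graph from the state backend.
        String handle = zooKeeper.get("/flink/currentJobs/" + jobId);
        return handle == null ? null : stateBackend.get(handle);
    }

    public static void main(String[] args) {
        persistJobGraph("job-1", new byte[1024 * 1024]); // pretend 1 MB job graph
        System.out.println(recoverJobGraph("job-1").length); // 1048576
    }
}
```

> Deleting the ZooKeeper entry when a job finishes restores the stated 
> invariant: a reference exists if and only if the job still needs recovery.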
> ---
> This work will be based on [~trohrm...@apache.org]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. For some snapshots of time it is possible to have a new 
> leader granted leadership, before the old one has been revoked its leadership.
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1's 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and 
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting 
> interleavings
> - Process failure integration tests with single and multiple running jobs





[GitHub] flink pull request: Stream API Refactoring

2015-10-05 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1215#issuecomment-145482078
  
Impressive work, looks good!

Merging this means that we need to commit to reworking the `join()` 
implementation very soon.

If that is the case, +1 from my side




[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41126140
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -871,7 +898,13 @@ public Object call() throws Exception {
}
}, 
executionContext);
} else {
-   restart();
+   future(new 
Callable() {
--- End diff --

Good catch :-)




[GitHub] flink pull request: Stream API Refactoring

2015-10-05 Thread ktzoumas
Github user ktzoumas commented on the pull request:

https://github.com/apache/flink/pull/1215#issuecomment-145482079
  
+1 to merge. This will make the testing of the new API much easier.




[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41126346
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ *
+ * All operations are NoOps, because {@link JobGraph} instances cannot 
be recovered in this
+ * recovery mode.
+ */
+public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs {
+
+   @Override
+   public void start(SubmittedJobGraphListener jobGraphListener) throws 
Exception {
+   // Nothing to do
+   }
+
+   @Override
+   public void stop() {
+   // Nothing to do
+   }
+
+   @Override
+   public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+   // Nothing to do
+   }
+
+   @Override
+   public void removeJobGraph(JobID jobId) throws Exception {
+   // Nothing to do
+   }
+
+   @Override
+   public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+   throw new IllegalStateException("StandaloneSubmittedJobGraphs 
cannot recover job graphs. " +
+   "How did you end up here?");
+   }
+
+   @Override
+   public List recoverJobGraphs() throws Exception {
+   return Collections.emptyList();
--- End diff --

In `recoverJobGraph`, an exception is thrown, whereas here an empty list is 
returned. Maybe we should do it in a consistent manner.
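
One consistent alternative, sketched here with deliberately simplified stand-in names (not Flink's actual interfaces), is to make both recovery methods signal "nothing to recover" as an ordinary return value rather than throwing from one and returning empty from the other:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical simplified interface: both recovery paths express absence
// uniformly, so the standalone no-op implementation never needs to throw.
interface JobGraphStoreSketch {
    Optional<String> recoverJobGraph(String jobId);
    List<String> recoverJobGraphs();
}

class StandaloneStoreSketch implements JobGraphStoreSketch {
    @Override
    public Optional<String> recoverJobGraph(String jobId) {
        return Optional.empty(); // consistent with the empty list below
    }

    @Override
    public List<String> recoverJobGraphs() {
        return Collections.emptyList();
    }
}
```

Callers then handle "no graph" identically whether they asked for one job or all of them.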




[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943135#comment-14943135
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41126346
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphs.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link 
RecoveryMode#STANDALONE}.
+ *
+ * All operations are NoOps, because {@link JobGraph} instances cannot 
be recovered in this
+ * recovery mode.
+ */
+public class StandaloneSubmittedJobGraphs implements SubmittedJobGraphs {
+
+   @Override
+   public void start(SubmittedJobGraphListener jobGraphListener) throws 
Exception {
+   // Nothing to do
+   }
+
+   @Override
+   public void stop() {
+   // Nothing to do
+   }
+
+   @Override
+   public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+   // Nothing to do
+   }
+
+   @Override
+   public void removeJobGraph(JobID jobId) throws Exception {
+   // Nothing to do
+   }
+
+   @Override
+   public SubmittedJobGraph recoverJobGraph(JobID jobId) throws Exception {
+   throw new IllegalStateException("StandaloneSubmittedJobGraphs 
cannot recover job graphs. " +
+   "How did you end up here?");
+   }
+
+   @Override
+   public List recoverJobGraphs() throws Exception {
+   return Collections.emptyList();
--- End diff --

In `recoverJobGraph`, an exception is thrown, whereas here an empty list is 
returned. Maybe we should do it in a consistent manner.


> Recover running jobs on JobManager failure
> --
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
> Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager loses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>+- job id i
> +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to backup the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [~trohrm...@apache.org]'s 

[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943137#comment-14943137
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41126510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java
 ---
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import 
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * Each job graph creates ZNode:
+ * 
+ * +O /flink/jobgraphs/job-id 1 [persistent]
+ * .
+ * .
+ * .
+ * +O /flink/jobgraphs/job-id N [persistent]
+ * 
+ *
+ * The root path is watched to detect concurrent modifications in 
corner situations where
+ * multiple instances operate concurrently. The job manager acts as a 
{@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class);
+
+   /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+   private final Object cacheLock = new Object();
+
+   /** Client (not a namespace facade) */
+   private final CuratorFramework client;
+
+   /** The set of IDs of all added job graphs. */
+   private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+   /** Completed checkpoints in ZooKeeper */
+   private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
+
+   /**
+* Cache to monitor all children. This is used to detect races with 
other instances working
+* on the same state.
+*/
+   private final PathChildrenCache pathCache;
+
+   /** The external listener to be notified on races. */
+   private SubmittedJobGraphListener jobGraphListener;
+
+   /** Flag indicating whether this instance is running. */
+   private boolean isRunning;
+
+   public ZooKeeperSubmittedJobGraphs(
+   CuratorFramework client,
+   String currentJobsPath,
+   StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
+
+   checkNotNull(client, "Curator client");
--- End diff --

`this.client = checkNotNull()`.
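The suggestion fuses the null check and the field assignment into one statement, which works because Guava's `Preconditions.checkNotNull` returns its argument. A dependency-free sketch of the same pattern using `java.util.Objects.requireNonNull` (which behaves identically); `Object` stands in for `CuratorFramework` purely for illustration:

```java
import java.util.Objects;

// Validate-and-assign in one step: requireNonNull (like Guava's
// checkNotNull) throws NullPointerException on null and otherwise
// returns its argument unchanged.
class StoreSketch {
    private final Object client; // stand-in for CuratorFramework

    StoreSketch(Object client) {
        this.client = Objects.requireNonNull(client, "Curator client");
    }

    Object getClient() {
        return client;
    }
}
```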


> Recover running jobs on JobManager failure
> --
>
> Key: FLINK-2354
> URL: https://issues.apache.org/jira/browse/FLINK-2354
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager
>Affects Versions: master
>Reporter: Ufuk Celebi
>Assignee: Ufuk 

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41126510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphs.java
 ---
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import 
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandle;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * Each job graph creates ZNode:
+ * 
+ * +O /flink/jobgraphs/job-id 1 [persistent]
+ * .
+ * .
+ * .
+ * +O /flink/jobgraphs/job-id N [persistent]
+ * 
+ *
+ * The root path is watched to detect concurrent modifications in 
corner situations where
+ * multiple instances operate concurrently. The job manager acts as a 
{@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphs implements SubmittedJobGraphs {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphs.class);
+
+   /** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+   private final Object cacheLock = new Object();
+
+   /** Client (not a namespace facade) */
+   private final CuratorFramework client;
+
+   /** The set of IDs of all added job graphs. */
+   private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+   /** Completed checkpoints in ZooKeeper */
+   private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
+
+   /**
+* Cache to monitor all children. This is used to detect races with 
other instances working
+* on the same state.
+*/
+   private final PathChildrenCache pathCache;
+
+   /** The external listener to be notified on races. */
+   private SubmittedJobGraphListener jobGraphListener;
+
+   /** Flag indicating whether this instance is running. */
+   private boolean isRunning;
+
+   public ZooKeeperSubmittedJobGraphs(
+   CuratorFramework client,
+   String currentJobsPath,
+   StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
+
+   checkNotNull(client, "Curator client");
--- End diff --

`this.client = checkNotNull()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2809] [scala-api] Added UnitTypeInfo an...

2015-10-05 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1217#discussion_r41126581
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class UnitSerializer extends TypeSerializerSingleton[Unit] {
+
+  def isImmutableType: Boolean = true
+
+  def createInstance: Unit = ()
+
+  def copy(from: Unit): Unit = ()
+
+  def copy(from: Unit, reuse: Unit): Unit = ()
+
+  def getLength: Int = 1
+
+  def serialize(record: Unit, target: DataOutputView) {
+target.write(0)
+  }
+
+  def deserialize(source: DataInputView): Unit = {
+source.readByte
--- End diff --

The `readByte()` and `write()` methods should have parentheses (both for 
semantics and to avoid confusion that the Unit parentheses belong to the method 
call)...

I think we need an entry in the coding guidelines for Scala there (or 
better, a style check), because it seems that dropping the parentheses 
when Scala calls Java functions happens a bit too eagerly (I think most IDEs 
make rather poor suggestions there as well).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2809) DataSet[Unit] doesn't work

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943140#comment-14943140
 ] 

ASF GitHub Bot commented on FLINK-2809:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/1217#discussion_r41126581
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitSerializer.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.typeutils
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class UnitSerializer extends TypeSerializerSingleton[Unit] {
+
+  def isImmutableType: Boolean = true
+
+  def createInstance: Unit = ()
+
+  def copy(from: Unit): Unit = ()
+
+  def copy(from: Unit, reuse: Unit): Unit = ()
+
+  def getLength: Int = 1
+
+  def serialize(record: Unit, target: DataOutputView) {
+target.write(0)
+  }
+
+  def deserialize(source: DataInputView): Unit = {
+source.readByte
--- End diff --

The `readByte()` and `write()` methods should have parentheses (both for 
semantics and to avoid confusion that the Unit parentheses belong to the method 
call)...

I think we need an entry in the coding guidelines for Scala there (or 
better, a style check), because it seems that dropping the parentheses 
when Scala calls Java functions happens a bit too eagerly (I think most IDEs 
make rather poor suggestions there as well).


> DataSet[Unit] doesn't work
> --
>
> Key: FLINK-2809
> URL: https://issues.apache.org/jira/browse/FLINK-2809
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Minor
>
> The following code creates a DataSet\[Unit\]:
> val env = ExecutionEnvironment.createLocalEnvironment()
> val a = env.fromElements(1,2,3)
> val b = a.map (_ => ())
> b.writeAsText("/tmp/xxx")
> env.execute()
> This doesn't work, because a VoidSerializer is created, which can't cope with 
> a BoxedUnit. See exception below.
> I'm now thinking about creating a UnitSerializer class.
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>   at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at 
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassCastException: scala.runtime.BoxedUnit cannot be 
> cast to java.lang.Void
>   at 
> 

[GitHub] flink pull request: [FLINK-2797][cli] Add support for running jobs...

2015-10-05 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-145483163
  
Fair enough, we can do this but the error reporting must be pretty good. 
Otherwise it might confuse users because not everyone gets that eager execution 
spawns multiple jobs. The documentation should also explain why detached jobs 
cannot contain eager executions. I'm not sure whether we can still include this 
in 0.10 but please go ahead if you want to improve the pull request.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2797) CLI: Missing option to submit jobs in detached mode

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943141#comment-14943141
 ] 

ASF GitHub Bot commented on FLINK-2797:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/1214#issuecomment-145483163
  
Fair enough, we can do this but the error reporting must be pretty good. 
Otherwise it might confuse users because not everyone gets that eager execution 
spawns multiple jobs. The documentation should also explain why detached jobs 
cannot contain eager executions. I'm not sure whether we can still include this 
in 0.10 but please go ahead if you want to improve the pull request.


> CLI: Missing option to submit jobs in detached mode
> ---
>
> Key: FLINK-2797
> URL: https://issues.apache.org/jira/browse/FLINK-2797
> Project: Flink
>  Issue Type: Bug
>  Components: Command-line client
>Affects Versions: 0.9, 0.10
>Reporter: Maximilian Michels
>Assignee: Sachin Goel
> Fix For: 0.10
>
>
> Jobs can only be submitted in detached mode using YARN but not on a 
> standalone installation. This has been requested by users who want to submit 
> a job, get the job id, and later query its status.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2821) Change Akka configuration to allow accessing actors from different URLs

2015-10-05 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-2821:
-

 Summary: Change Akka configuration to allow accessing actors from 
different URLs
 Key: FLINK-2821
 URL: https://issues.apache.org/jira/browse/FLINK-2821
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Reporter: Robert Metzger


Akka expects the actor's URL to match exactly.

As pointed out here, users have complained about the following cases: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-trying-to-access-JM-through-proxy-td3018.html

  - Proxy routing (as described here, send to the proxy URL, receiver 
recognizes only original URL)
  - Using hostname / IP interchangeably does not work (we solved this by always 
putting IP addresses into URLs, never hostnames)
  - Binding to multiple interfaces (any local 0.0.0.0) does not work. Still no 
solution to that (but seems not too much of a restriction)


I am aware that this is not possible due to Akka, so it is actually not a Flink 
bug. But I think we should track the resolution of the issue here anyway 
because it affects our users' satisfaction.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944324#comment-14944324
 ] 

ASF GitHub Bot commented on FLINK-2066:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1223#issuecomment-145708133
  
Thanks a lot for your contribution to Flink!
Sorry for not giving you feedback earlier. Many committers are currently 
busy preparing the next Flink release.

Can you also update the documentation about the execution configuration? There is 
an entire section on the available parameters:

https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#execution-configuration
You can find the markdown files for the documentation in the `docs/` 
directory of the Flink source.

I quickly looked over the code, and I think it's in good shape!
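For context, a per-job retry delay would live on the job's execution configuration alongside the retry count. The sketch below is a simplified, self-contained illustration of such a setter; the class and method names are hypothetical, not Flink's actual `ExecutionConfig` API.

```java
// Hypothetical sketch of a per-job retry-delay setting as discussed in
// FLINK-2066; this is not Flink's real ExecutionConfig class.
class ExecutionConfigSketch {
    // -1 means "not set": fall back to the system-wide default delay
    private long executionRetryDelayMillis = -1;

    ExecutionConfigSketch setExecutionRetryDelay(long millis) {
        if (millis < 0) {
            throw new IllegalArgumentException("Retry delay must be non-negative.");
        }
        this.executionRetryDelayMillis = millis;
        return this; // fluent style, matching typical config setters
    }

    long getExecutionRetryDelay() {
        return executionRetryDelayMillis;
    }
}
```

A per-job setting like this would override the system-wide delay only when explicitly configured, which is why the sentinel default matters.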


> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>Priority: Blocker
>  Labels: starter
> Fix For: 0.10
>
>
> Flink allows to specify a delay between execution retries. This helps to let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2066][core] Add delay between execution...

2015-10-05 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1223#issuecomment-145708133
  
Thanks a lot for your contribution to Flink!
Sorry for not giving you feedback earlier. Many committers are currently 
busy preparing the next Flink release.

Can you also update the documentation about the execution configuration? There is 
an entire section on the available parameters:

https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#execution-configuration
You can find the markdown files for the documentation in the `docs/` 
directory of the Flink source.

I quickly looked over the code, and I think it's in good shape!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2767) Add support Scala 2.11 to Scala shell

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944329#comment-14944329
 ] 

ASF GitHub Bot commented on FLINK-2767:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1197#issuecomment-145708723
  
Thank you for the review @nikste!


> Add support Scala 2.11 to Scala shell
> -
>
> Key: FLINK-2767
> URL: https://issues.apache.org/jira/browse/FLINK-2767
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 0.10
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>
> Since FLINK-2200 is resolved, the Flink community provides JARs for Scala 
> 2.11. But currently, there is no Scala shell with Scala 2.11. If we add 
> support Scala 2.11 to Scala shell, the user with Scala 2.11 could use Flink 
> easily.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2767] [scala shell] Add Scala 2.11 supp...

2015-10-05 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1197#issuecomment-145708723
  
Thank you for the review @nikste!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2786) Remove Spargel from source code and update documentation in favor of Gelly

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943876#comment-14943876
 ] 

ASF GitHub Bot commented on FLINK-2786:
---

GitHub user vasia opened a pull request:

https://github.com/apache/flink/pull/1229

[FLINK-2786] Remove Spargel from source code and update docs.

I also ported 2 Spargel tests that we hadn't copied over to Gelly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vasia/flink flink-2786

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1229.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1229


commit 4391e8abf121f5d8ba6b8bdcd7d9a811c4d67806
Author: vasia 
Date:   2015-10-05T18:08:55Z

[FLINK-2786] Remove Spargel code and docs; Port Spargel tests to Gelly; 
Remove Beta badge from Gelly




> Remove Spargel from source code and update documentation in favor of Gelly
> --
>
> Key: FLINK-2786
> URL: https://issues.apache.org/jira/browse/FLINK-2786
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Spargel
>Reporter: Henry Saputra
>Assignee: Vasia Kalavri
>
> With Gelly getting more mature and ready to be top level project for Flink, 
> we need to remove deprecated Spargel library from source and documentation.
> Gelly copies the library needed from Spargel so there should not be hard 
> dependency between the 2 modules.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2798] Serve static files for the new we...

2015-10-05 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1222#issuecomment-145733149
  
Thank you for the review. I'm trying to update the PR within the next 24 
hours.
Sorry for not explaining why I changed all the request URLs to relative 
paths: When accessing the web interface through YARN, the files are not served 
from the root address, but through a proxy on the RM: 
`http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/`.
By making all request URLs relative, we can make sure everything is working 
as expected.

Let me know if there is a more elegant way of avoiding this.
I made a similar change to the old web interface.
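The proxy problem can be demonstrated with plain `java.net.URI` reference resolution: a relative request URL keeps whatever base path the page was served under, while an absolute path ("/jobs") resolves against the host root and silently drops the proxy prefix.

```java
import java.net.URI;

// Demonstrates why relative request URLs work behind the YARN proxy:
// resolving against the proxied base keeps the "/proxy/<app-id>/" prefix,
// while an absolute path loses it.
class RelativeUrlDemo {
    static final URI BASE =
        URI.create("http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/");

    static String resolve(String requestUrl) {
        return BASE.resolve(requestUrl).toString();
    }
}
```

This is standard RFC 3986 resolution, which is also what browsers do when a page issues a relative request.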


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2798) Prepare new web dashboard for executing in on YARN

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944453#comment-14944453
 ] 

ASF GitHub Bot commented on FLINK-2798:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1222#issuecomment-145733149
  
Thank you for the review. I'm trying to update the PR within the next 24 
hours.
Sorry for not explaining why I changed all the request URLs to relative 
paths: When accessing the web interface through YARN, the files are not served 
from the root address, but through a proxy on the RM: 
`http://quickstart.cloudera:8088/proxy/application_1440768826963_0005/`.
By making all request URLs relative, we can make sure everything is working 
as expected.

Let me know if there is a more elegant way of avoiding this.
I made a similar change to the old web interface.


> Prepare new web dashboard for executing in on YARN
> --
>
> Key: FLINK-2798
> URL: https://issues.apache.org/jira/browse/FLINK-2798
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend, YARN Client
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2066][core] Add delay between execution...

2015-10-05 Thread WangCHX
Github user WangCHX commented on the pull request:

https://github.com/apache/flink/pull/1223#issuecomment-145733727
  
Thank you. Sure. I will do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2066) Make delay between execution retries configurable

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14944455#comment-14944455
 ] 

ASF GitHub Bot commented on FLINK-2066:
---

Github user WangCHX commented on the pull request:

https://github.com/apache/flink/pull/1223#issuecomment-145733727
  
Thank you. Sure. I will do it.


> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>Priority: Blocker
>  Labels: starter
> Fix For: 0.10
>
>
> Flink allows to specify a delay between execution retries. This helps to let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2066) Make delay between execution retries configurable

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2066:
--
Affects Version/s: 0.10

> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>Priority: Blocker
>  Labels: starter
> Fix For: 0.10
>
>
> Flink allows to specify a delay between execution retries. This helps to let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2066) Make delay between execution retries configurable

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2066:
--
Fix Version/s: 0.10

> Make delay between execution retries configurable
> -
>
> Key: FLINK-2066
> URL: https://issues.apache.org/jira/browse/FLINK-2066
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 0.9, 0.10
>Reporter: Stephan Ewen
>Assignee: Nuno Miguel Marques dos Santos
>Priority: Blocker
>  Labels: starter
> Fix For: 0.10
>
>
> Flink allows to specify a delay between execution retries. This helps to let 
> some external failure causes fully manifest themselves before the restart is 
> attempted.
> The delay is currently defined only system wide.
> We should add it to the {{ExecutionConfig}} of a job to allow per-job 
> specification.





[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41127946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
---
@@ -47,8 +55,10 @@
public static CuratorFramework startCuratorFramework(Configuration 
configuration) {
String zkQuorum = 
configuration.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, "");
 
-   if(zkQuorum == null || zkQuorum.equals("")) {
-   throw new RuntimeException("No valid ZooKeeper quorum 
has been specified.");
+   if (zkQuorum == null || zkQuorum.equals("")) {
+   throw new RuntimeException("No valid ZooKeeper quorum 
has been specified. " +
+   "You can specify the quorum via the 
configuration key '" +
+   ConfigConstants.ZOOKEEPER_QUORUM_KEY + 
"'.");
--- End diff --

Good one :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-2802) Watermark triggered operators cannot progress with cyclic flows

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-2802.
---
   Resolution: Fixed
Fix Version/s: 0.10

As far as I understand this has been resolved. Resolving but feel free to 
reopen. 

> Watermark triggered operators cannot progress with cyclic flows
> ---
>
> Key: FLINK-2802
> URL: https://issues.apache.org/jira/browse/FLINK-2802
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Gyula Fora
>Assignee: Gyula Fora
>Priority: Blocker
> Fix For: 0.10
>
>
> The problem is that we can easily create a cyclic watermark (time) dependency 
> in the stream graph which will result in a deadlock for watermark triggered 
> operators such as  the `WindowOperator`.
> A solution to this could be to emit a Long.MAX_VALUE watermark from the 
> iteration sources.



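The deadlock described above can be seen in a toy model (this is not Flink code; names are invented): an operator's combined watermark is the minimum over all its inputs, so a feedback edge that never advances pins that minimum forever. Emitting `Long.MAX_VALUE` from the iteration sources, as proposed, effectively removes the feedback edge from the minimum.

```java
/**
 * Toy model of watermark propagation with a cyclic (feedback) input.
 */
public class CyclicWatermark {

    /** Combined watermark = minimum over all input channels. */
    static long combine(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        long regularInput = 1_000L;

        // Feedback edge stuck at its initial value: the operator never progresses.
        long stuck = combine(regularInput, Long.MIN_VALUE);

        // Iteration source emits Long.MAX_VALUE: the regular input drives time.
        long unblocked = combine(regularInput, Long.MAX_VALUE);

        System.out.println("stuck=" + stuck + ", unblocked=" + unblocked);
    }
}
```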


[jira] [Updated] (FLINK-2504) ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed spuriously

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2504:
--
Affects Version/s: 0.10

> ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed 
> spuriously
> -
>
> Key: FLINK-2504
> URL: https://issues.apache.org/jira/browse/FLINK-2504
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.10
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> The test 
> {{ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed}} 
> failed in one of my Travis builds: 
> https://travis-ci.org/tillrohrmann/flink/jobs/74881883





[jira] [Updated] (FLINK-2670) Unstable CombineTaskTest

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2670:
--
Fix Version/s: 0.10

> Unstable CombineTaskTest
> 
>
> Key: FLINK-2670
> URL: https://issues.apache.org/jira/browse/FLINK-2670
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Matthias J. Sax
>Assignee: Stephan Ewen
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> Fails with
> {noformat}
> ==
> Maven produced no output for 300 seconds.
> ==
> {noformat}
> https://travis-ci.org/apache/flink/jobs/80344487





[jira] [Updated] (FLINK-2504) ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed spuriously

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2504:
--
Fix Version/s: 0.10

> ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed failed 
> spuriously
> -
>
> Key: FLINK-2504
> URL: https://issues.apache.org/jira/browse/FLINK-2504
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.10
>Reporter: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> The test 
> {{ExternalSortLargeRecordsITCase.testSortWithLongAndShortRecordsMixed}} 
> failed in one of my Travis builds: 
> https://travis-ci.org/tillrohrmann/flink/jobs/74881883





[jira] [Updated] (FLINK-2670) Unstable CombineTaskTest

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-2670:
--
Affects Version/s: 0.10

> Unstable CombineTaskTest
> 
>
> Key: FLINK-2670
> URL: https://issues.apache.org/jira/browse/FLINK-2670
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 0.10
>Reporter: Matthias J. Sax
>Assignee: Stephan Ewen
>Priority: Critical
>  Labels: test-stability
> Fix For: 0.10
>
>
> Fails with
> {noformat}
> ==
> Maven produced no output for 300 seconds.
> ==
> {noformat}
> https://travis-ci.org/apache/flink/jobs/80344487





[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129860
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are 
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can 
grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that 
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
+ *
+ * 
+ * State handle in ZooKeeper => State handle exists
+ * 
+ *
+ * But not:
+ *
+ * 
+ * State handle exists => State handle in ZooKeeper
+ * 
+ *
+ * There can be lingering state handles when failures happen during 
operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** State handle provider */
+   private final StateHandleProvider<T> stateHandleProvider;
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore}.
+*
+* @param client  The Curator ZooKeeper client. 
Important: It is
+*expected that the client's namespace 
ensures that the root
+*path is exclusive for all state handles 
managed by this
+*instance, e.g. 
client.usingNamespace("/stateHandles")
+* @param stateHandleProvider The state handle provider for the state
+*/
+   public ZooKeeperStateHandleStore(
+   CuratorFramework client,
+   StateHandleProvider<T> stateHandleProvider) {
+
+   this.client = checkNotNull(client, "Curator client");
+   this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper with create mode 
{@link
+* CreateMode#PERSISTENT}.
+*
+* @see #add(String, Serializable, CreateMode)
+*/
+   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
+   return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper.
+*
+* Important: This will not store the 
actual state in
+* ZooKeeper, but create a state handle and store it in ZooKeeper. This 
level of indirection
+* makes sure that data in ZooKeeper is small.
+*
+* @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet 

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are 
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can 
grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that 
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
+ *
+ * 
+ * State handle in ZooKeeper => State handle exists
+ * 
+ *
+ * But not:
+ *
+ * 
+ * State handle exists => State handle in ZooKeeper
+ * 
+ *
+ * There can be lingering state handles when failures happen during 
operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** State handle provider */
+   private final StateHandleProvider<T> stateHandleProvider;
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore}.
+*
+* @param client  The Curator ZooKeeper client. 
Important: It is
+*expected that the client's namespace 
ensures that the root
+*path is exclusive for all state handles 
managed by this
+*instance, e.g. 
client.usingNamespace("/stateHandles")
+* @param stateHandleProvider The state handle provider for the state
+*/
+   public ZooKeeperStateHandleStore(
+   CuratorFramework client,
+   StateHandleProvider<T> stateHandleProvider) {
+
+   this.client = checkNotNull(client, "Curator client");
+   this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper with create mode 
{@link
+* CreateMode#PERSISTENT}.
+*
+* @see #add(String, Serializable, CreateMode)
+*/
+   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
+   return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper.
+*
+* Important: This will not store the 
actual state in
+* ZooKeeper, but create a state handle and store it in ZooKeeper. This 
level of indirection
+* makes sure that data in ZooKeeper is small.
+*
+* @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet 

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129801
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are 
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can 
grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that 
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
+ *
+ * 
+ * State handle in ZooKeeper => State handle exists
+ * 
+ *
+ * But not:
+ *
+ * 
+ * State handle exists => State handle in ZooKeeper
+ * 
+ *
+ * There can be lingering state handles when failures happen during 
operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** State handle provider */
+   private final StateHandleProvider<T> stateHandleProvider;
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore}.
+*
+* @param client  The Curator ZooKeeper client. 
Important: It is
+*expected that the client's namespace 
ensures that the root
+*path is exclusive for all state handles 
managed by this
+*instance, e.g. 
client.usingNamespace("/stateHandles")
+* @param stateHandleProvider The state handle provider for the state
+*/
+   public ZooKeeperStateHandleStore(
+   CuratorFramework client,
+   StateHandleProvider<T> stateHandleProvider) {
+
+   this.client = checkNotNull(client, "Curator client");
+   this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper with create mode 
{@link
+* CreateMode#PERSISTENT}.
+*
+* @see #add(String, Serializable, CreateMode)
+*/
+   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
+   return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper.
+*
+* Important: This will not store the 
actual state in
+* ZooKeeper, but create a state handle and store it in ZooKeeper. This 
level of indirection
+* makes sure that data in ZooKeeper is small.
+*
+* @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet 

[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943207#comment-14943207
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129801
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are 
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can 
grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that 
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
+ *
+ * 
+ * State handle in ZooKeeper => State handle exists
+ * 
+ *
+ * But not:
+ *
+ * 
+ * State handle exists => State handle in ZooKeeper
+ * 
+ *
+ * There can be lingering state handles when failures happen during 
operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** State handle provider */
+   private final StateHandleProvider<T> stateHandleProvider;
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore}.
+*
+* @param client  The Curator ZooKeeper client. 
Important: It is
+*expected that the client's namespace 
ensures that the root
+*path is exclusive for all state handles 
managed by this
+*instance, e.g. 
client.usingNamespace("/stateHandles")
+* @param stateHandleProvider The state handle provider for the state
+*/
+   public ZooKeeperStateHandleStore(
+   CuratorFramework client,
+   StateHandleProvider<T> stateHandleProvider) {
+
+   this.client = checkNotNull(client, "Curator client");
+   this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper with create mode 
{@link
+* CreateMode#PERSISTENT}.
+*
+* @see #add(String, Serializable, CreateMode)
+*/
+   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
+   return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper.
+*
+* Important: This will not store the 
actual state in
+* ZooKeeper, 

[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943209#comment-14943209
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are 
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can 
grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that 
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
+ *
+ * 
+ * State handle in ZooKeeper => State handle exists
+ * 
+ *
+ * But not:
+ *
+ * 
+ * State handle exists => State handle in ZooKeeper
+ * 
+ *
+ * There can be lingering state handles when failures happen during 
operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** State handle provider */
+   private final StateHandleProvider<T> stateHandleProvider;
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore}.
+*
+* @param client  The Curator ZooKeeper client. 
Important: It is
+*expected that the client's namespace 
ensures that the root
+*path is exclusive for all state handles 
managed by this
+*instance, e.g. 
client.usingNamespace("/stateHandles")
+* @param stateHandleProvider The state handle provider for the state
+*/
+   public ZooKeeperStateHandleStore(
+   CuratorFramework client,
+   StateHandleProvider<T> stateHandleProvider) {
+
+   this.client = checkNotNull(client, "Curator client");
+   this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper with create mode 
{@link
+* CreateMode#PERSISTENT}.
+*
+* @see #add(String, Serializable, CreateMode)
+*/
+   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
+   return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper.
+*
+* Important: This will not store the 
actual state in
+* ZooKeeper, 

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * State handles backed by ZooKeeper.
+ *
+ * Added state is persisted via {@link StateHandle}s, which in turn are 
written to
+ * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
+ * small. ZooKeeper is built for data in the KB range whereas state can 
grow to multiple MBs.
+ *
+ * State modifications require some care, because it is possible that 
certain failures bring
+ * the state handle backend and ZooKeeper out of sync.
+ *
+ * ZooKeeper holds the ground truth about state handles, i.e. the 
following holds:
+ *
+ * 
+ * State handle in ZooKeeper => State handle exists
+ * 
+ *
+ * But not:
+ *
+ * 
+ * State handle exists => State handle in ZooKeeper
+ * 
+ *
+ * There can be lingering state handles when failures happen during 
operation. They
+ * need to be cleaned up manually (see <a href="https://issues.apache.org/jira/browse/FLINK-2513">
+ * FLINK-2513</a> about a possible way to overcome this).
+ *
+ * @param <T> Type of state
+ */
+public class ZooKeeperStateHandleStore<T extends Serializable> {
+
+   /** Curator ZooKeeper client */
+   private final CuratorFramework client;
+
+   /** State handle provider */
+   private final StateHandleProvider<T> stateHandleProvider;
+
+   /**
+* Creates a {@link ZooKeeperStateHandleStore}.
+*
+* @param client  The Curator ZooKeeper client. 
Important: It is
+*expected that the client's namespace 
ensures that the root
+*path is exclusive for all state handles 
managed by this
+*instance, e.g. 
client.usingNamespace("/stateHandles")
+* @param stateHandleProvider The state handle provider for the state
+*/
+   public ZooKeeperStateHandleStore(
+   CuratorFramework client,
+   StateHandleProvider<T> stateHandleProvider) {
+
+   this.client = checkNotNull(client, "Curator client");
+   this.stateHandleProvider = checkNotNull(stateHandleProvider, 
"State handle provider");
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper with create mode 
{@link
+* CreateMode#PERSISTENT}.
+*
+* @see #add(String, Serializable, CreateMode)
+*/
+   public ZooKeeperStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
+   return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
+   }
+
+   /**
+* Creates a state handle and stores it in ZooKeeper.
+*
+* Important: This will not store the 
actual state in
+* ZooKeeper, but create a state handle and store it in ZooKeeper. This 
level of indirection
+* makes sure that data in ZooKeeper is small.
+*
+* @param pathInZooKeeper Destination path in ZooKeeper (expected to 
*not* exist yet 

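The "store a small handle in ZooKeeper, keep the large state elsewhere" indirection described in the Javadoc above can be sketched, very roughly, without any ZooKeeper dependency. This is a hypothetical stand-in, not Flink's API: a `HashMap` plays the role of ZooKeeper and a second map the role of the state backend a `StateHandleProvider` would write to.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of the handle-store indirection. Both stores are plain maps here;
// in Flink the handle would be serialized into a ZooKeeper znode while the
// actual (potentially large) state is written out by a StateHandleProvider.
public class HandleStoreSketch {

    // Stands in for the state backend (e.g. a distributed file system).
    static final Map<String, Serializable> BACKEND = new HashMap<>();

    // Stands in for ZooKeeper: path -> small handle (just a key here),
    // keeping the data stored "in ZooKeeper" small.
    static final Map<String, String> ZOOKEEPER = new HashMap<>();

    /** Write the large state to the backend and store only a small
     *  pointer under the given "ZooKeeper" path. */
    static void add(String pathInZooKeeper, Serializable state) {
        String handle = UUID.randomUUID().toString(); // small pointer
        BACKEND.put(handle, state);                   // large payload
        ZOOKEEPER.put(pathInZooKeeper, handle);       // small entry only
    }

    /** Resolve the handle stored under the path back to the state. */
    static Serializable get(String pathInZooKeeper) {
        return BACKEND.get(ZOOKEEPER.get(pathInZooKeeper));
    }

    public static void main(String[] args) {
        add("/checkpoints/1", "imagine-many-megabytes-of-state");
        System.out.println(get("/checkpoints/1"));
    }
}
```

Note how the sketch also exhibits the invariant discussed above: a lost ZooKeeper entry still leaves a lingering payload in the backend, which is exactly the cleanup problem FLINK-2513 is about.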
[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943205#comment-14943205
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129709
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---

[jira] [Resolved] (FLINK-2367) “flink-xx-jobmanager-linux-3lsu.log" file can't auto be recovered/detected after mistaking delete

2015-10-05 Thread Maximilian Michels (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-2367.
---
Resolution: Auto Closed

Until further clarification I'm closing this issue. Feel free to reopen.

> “flink-xx-jobmanager-linux-3lsu.log" file can't auto be recovered/detected 
> after mistaking delete
> -
>
> Key: FLINK-2367
> URL: https://issues.apache.org/jira/browse/FLINK-2367
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 0.9
> Environment: Linux
>Reporter: chenliang613
>Assignee: chenliang613
>Priority: Minor
>  Labels: reliability
> Fix For: 0.9.0
>
>
> To check whether the system is adequately reliable, testers usually
> deliberately perform delete operations.
> Steps:
> 1.go to "flink\build-target\log" 
> 2.delete “flink-xx-jobmanager-linux-3lsu.log" file 
> 3.Run jobs that write log info; meanwhile, the system didn't give any
> error info when the log info couldn't be written correctly.
> 4.When some jobs failed, go to check the log file to find the reason;
> the log file can't be found.
> The JobManager must be restarted to regenerate the log file before jobs can
> continue to run.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943208#comment-14943208
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41129860
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---

[jira] [Commented] (FLINK-2354) Recover running jobs on JobManager failure

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943218#comment-14943218
 ] 

ASF GitHub Bot commented on FLINK-2354:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41130914
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---

[GitHub] flink pull request: [FLINK-2354] Add job graph and checkpoint reco...

2015-10-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/1153#discussion_r41130914
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ---

[jira] [Commented] (FLINK-2767) Add support Scala 2.11 to Scala shell

2015-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943297#comment-14943297
 ] 

ASF GitHub Bot commented on FLINK-2767:
---

Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1197#issuecomment-145510215
  
For me it says:
~~~bash
Failed to created JLineReader: java.lang.NoClassDefFoundError: 
jline/console/completer/Completer
Falling back to SimpleReader.
~~~


> Add support Scala 2.11 to Scala shell
> -
>
> Key: FLINK-2767
> URL: https://issues.apache.org/jira/browse/FLINK-2767
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 0.10
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>
> Since FLINK-2200 is resolved, the Flink community provides JARs for Scala 
> 2.11. But currently, there is no Scala shell for Scala 2.11. If we add
> support for Scala 2.11 to the Scala shell, users on Scala 2.11 could use
> Flink easily.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2767] [scala shell] Add Scala 2.11 supp...

2015-10-05 Thread nikste
Github user nikste commented on the pull request:

https://github.com/apache/flink/pull/1197#issuecomment-145510215
  
For me it says:
~~~bash
Failed to created JLineReader: java.lang.NoClassDefFoundError: 
jline/console/completer/Completer
Falling back to SimpleReader.
~~~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1675) Rework Accumulators

2015-10-05 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14943301#comment-14943301
 ] 

Ufuk Celebi commented on FLINK-1675:


Max, there was this PR which addressed (5): 
https://github.com/apache/flink/pull/570

> Rework Accumulators
> ---
>
> Key: FLINK-1675
> URL: https://issues.apache.org/jira/browse/FLINK-1675
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Stephan Ewen
> Fix For: 0.10
>
>
> The accumulators need an overhaul to address various issues:
> 1.  User defined Accumulator classes crash the client, because it is not 
> using the user code classloader to decode the received message.
> 2.  They should be attached to the ExecutionGraph, not the dedicated 
> AccumulatorManager. That makes them accessible also for archived execution 
> graphs.
> 3.  Accumulators should be sent periodically, as part of the heartbeat that
> sends metrics. This allows them to be updated in real time.
> 4. Accumulators should be stored fine-grained (per ExecutionVertex, or per
> execution) and the final value should be computed by merging all involved
> ones. This allows users to access the per-subtask accumulators, which is
> often interesting.
> 5. Accumulators should subsume the aggregators by allowing them to be "versioned" 
> with a superstep. The versioned ones should be redistributed to the cluster 
> after each superstep.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
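Point 4 of the list above (fine-grained storage per subtask, with the final value produced only by merging) can be sketched as follows. The names are illustrative and not Flink's actual Accumulator API; the point is only that keeping per-subtask accumulators around and merging on demand preserves access to the individual values.

```java
import java.util.Arrays;
import java.util.List;

// Sketch of fine-grained accumulator storage: one counter per subtask,
// job-wide final value computed by merging, per-subtask values preserved.
public class AccumulatorMergeSketch {

    /** A trivial long-counter accumulator (illustrative, not Flink's). */
    static final class LongCounter {
        final long value;
        LongCounter(long v) { value = v; }
        LongCounter merge(LongCounter other) {
            return new LongCounter(value + other.value);
        }
    }

    /** Merge all per-subtask counters into the job-wide final value. */
    static long finalValue(List<LongCounter> perSubtask) {
        return perSubtask.stream()
                .reduce(new LongCounter(0), LongCounter::merge)
                .value;
    }

    public static void main(String[] args) {
        // Three subtasks reported 3, 5 and 7 records respectively; the
        // list itself remains available for per-subtask inspection.
        List<LongCounter> subtasks = Arrays.asList(
                new LongCounter(3), new LongCounter(5), new LongCounter(7));
        System.out.println(finalValue(subtasks));
    }
}
```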

