[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/858


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-15 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-140405615
  
I've rebased again. If nobody objects, I will merge this soon. The new 
API-facing methods on `ExecutionEnvironment` will be disabled until we 
implement the first applications of session management. I've added a separate 
commit that does that.




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-09 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/858#discussion_r39017774
  
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---
@@ -53,41 +54,41 @@ class JobManagerFailsITCase(_system: ActorSystem)
   }
 
   "A TaskManager" should {
-    "detect a lost connection to the JobManager and try to reconnect to it" in {
-
-      val num_slots = 13
-      val cluster = startDeathwatchCluster(num_slots, 1)
-
-      val tm = cluster.getTaskManagers(0)
-      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-
-      // disable disconnect message to test death watch
-      tm ! DisableDisconnect
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(RequestNumberRegisteredTaskManager, self)
-          expectMsg(1)
-
-          tm ! NotifyWhenJobManagerTerminated(jmGateway.actor)
-
-          jmGateway.tell(PoisonPill, self)
-
-          expectMsgClass(classOf[JobManagerTerminated])
-
-          cluster.restartLeadingJobManager()
-
-          cluster.waitForTaskManagersToBeRegistered()
-
-          cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-            .tell(RequestNumberRegisteredTaskManager, self)
-
-          expectMsg(1)
-        }
-      } finally {
-        cluster.stop()
-      }
-    }
+//"detect a lost connection to the JobManager and try to reconnect to it" in {
--- End diff --

Thanks.




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-09 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-138825995
  
Could you elaborate a little bit on what you refactored and which 
components would be important to review?




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-138848287
  
Of course! The following classes have been refactored in the course of 
integrating them with the session management:

**Client**
- Establish connection to JobManager on creation
- Refactor run method into `runBlocking` and `runDetached`
- Extract helper classes to generate the Plan
- Make Optimizer and JobGraph generation methods `static`
- Pass `ClassLoader` correctly (do not keep one per Client but rather let 
it be passed before submission)

**CliFrontend**
- `runBlocking` and `runDetached` methods by analogy with the Client class

**ExecutionEnvironment**, **LocalEnvironment**, **RemoteEnvironment**
- modified abstract class to support sessions (timeout and jobID generation)
- handle session management via Reapers and ShutdownHooks

**PlanExecutor**, **LocalExecutor**, **RemoteExecutor**
- modified interface
- support session termination
- set JobID on Plan

**JobManager**
- keep ExecutionGraph as long as session has not expired
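
The `runBlocking`/`runDetached` split listed for the Client can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the Flink implementation: `SketchClient`, `JobResult`, and the `Supplier`-based job are hypothetical stand-ins, and only the two method names come from the list above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for a job client; not Flink's actual Client class.
class SketchClient {

    // Hypothetical result type holding whatever the job produced.
    static final class JobResult {
        final String jobName;
        final long value;

        JobResult(String jobName, long value) {
            this.jobName = jobName;
            this.value = value;
        }
    }

    // Runs the job and returns only once it has finished.
    JobResult runBlocking(String jobName, Supplier<Long> job) {
        return new JobResult(jobName, job.get());
    }

    // Hands the job off and returns immediately; the caller can join the future later.
    CompletableFuture<JobResult> runDetached(String jobName, Supplier<Long> job) {
        return CompletableFuture.supplyAsync(() -> new JobResult(jobName, job.get()));
    }

    public static void main(String[] args) {
        SketchClient client = new SketchClient();
        JobResult blocking = client.runBlocking("count", () -> 42L);
        CompletableFuture<JobResult> detached = client.runDetached("count", () -> 42L);
        System.out.println(blocking.value + " " + detached.join().value);
    }
}
```

Keeping the two entry points separate makes the waiting behaviour explicit at the call site instead of hiding it behind a flag.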

Future issues:
- Support for sessions in streaming. Currently streaming jobs are agnostic 
of sessions.
- Representation of sessions in the JobManager web frontend. How do we 
represent updates to the ExecutionGraph in sessions?
- Build features on top of session management (e.g. intermediate results)




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-09 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-138860661
  
Thanks Max for the detailed description.






[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-08 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-138625172
  
I've ported this pull request to the latest master. It was a lot more work 
than I anticipated because some classes had diverged significantly, which made 
merging them hard.

Due to some refactoring, the changes have grown quite large again, and I 
know that makes reviewing hard. Despite that, I wouldn't delay merging this 
pull request much further. We can disable the session management until it is 
integrated with the rest of the system (intermediate results) by throwing an 
exception in the interface methods. If we decide later that we want to delay 
this feature, we could also remove the session code. In that case, it would 
still make sense to merge this pull request because it contains a lot of nice 
refactoring.

With the session management in place, we can reuse already computed 
intermediate results without much effort. Actually, only some API changes 
remain to expose the session management to the user in production.
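
Disabling a feature by throwing from its interface methods could look roughly like this. It is only a sketch: `SketchEnvironment`, the constructor flag, and the method names are assumptions made for illustration, not the code in this pull request.

```java
// Hypothetical environment class; only the idea of throwing from disabled methods comes from the thread.
class SketchEnvironment {

    private final boolean sessionsEnabled; // hypothetical switch
    private long sessionTimeoutSeconds;

    SketchEnvironment(boolean sessionsEnabled) {
        this.sessionsEnabled = sessionsEnabled;
    }

    // User-facing method that stays in the API but refuses to work while sessions are disabled.
    public void setSessionTimeout(long seconds) {
        requireSessionsEnabled();
        if (seconds < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        this.sessionTimeoutSeconds = seconds;
    }

    public long getSessionTimeout() {
        return sessionTimeoutSeconds;
    }

    // Single guard shared by every session-related method.
    private void requireSessionsEnabled() {
        if (!sessionsEnabled) {
            throw new UnsupportedOperationException("Session management is not implemented yet.");
        }
    }

    public static void main(String[] args) {
        SketchEnvironment env = new SketchEnvironment(false);
        try {
            env.setSessionTimeout(60);
        } catch (UnsupportedOperationException e) {
            System.out.println("disabled: " + e.getMessage());
        }
    }
}
```

Routing every session method through one guard means the feature can later be switched on in a single place.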




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-09-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/858#discussion_r38951273
  
--- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---
@@ -53,41 +54,41 @@ class JobManagerFailsITCase(_system: ActorSystem)
   }
 
   "A TaskManager" should {
-    "detect a lost connection to the JobManager and try to reconnect to it" in {
-
-      val num_slots = 13
-      val cluster = startDeathwatchCluster(num_slots, 1)
-
-      val tm = cluster.getTaskManagers(0)
-      val jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-
-      // disable disconnect message to test death watch
-      tm ! DisableDisconnect
-
-      try {
-        within(TestingUtils.TESTING_DURATION) {
-          jmGateway.tell(RequestNumberRegisteredTaskManager, self)
-          expectMsg(1)
-
-          tm ! NotifyWhenJobManagerTerminated(jmGateway.actor)
-
-          jmGateway.tell(PoisonPill, self)
-
-          expectMsgClass(classOf[JobManagerTerminated])
-
-          cluster.restartLeadingJobManager()
-
-          cluster.waitForTaskManagersToBeRegistered()
-
-          cluster.getLeaderGateway(TestingUtils.TESTING_DURATION)
-            .tell(RequestNumberRegisteredTaskManager, self)
-
-          expectMsg(1)
-        }
-      } finally {
-        cluster.stop()
-      }
-    }
+//"detect a lost connection to the JobManager and try to reconnect to it" in {
--- End diff --

Re-enable the test?




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-07-22 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-123755792
  
It should just add more nodes to the ExecutionGraph. Existing ones should 
not be modified. For batch, I think the assumption is that the existing 
ExecutionGraph needs to be finished. For streaming, I could also picture 
attaching nodes at runtime, but this has to be implemented carefully.
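
The intended attach behaviour (add new nodes, leave existing ones alone, require a finished graph for batch) can be sketched as below. `SketchExecutionGraph` and all of its members are hypothetical and much simpler than Flink's ExecutionGraph; the retry counter is included only to show state that an attach should not reset.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified graph; not Flink's ExecutionGraph.
class SketchExecutionGraph {

    enum State { CREATED, FINISHED }

    private final Map<String, State> nodes = new LinkedHashMap<>();
    private final int remainingRetries; // never touched by attach()

    SketchExecutionGraph(int remainingRetries) {
        this.remainingRetries = remainingRetries;
    }

    // True when every known node has finished (also true for an empty graph).
    boolean isFinished() {
        return nodes.values().stream().allMatch(s -> s == State.FINISHED);
    }

    // Marks a node as finished; unknown node ids are ignored.
    void markFinished(String nodeId) {
        nodes.replace(nodeId, State.FINISHED);
    }

    // Adds only nodes that are not present yet and returns how many were added.
    int attach(List<String> newNodeIds) {
        if (!isFinished()) {
            throw new IllegalStateException("existing graph has not finished yet");
        }
        int added = 0;
        for (String id : newNodeIds) {
            if (nodes.putIfAbsent(id, State.CREATED) == null) {
                added++;
            }
        }
        return added;
    }

    State stateOf(String nodeId) {
        return nodes.get(nodeId);
    }

    int remainingRetries() {
        return remainingRetries;
    }

    public static void main(String[] args) {
        SketchExecutionGraph graph = new SketchExecutionGraph(3);
        graph.attach(Arrays.asList("source", "map"));
        graph.markFinished("source");
        graph.markFinished("map");
        int added = graph.attach(Arrays.asList("map", "sink"));
        System.out.println(added + " node(s) added, map is " + graph.stateOf("map"));
    }
}
```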




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-123754538
  
I think right now, it pretty much behaves as if someone started a new job, 
with the grown execution graph.




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-07-22 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/858#issuecomment-123726241
  
I just had a look at the JobManager in a different context and thought 
about the following, which might be relevant here: when submitting a new 
JobGraph that is attached to an existing ExecutionGraph, some ExecutionGraph 
state is overwritten by the new JobGraph. This can lead to (maybe) unexpected 
behaviour, like resetting the number of remaining execution retries or 
creating a new CheckpointCoordinator for the ExecutionGraph.

What's the intended behaviour of attaching to an existing ExecutionGraph? 
Is there an implicit assumption that the existing ExecutionGraph needs to be 
finished already?




[GitHub] flink pull request: [FLINK-2097] Implement job session management

2015-06-22 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/858

[FLINK-2097] Implement job session management

This is a joint effort by @StephanEwen and me to introduce session 
management in Flink. Sessions are used to keep a copy of the ExecutionGraph in 
the job manager for the session lifetime. It is important that the 
ExecutionGraph is not kept around longer because it consumes a lot of memory. 
When the session ends, its intermediate results can be freed as well. To 
integrate sessions properly into Flink, some refactoring was necessary. In 
particular:

- JobId is created through the ExecutionEnvironment and passed through
- Sessions can be terminated by the ExecutionEnvironment or directly through 
the executor
- Sessions are cancelled implicitly through reapers or shutdown hooks in 
the ExecutionEnvironment, otherwise they time out
- LocalExecutor and RemoteExecutor manage sessions
- The Client only deals with the communication with the job manager and is 
agnostic of session management

With the session management, we will be able to properly support 
backtracking of produced intermediate results. This makes calls to 
count()/collect()/print() efficient and makes it possible to write 
incremental/interactive jobs. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink session-dev

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/858.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #858


commit 9852d392bfe69b056596acfba001ab0a574f0ac0
Author: Maximilian Michels m...@apache.org
Date:   2015-05-13T15:06:47Z

[FLINK-2097] [core] Implement job session management.

Sessions make sure that the JobManager does not immediately discard a 
JobGraph after execution, but keeps it
around for further operations to be attached to the graph. That is the 
basis of interactive sessions.

This pull request implements rudimentary session management. Together 
with the backtracking #640,
this will enable users to submit jobs to the cluster and access 
intermediate results. Session handling ensures that the results are cleared 
eventually.

ExecutionGraphs are kept as long as
- no timeout occurred and
- the session has not been explicitly ended
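
The retention rule above (keep the graph until the session times out or is ended explicitly) can be illustrated with a small registry. This is a sketch under stated assumptions: `SketchSessionRegistry`, its method names, and the explicit `nowMillis` parameter are hypothetical and are not taken from the JobManager code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; the real JobManager bookkeeping is not shown in this thread.
class SketchSessionRegistry {

    private static final class Entry {
        final Object graph;
        final long expiresAtMillis;

        Entry(Object graph, long expiresAtMillis) {
            this.graph = graph;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> sessions = new HashMap<>();

    // Remembers the graph for a job until nowMillis + timeoutMillis.
    void keep(String jobId, Object graph, long nowMillis, long timeoutMillis) {
        sessions.put(jobId, new Entry(graph, nowMillis + timeoutMillis));
    }

    // Explicit session end: the graph is dropped immediately.
    void endSession(String jobId) {
        sessions.remove(jobId);
    }

    // Returns the kept graph, or null once the session has expired or ended.
    Object lookup(String jobId, long nowMillis) {
        Entry entry = sessions.get(jobId);
        if (entry == null) {
            return null;
        }
        if (nowMillis >= entry.expiresAtMillis) {
            sessions.remove(jobId); // timed out: free the graph
            return null;
        }
        return entry.graph;
    }

    public static void main(String[] args) {
        SketchSessionRegistry registry = new SketchSessionRegistry();
        registry.keep("job-1", "graph", 0L, 1000L);
        System.out.println(registry.lookup("job-1", 500L));
        System.out.println(registry.lookup("job-1", 1500L));
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real implementation would more likely read a clock and expire sessions in the background.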

commit 65464ad19d39a29d41d071b2a4524b414e297147
Author: Stephan Ewen se...@apache.org
Date:   2015-05-29T12:35:33Z

[FLINK-2097] [core] Improve session management.

 - The Client manages only connections to the JobManager, it is not job 
specific
 - Executors provide a more explicit life cycle and methods to start new 
sessions
 - Sessions are handled by the environments
 - The environments use reapers (local) and shutdown hooks (remote) to 
ensure session termination
   when the environment runs out of scope
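
   A shutdown hook that ends a session when the JVM exits can be sketched
   as follows. The class `SketchRemoteSession` and the `Runnable` callback are
   hypothetical; the reaper used for local environments is not shown.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical session handle; not the RemoteEnvironment code from this pull request.
class SketchRemoteSession implements AutoCloseable {

    private final AtomicBoolean ended = new AtomicBoolean(false);
    private final Runnable endSessionAction; // hypothetical "tell the cluster to end the session"
    private final Thread shutdownHook;

    SketchRemoteSession(Runnable endSessionAction) {
        this.endSessionAction = endSessionAction;
        // Registered so the session also ends if the program exits without calling close().
        this.shutdownHook = new Thread(this::endSession, "session-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

    // Runs the end-of-session action at most once, whoever triggers it first.
    private void endSession() {
        if (ended.compareAndSet(false, true)) {
            endSessionAction.run();
        }
    }

    boolean isEnded() {
        return ended.get();
    }

    @Override
    public void close() {
        endSession();
        try {
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        } catch (IllegalStateException e) {
            // The JVM is already shutting down; the hook can no longer be removed.
        }
    }

    public static void main(String[] args) {
        try (SketchRemoteSession session = new SketchRemoteSession(
                () -> System.out.println("session ended"))) {
            System.out.println("running jobs in session");
        }
    }
}
```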

commit 6d89edd4a63fa3971c0246f46c7b8c98f3fc6c30
Author: Maximilian Michels m...@apache.org
Date:   2015-06-18T14:38:09Z

[FLINK-2097] [core] Finalize session management



