[FLINK-2097][core] implement a job session management

Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.

This pull request implements rudimentary session management. Together
with the backtracking work in #640, this will enable users to submit jobs
to the cluster and access intermediate results. Session handling ensures
that the results are cleared eventually.

ExecutionGraphs are kept as long as
  - no timeout has occurred and
  - the session has not been explicitly ended

The following changes have also been made in this pull request:

- The Job ID is created through the ExecutionEnvironment and passed through

- Sessions can be terminated by the ExecutionEnvironment or directly
  through the executor

- The environments use reapers (local) and shutdown hooks (remote) to
  ensure session termination when the environment goes out of scope

- The Client only manages connections to the JobManager; it is not
  job-specific

This closes #858.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71bf2f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71bf2f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71bf2f57

Branch: refs/heads/master
Commit: 71bf2f570861daae53b24bfcf1d06aedb85311b9
Parents: 7984acc
Author: Maximilian Michels <m...@apache.org>
Authored: Fri Sep 4 17:34:44 2015 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Tue Sep 22 19:55:46 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 182 +++---
 .../org/apache/flink/client/LocalExecutor.java  | 222 ++++---
 .../org/apache/flink/client/RemoteExecutor.java | 188 ++++--
 .../org/apache/flink/client/program/Client.java | 653 +++++++++----------
 .../client/program/ContextEnvironment.java      |  40 +-
 .../flink/client/program/JobWithJars.java       |   5 +-
 .../program/OptimizerPlanEnvironment.java       | 132 ++++
 .../flink/client/program/PackagedProgram.java   |  51 +-
 .../client/program/PreviewPlanEnvironment.java  |  80 +++
 .../flink/client/web/JobSubmissionServlet.java  |  30 +-
 .../flink/client/CliFrontendInfoTest.java       |  81 +--
 .../client/CliFrontendPackageProgramTest.java   |  10 +-
 .../apache/flink/client/CliFrontendRunTest.java |  30 +-
 .../RemoteExecutorHostnameResolutionTest.java   |   6 +-
 .../client/program/ClientConnectionTest.java    |   8 +-
 .../apache/flink/client/program/ClientTest.java | 142 ++--
 .../ExecutionPlanAfterExecutionTest.java        |  15 +-
 .../program/ExecutionPlanCreationTest.java      |   9 +-
 .../client/program/PackagedProgramTest.java     |   1 -
 .../stormcompatibility/api/FlinkClient.java     |  22 +-
 .../flink/api/common/JobExecutionResult.java    |  17 +-
 .../java/org/apache/flink/api/common/JobID.java |  46 +-
 .../flink/api/common/JobSubmissionResult.java   |   5 +-
 .../java/org/apache/flink/api/common/Plan.java  |  71 +-
 .../apache/flink/api/common/PlanExecutor.java   |  85 ++-
 .../flink/api/java/CollectionEnvironment.java   |   4 +
 .../flink/api/java/ExecutionEnvironment.java    |  97 ++-
 .../apache/flink/api/java/LocalEnvironment.java | 180 ++++-
 .../flink/api/java/RemoteEnvironment.java       | 153 ++++-
 .../flink/optimizer/plan/OptimizedPlan.java     |  15 +-
 .../plantranslate/JobGraphGenerator.java        |  24 +-
 .../optimizer/postpass/JavaApiPostPass.java     |   2 +-
 .../apache/flink/runtime/client/JobClient.java  |  11 +-
 .../runtime/client/JobExecutionException.java   |   2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   8 +
 .../apache/flink/runtime/jobgraph/JobGraph.java |  58 +-
 .../runtime/taskmanager/TaskExecutionState.java |   2 +-
 .../flink/runtime/jobmanager/JobInfo.scala      |  18 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  98 ++-
 .../runtime/jobmanager/MemoryArchivist.scala    |  11 +-
 .../runtime/messages/JobManagerMessages.scala   |   6 +
 .../runtime/minicluster/FlinkMiniCluster.scala  |   9 +-
 .../PartialConsumePipelinedResultTest.java      |   3 +-
 .../TaskManagerProcessReapingTest.java          |   2 +-
 .../runtime/jobmanager/JobManagerITCase.scala   | 126 +++-
 .../flink/api/scala/ExecutionEnvironment.scala  |  31 +-
 .../api/avro/AvroExternalJarProgramITCase.java  |  38 +-
 .../environment/RemoteStreamEnvironment.java    |  24 +-
 .../environment/StreamContextEnvironment.java   |  28 +-
 .../environment/StreamExecutionEnvironment.java |  11 +-
 .../api/environment/StreamPlanEnvironment.java  |   7 +-
 .../flink/tez/client/LocalTezEnvironment.java   |   5 +
 .../flink/tez/client/RemoteTezEnvironment.java  |   5 +
 .../apache/flink/tez/client/TezExecutor.java    |  21 +
 .../apache/flink/test/util/TestEnvironment.java |   4 +
 .../clients/examples/LocalExecutorITCase.java   |   3 +-
 .../RemoteEnvironmentITCase.java                |   2 +-
 .../jsonplan/DumpCompiledPlanTest.java          |   6 +-
 .../jsonplan/JsonJobGraphGenerationTest.java    |   4 +
 .../jobmanager/JobManagerFailsITCase.scala      |   1 -
 .../org/apache/flink/yarn/YarnTestBase.java     |   2 +-
 .../org/apache/flink/yarn/FlinkYarnCluster.java |   2 +
 62 files changed, 2114 insertions(+), 1040 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index fc4d98a..f0e6c4f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,10 +57,13 @@ import 
org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -297,44 +300,51 @@ public class CliFrontend {
                        int userParallelism = options.getParallelism();
                        LOG.debug("User parallelism is set to {}", 
userParallelism);
 
-                       Client client = getClient(options, 
program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
+                       Client client = getClient(options, 
program.getMainClassName(), userParallelism);
                        
client.setPrintStatusDuringExecution(options.getStdoutLogging());
                        LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
-                       if(client.getMaxSlots() != -1 && userParallelism == -1) 
{
-                               logAndSysout("Using the parallelism provided by 
the remote cluster ("+client.getMaxSlots()+"). " +
-                                               "To use another parallelism, 
set it at the ./bin/flink client.");
-                               userParallelism = client.getMaxSlots();
-                       }
 
-                       // check if detached per job yarn cluster is used to 
start flink
-                       if(yarnCluster != null && yarnCluster.isDetached()) {
-                               logAndSysout("The Flink YARN client has been 
started in detached mode. In order to stop " +
-                                               "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
-                                               "yarn application -kill " + 
yarnCluster.getApplicationId() + "\n" +
-                                               "Please also note that the 
temporary files of the YARN session in the home directoy will not be removed.");
-                               exitCode = executeProgram(program, client, 
userParallelism, false);
-                       } else {
-                               // regular (blocking) execution.
-                               exitCode = executeProgram(program, client, 
userParallelism, true);
-                       }
+                       try {
+                               if (client.getMaxSlots() != -1 && 
userParallelism == -1) {
+                                       logAndSysout("Using the parallelism 
provided by the remote cluster ("+client.getMaxSlots()+"). " +
+                                                       "To use another 
parallelism, set it at the ./bin/flink client.");
+                                       userParallelism = client.getMaxSlots();
+                               }
 
-                       // show YARN cluster status if its not a detached YARN 
cluster.
-                       if (yarnCluster != null && !yarnCluster.isDetached()) {
-                               List<String> msgs = 
yarnCluster.getNewMessages();
-                               if (msgs != null && msgs.size() > 1) {
+                               // check if detached per job yarn cluster is 
used to start flink
+                               if (yarnCluster != null && 
yarnCluster.isDetached()) {
+                                       logAndSysout("The Flink YARN client has 
been started in detached mode. In order to stop " +
+                                                       "Flink on YARN, use the 
following command or a YARN web interface to stop it:\n" +
+                                                       "yarn application -kill 
" + yarnCluster.getApplicationId() + "\n" +
+                                                       "Please also note that 
the temporary files of the YARN session in the home directoy will not be 
removed.");
+                                       exitCode = 
executeProgramDetached(program, client, userParallelism);
+                               }
+                               else {
+                                       // regular (blocking) execution.
+                                       exitCode = 
executeProgramBlocking(program, client, userParallelism);
+                               }
+
+                               // show YARN cluster status if its not a 
detached YARN cluster.
+                               if (yarnCluster != null && 
!yarnCluster.isDetached()) {
+                                       List<String> msgs = 
yarnCluster.getNewMessages();
+                                       if (msgs != null && msgs.size() > 1) {
 
-                                       logAndSysout("The following messages 
were created by the YARN cluster while running the Job:");
-                                       for (String msg : msgs) {
-                                               logAndSysout(msg);
+                                               logAndSysout("The following 
messages were created by the YARN cluster while running the Job:");
+                                               for (String msg : msgs) {
+                                                       logAndSysout(msg);
+                                               }
+                                       }
+                                       if (yarnCluster.hasFailed()) {
+                                               logAndSysout("YARN cluster is 
in failed state!");
+                                               logAndSysout("YARN Diagnostics: 
" + yarnCluster.getDiagnostics());
                                        }
                                }
-                               if (yarnCluster.hasFailed()) {
-                                       logAndSysout("YARN cluster is in failed 
state!");
-                                       logAndSysout("YARN Diagnostics: " + 
yarnCluster.getDiagnostics());
-                               }
-                       }
 
-                       return exitCode;
+                               return exitCode;
+                       }
+                       finally {
+                               client.shutdown();
+                       }
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -395,8 +405,10 @@ public class CliFrontend {
                        int parallelism = options.getParallelism();
 
                        LOG.info("Creating program plan dump");
-                       Client client = getClient(options, 
program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
-                       FlinkPlan flinkPlan = client.getOptimizedPlan(program, 
parallelism);
+
+                       Optimizer compiler = new Optimizer(new 
DataStatistics(), new DefaultCostEstimator(), config);
+
+                       FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, 
program, parallelism);
 
                        if (webFrontend) {
                                this.optimizedPlan = flinkPlan;
@@ -425,6 +437,8 @@ public class CliFrontend {
                                }
                        }
                        return 0;
+
+
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -623,52 +637,65 @@ public class CliFrontend {
        //  Interaction with programs and JobManager
        // 
--------------------------------------------------------------------------------------------
 
-       protected int executeProgram(PackagedProgram program, Client client, 
int parallelism, boolean wait) {
-               LOG.info("Starting execution of program");
-               JobSubmissionResult execResult;
+       protected int executeProgramDetached(PackagedProgram program, Client 
client, int parallelism) {
+               JobSubmissionResult result;
                try {
-                       execResult = client.run(program, parallelism, wait);
-               }
-               catch (ProgramInvocationException e) {
+                       result = client.runDetached(program, parallelism);
+               } catch (ProgramInvocationException e) {
                        return handleError(e);
-               }
-               finally {
+               } finally {
                        program.deleteExtractedLibraries();
                }
 
-               if(wait) {
-                       LOG.info("Program execution finished");
-               }
-
-               // we come here after the job has finished (or the job has been 
submitted)
-               if (execResult != null) {
+               if (result != null) {
                        // if the job has been submitted to a detached YARN 
cluster, there won't be any
                        // exec results, but the object will be set (for the 
job id)
                        if (yarnCluster != null && yarnCluster.isDetached()) {
-                               if(execResult.getJobID() == null) {
-                                       throw new RuntimeException("Error while 
starting job. No Job ID set.");
-                               }
-                               yarnCluster.stopAfterJob(execResult.getJobID());
+
+                               yarnCluster.stopAfterJob(result.getJobID());
                                yarnCluster.disconnect();
-                               if(!webFrontend) {
-                                       System.out.println("The Job has been 
submitted with JobID "+execResult.getJobID());
+                               if (!webFrontend) {
+                                       System.out.println("The Job has been 
submitted with JobID " + result.getJobID());
                                }
                                return 0;
-                       }
-                       if (execResult instanceof JobExecutionResult) {
-                               JobExecutionResult result = 
(JobExecutionResult) execResult;
-                               if(!webFrontend) {
-                                       System.out.println("Job Runtime: " + 
result.getNetRuntime() + " ms");
-                               }
-                               Map<String, Object> accumulatorsResult = 
result.getAllAccumulatorResults();
-                               if (accumulatorsResult.size() > 0 && 
!webFrontend) {
-                                       System.out.println("Accumulator 
Results: ");
-                                       
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
-                               }
                        } else {
-                               LOG.info("The Job did not return an execution 
result");
+                               throw new RuntimeException("Error while 
starting job. No Job ID set.");
+                       }
+               }
+
+               return 0;
+       }
+
+       protected int executeProgramBlocking(PackagedProgram program, Client 
client, int parallelism) {
+               LOG.info("Starting execution of program");
+
+               JobExecutionResult result;
+               try {
+                       client.setPrintStatusDuringExecution(true);
+                       result = client.runBlocking(program, parallelism);
+               }
+               catch (ProgramInvocationException e) {
+                       return handleError(e);
+               }
+               finally {
+                       program.deleteExtractedLibraries();
+               }
+
+               LOG.info("Program execution finished");
+
+               if (result != null) {
+                       if (!webFrontend) {
+                               System.out.println("Job Runtime: " + 
result.getNetRuntime() + " ms");
+                       }
+                       Map<String, Object> accumulatorsResult = 
result.getAllAccumulatorResults();
+                       if (accumulatorsResult.size() > 0 && !webFrontend) {
+                               System.out.println("Accumulator Results: ");
+                               
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
                        }
+               } else {
+                       LOG.info("The Job did not return an execution result");
                }
+
                return 0;
        }
 
@@ -767,7 +794,6 @@ public class CliFrontend {
         * Retrieves a {@link Client} object from the given command line 
options and other parameters.
         *
         * @param options Command line options which contain JobManager address
-        * @param classLoader Class loader to use by the Client
         * @param programName Program name
         * @param userParallelism Given user parallelism
         * @return
@@ -775,12 +801,10 @@ public class CliFrontend {
         */
        protected Client getClient(
                        CommandLineOptions options,
-                       ClassLoader classLoader,
                        String programName,
                        int userParallelism)
                throws Exception {
-               InetSocketAddress jobManagerAddress = null;
-
+               InetSocketAddress jobManagerAddress;
                int maxSlots = -1;
 
                if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
@@ -796,14 +820,16 @@ public class CliFrontend {
 
                        // the number of slots available from YARN:
                        int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-                       if(yarnTmSlots == -1) {
+                       if (yarnTmSlots == -1) {
                                yarnTmSlots = 1;
                        }
                        maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-                       if(userParallelism != -1) {
+                       if (userParallelism != -1) {
                                int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-                               logAndSysout("The YARN cluster has "+maxSlots+" 
slots available, but the user requested a parallelism of "+userParallelism+" on 
YARN. " +
-                                               "Each of the 
"+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" 
slots.");
+                               logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
+                                               "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
+                                               "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
+                                               "will get "+slotsPerTM+" 
slots.");
                                flinkYarnClient.setTaskManagerSlots(slotsPerTM);
                        }
 
@@ -811,11 +837,12 @@ public class CliFrontend {
                                yarnCluster = flinkYarnClient.deploy();
                                yarnCluster.connectToCluster();
                        }
-                       catch(Exception e) {
+                       catch (Exception e) {
                                throw new RuntimeException("Error deploying the 
YARN cluster", e);
                        }
 
                        jobManagerAddress = yarnCluster.getJobManagerAddress();
+                       writeJobManagerAddressToConfig(jobManagerAddress);
 
                        logAndSysout("YARN cluster started");
                        logAndSysout("JobManager web interface address " + 
yarnCluster.getWebInterfaceURL());
@@ -847,14 +874,11 @@ public class CliFrontend {
                else {
                        if(options.getJobManagerAddress() != null) {
                                jobManagerAddress = 
parseHostPortAddress(options.getJobManagerAddress());
+                               
writeJobManagerAddressToConfig(jobManagerAddress);
                        }
                }
 
-               if(jobManagerAddress != null) {
-                       writeJobManagerAddressToConfig(jobManagerAddress);
-               }
-
-               return new Client(config, classLoader, maxSlots);
+               return new Client(config, maxSlots);
        }
 
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
index cf08e0a..7928e53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java
@@ -22,11 +22,14 @@ import java.util.List;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.DataStatistics;
@@ -35,52 +38,66 @@ import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 
 /**
- * A class for executing a {@link Plan} on a local embedded Flink runtime 
instance.
+ * A PlanExecutor that runs Flink programs on a local embedded Flink runtime 
instance.
+ *
+ * <p>By simply calling the {@link 
#executePlan(org.apache.flink.api.common.Plan)} method,
+ * this executor will start up and shut down again immediately after the 
program has finished.</p>
+ *
+ * <p>To use this executor to execute many dataflow programs that constitute 
one job together,
+ * the executor needs to be explicitly started, so that it keeps running across 
several executions.</p>
  */
 public class LocalExecutor extends PlanExecutor {
        
-       private static boolean DEFAULT_OVERWRITE = false;
+       private static final boolean DEFAULT_OVERWRITE = false;
 
        private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
 
-       private final Object lock = new Object();       // we lock to ensure 
singleton execution
-       
+       /** we lock to ensure singleton execution */
+       private final Object lock = new Object();
+
+       /** The mini cluster on which to execute the local programs */
        private LocalFlinkMiniCluster flink;
 
+       /** Custom user configuration for the execution */
        private Configuration configuration;
 
-       // ---------------------------------- config options 
------------------------------------------
-       
+       /** Config value for how many slots to provide in the local cluster */
        private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
 
+       /** Config flag whether to overwrite existing files by default */
        private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
-       
-       // 
--------------------------------------------------------------------------------------------
-       
+
+       // 
------------------------------------------------------------------------
+
        public LocalExecutor() {
-               if (!ExecutionEnvironment.localExecutionIsAllowed()) {
-                       throw new InvalidProgramException("The LocalEnvironment 
cannot be used when submitting a program through a client.");
-               }
+               this(null);
        }
 
        public LocalExecutor(Configuration conf) {
-               this();
-               this.configuration = conf;
+               if (!ExecutionEnvironment.localExecutionIsAllowed()) {
+                       throw new InvalidProgramException(
+                                       "The LocalEnvironment cannot be used 
when submitting a program through a client.");
+               }
+
+               this.configuration = conf != null ? conf : new Configuration();
        }
 
+       // 
------------------------------------------------------------------------
+       //  Configuration
+       // 
------------------------------------------------------------------------
 
-       
        public boolean isDefaultOverwriteFiles() {
                return defaultOverwriteFiles;
        }
-       
+
        public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
                this.defaultOverwriteFiles = defaultOverwriteFiles;
        }
-       
+
        public void setTaskManagerNumSlots(int taskManagerNumSlots) {
                this.taskManagerNumSlots = taskManagerNumSlots; 
        }
@@ -88,51 +105,48 @@ public class LocalExecutor extends PlanExecutor {
        public int getTaskManagerNumSlots() {
                return this.taskManagerNumSlots;
        }
-       
-       // 
--------------------------------------------------------------------------------------------
 
-       public static Configuration createConfiguration(LocalExecutor le) {
-               Configuration configuration = new Configuration();
-               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
le.getTaskManagerNumSlots());
-               
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, 
le.isDefaultOverwriteFiles());
-               return configuration;
-       }
+       // 
--------------------------------------------------------------------------------------------
 
+       @Override
        public void start() throws Exception {
-               synchronized (this.lock) {
-                       if (this.flink == null) {
-                               
+               synchronized (lock) {
+                       if (flink == null) {
                                // create the embedded runtime
-                               Configuration configuration = 
createConfiguration(this);
-                               if(this.configuration != null) {
+                               Configuration configuration = 
createConfiguration();
+                               if (this.configuration != null) {
                                        
configuration.addAll(this.configuration);
                                }
                                // start it up
-                               this.flink = new 
LocalFlinkMiniCluster(configuration, true);
+                               flink = new 
LocalFlinkMiniCluster(configuration, true);
                                this.flink.start();
                        } else {
                                throw new IllegalStateException("The local 
executor was already started.");
                        }
                }
        }
-
-       /**
-        * Stop the local executor instance. You should not call executePlan 
after this.
-        */
+       
+       @Override
        public void stop() throws Exception {
-               synchronized (this.lock) {
-                       if (this.flink != null) {
-                               this.flink.stop();
-                               this.flink = null;
-                       } else {
-                               throw new IllegalStateException("The local 
executor was not started.");
+               synchronized (lock) {
+                       if (flink != null) {
+                               flink.stop();
+                               flink = null;
                        }
                }
        }
 
+       @Override
+       public boolean isRunning() {
+               return flink != null;
+       }
+
        /**
-        * Execute the given plan on the local Nephele instance, wait for the 
job to
-        * finish and return the runtime in milliseconds.
+        * Executes the given program on a local runtime and waits for the job 
to finish.
+        * 
+        * <p>If the executor has not been started before, this starts the 
executor and shuts it down
+        * after the job finished. If the job runs in session mode, the 
executor is kept alive until
+        * no more references to the executor exist.</p>
         * 
         * @param plan The plan of the program to execute.
         * @return The net runtime of the program, in milliseconds.
@@ -145,15 +159,15 @@ public class LocalExecutor extends PlanExecutor {
                if (plan == null) {
                        throw new IllegalArgumentException("The plan may not be 
null.");
                }
-               
+
                synchronized (this.lock) {
-                       
+
                        // check if we start a session dedicated for this 
execution
                        final boolean shutDownAtEnd;
-                       if (this.flink == null) {
-                               // we start a session just for us now
+
+                       if (flink == null) {
                                shutDownAtEnd = true;
-                               
+
                                // configure the number of local slots equal to 
the parallelism of the local plan
                                if (this.taskManagerNumSlots == 
DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                                        int maxParallelism = 
plan.getMaximumParallelism();
@@ -161,9 +175,11 @@ public class LocalExecutor extends PlanExecutor {
                                                this.taskManagerNumSlots = 
maxParallelism;
                                        }
                                }
-                               
+
+                               // start the cluster for us
                                start();
-                       } else {
+                       }
+                       else {
                                // we use the existing session
                                shutDownAtEnd = false;
                        }
@@ -173,10 +189,10 @@ public class LocalExecutor extends PlanExecutor {
 
                                Optimizer pc = new Optimizer(new 
DataStatistics(), configuration);
                                OptimizedPlan op = pc.compile(plan);
-                               
+
                                JobGraphGenerator jgg = new 
JobGraphGenerator(configuration);
-                               JobGraph jobGraph = jgg.compileJobGraph(op);
-                               
+                               JobGraph jobGraph = jgg.compileJobGraph(op, 
plan.getJobId());
+
                                boolean sysoutPrint = 
isPrintingStatusDuringExecution();
                                return flink.submitJobAndWait(jobGraph, 
sysoutPrint);
                        }
@@ -189,32 +205,50 @@ public class LocalExecutor extends PlanExecutor {
        }
 
        /**
-        * Returns a JSON dump of the optimized plan.
-        * 
-        * @param plan
-        *            The program's plan.
-        * @return JSON dump of the optimized plan.
-        * @throws Exception
+        * Creates a JSON representation of the given dataflow's execution plan.
+        *
+        * @param plan The dataflow plan.
+        * @return The dataflow's execution plan, as a JSON string.
+        * @throws Exception Thrown, if the optimization process that creates 
the execution plan failed.
         */
        @Override
        public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-               Optimizer pc = new Optimizer(new DataStatistics(), 
createConfiguration(this));
+               final int parallelism = plan.getDefaultParallelism() == -1 ? 1 
: plan.getDefaultParallelism();
+
+               Optimizer pc = new Optimizer(new DataStatistics(), 
this.configuration);
+               pc.setDefaultParallelism(parallelism);
                OptimizedPlan op = pc.compile(plan);
-               PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-       
-               return gen.getOptimizerPlanAsJSON(op);
+
+               return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
        }
-       
+
+       @Override
+       public void endSession(JobID jobID) throws Exception {
+               LocalFlinkMiniCluster flink = this.flink;
+               if (flink != null) {
+                       ActorGateway leaderGateway = 
flink.getLeaderGateway(AkkaUtils.getDefaultTimeout());
+                       leaderGateway.tell(new 
JobManagerMessages.RemoveCachedJob(jobID));
+               }
+       }
+
+       private Configuration createConfiguration() {
+               Configuration configuration = new Configuration();
+               
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
getTaskManagerNumSlots());
+               
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, 
isDefaultOverwriteFiles());
+               return configuration;
+       }
+
+
        // 
--------------------------------------------------------------------------------------------
        //  Static variants that internally bring up an instance and shut it 
down after the execution
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
-        * Executes the program described by the given plan assembler.
+        * Executes the given program.
         * 
-        * @param pa The program's plan assembler. 
+        * @param pa The program.
         * @param args The parameters.
-        * @return The net runtime of the program, in milliseconds.
+        * @return The execution result of the program.
         * 
         * @throws Exception Thrown, if either the startup of the local 
execution context, or the execution
         *                   caused an exception.
@@ -222,57 +256,45 @@ public class LocalExecutor extends PlanExecutor {
        public static JobExecutionResult execute(Program pa, String... args) 
throws Exception {
                return execute(pa.getPlan(args));
        }
-       
+
        /**
-        * Executes the program represented by the given Pact plan.
+        * Executes the given dataflow plan.
         * 
-        * @param plan The program's plan. 
-        * @return The net runtime of the program, in milliseconds.
+        * @param plan The dataflow plan. 
+        * @return The execution result.
         * 
         * @throws Exception Thrown, if either the startup of the local 
execution context, or the execution
         *                   caused an exception.
         */
        public static JobExecutionResult execute(Plan plan) throws Exception {
-               LocalExecutor exec = new LocalExecutor();
-               try {
-                       exec.start();
-                       return exec.executePlan(plan);
-               } finally {
-                       exec.stop();
-               }
+               return new LocalExecutor().executePlan(plan);
        }
 
        /**
-        * Returns a JSON dump of the optimized plan.
+        * Creates a JSON representation of the given dataflow's execution plan.
         * 
-        * @param plan
-        *            The program's plan.
-        * @return JSON dump of the optimized plan.
-        * @throws Exception
+        * @param plan The dataflow plan.
+        * @return The dataflow's execution plan, as a JSON string.
+        * @throws Exception Thrown, if the optimization process that creates 
the execution plan failed.
         */
        public static String optimizerPlanAsJSON(Plan plan) throws Exception {
-               LocalExecutor exec = new LocalExecutor();
-               try {
-                       exec.start();
-                       Optimizer pc = new Optimizer(new DataStatistics(), 
exec.flink.configuration());
-                       OptimizedPlan op = pc.compile(plan);
-                       PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
-
-                       return gen.getOptimizerPlanAsJSON(op);
-               } finally {
-                       exec.stop();
-               }
+               final int parallelism = plan.getDefaultParallelism() == -1 ? 1 
: plan.getDefaultParallelism();
+
+               Optimizer pc = new Optimizer(new DataStatistics(), new 
Configuration());
+               pc.setDefaultParallelism(parallelism);
+               OptimizedPlan op = pc.compile(plan);
+
+               return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
        }
 
        /**
-        * Return unoptimized plan as JSON.
+        * Creates a JSON representation of the given dataflow plan.
         * 
-        * @param plan The program plan.
-        * @return The plan as a JSON object.
+        * @param plan The dataflow plan.
+        * @return The dataflow plan (prior to optimization) as a JSON string.
         */
        public static String getPlanAsJSON(Plan plan) {
-               PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
                List<DataSinkNode> sinks = 
Optimizer.createPreOptimizedPlan(plan);
-               return gen.getPactPlanAsJSON(sinks);
+               return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 20169f6..e8e9ade 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client;
 
-import java.io.File;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -26,36 +25,41 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * The RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} 
that takes the program
  * and ships it to a remote Flink cluster for execution.
  * 
- * The RemoteExecutor is pointed at the JobManager and gets the program and 
(if necessary) the
- * set of libraries that need to be shipped together with the program.
+ * <p>The RemoteExecutor is pointed at the JobManager and gets the program and 
(if necessary) the
+ * set of libraries that need to be shipped together with the program.</p>
  * 
- * The RemoteExecutor is used in the {@link 
org.apache.flink.api.java.RemoteEnvironment} to
- * remotely execute program parts.
+ * <p>The RemoteExecutor is used in the {@link 
org.apache.flink.api.java.RemoteEnvironment} to
+ * remotely execute program parts.</p>
  */
 public class RemoteExecutor extends PlanExecutor {
+               
+       private final Object lock = new Object();
        
-       private static final Logger LOG = 
LoggerFactory.getLogger(RemoteExecutor.class);
-
        private final List<String> jarFiles;
 
        private final Configuration clientConfiguration;
+
+       private Client client;
+       
+       private int defaultParallelism = 1;
+       
        
        public RemoteExecutor(String hostname, int port) {
                this(hostname, port, Collections.<String>emptyList(), new 
Configuration());
@@ -97,51 +101,148 @@ public class RemoteExecutor extends PlanExecutor {
                
clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 
inet.getPort());
        }
 
+       // 
------------------------------------------------------------------------
+       //  Properties
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Sets the parallelism that will be used when the program does 
not define
+        * any parallelism at all.
+        *
+        * @param defaultParallelism The default parallelism for the executor.
+        */
+       public void setDefaultParallelism(int defaultParallelism) {
+               if (defaultParallelism < 1) {
+                       throw new IllegalArgumentException("The default 
parallelism must be at least one");
+               }
+               this.defaultParallelism = defaultParallelism;
+       }
+
+       /**
+        * Gets the parallelism that will be used when the program does 
not define
+        * any parallelism at all.
+        * 
+        * @return The default parallelism for the executor.
+        */
+       public int getDefaultParallelism() {
+               return defaultParallelism;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Startup & Shutdown
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public void start() throws Exception {
+               synchronized (lock) {
+                       if (client == null) {
+                               client = new Client(clientConfiguration);
+                               
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
+                       }
+                       else {
+                               throw new IllegalStateException("The remote 
executor was already started.");
+                       }
+               }
+       }
+
+       @Override
+       public void stop() throws Exception {
+               synchronized (lock) {
+                       if (client != null) {
+                               client.shutdown();
+                               client = null;
+                       }
+               }
+       }
+
+       @Override
+       public boolean isRunning() {
+               return client != null;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Executing programs
+       // 
------------------------------------------------------------------------
+
        @Override
        public JobExecutionResult executePlan(Plan plan) throws Exception {
+               if (plan == null) {
+                       throw new IllegalArgumentException("The plan may not be 
null.");
+               }
+
                JobWithJars p = new JobWithJars(plan, this.jarFiles);
                return executePlanWithJars(p);
        }
-       
-       public JobExecutionResult executePlanWithJars(JobWithJars p) throws 
Exception {
-               Client c = new Client(clientConfiguration, 
p.getUserCodeClassLoader(), -1);
-               
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-               
-               JobSubmissionResult result = c.run(p, -1, true);
-               if (result instanceof JobExecutionResult) {
-                       return (JobExecutionResult) result;
-               } else {
-                       LOG.warn("The Client didn't return a 
JobExecutionResult");
-                       return new JobExecutionResult(result.getJobID(), -1, 
null);
+
+       public JobExecutionResult executePlanWithJars(JobWithJars program) 
throws Exception {
+               if (program == null) {
+                       throw new IllegalArgumentException("The job may not be 
null.");
                }
-       }
 
-       public JobExecutionResult executeJar(String jarPath, String 
assemblerClass, String... args) throws Exception {
-               File jarFile = new File(jarPath);
-               PackagedProgram program = new PackagedProgram(jarFile, 
assemblerClass, args);
-               
-               Client c = new Client(clientConfiguration, 
program.getUserCodeClassLoader(), -1);
-               
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
-               
-               JobSubmissionResult result = c.run(program.getPlanWithJars(), 
-1, true);
-               if(result instanceof JobExecutionResult) {
-                       return (JobExecutionResult) result;
-               } else {
-                       LOG.warn("The Client didn't return a 
JobExecutionResult");
-                       return new JobExecutionResult(result.getJobID(), -1, 
null);
+               synchronized (this.lock) {
+                       // check if we start a session dedicated for this 
execution
+                       final boolean shutDownAtEnd;
+
+                       if (client == null) {
+                               shutDownAtEnd = true;
+                               // start the executor for us
+                               start();
+                       }
+                       else {
+                               // we use the existing session
+                               shutDownAtEnd = false;
+                       }
+
+                       try {
+                               return client.runBlocking(program, 
defaultParallelism);
+                       }
+                       finally {
+                               if (shutDownAtEnd) {
+                                       stop();
+                               }
+                       }
                }
        }
 
        @Override
        public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-               JobWithJars p = new JobWithJars(plan, this.jarFiles);
-               Client c = new Client(clientConfiguration, 
p.getUserCodeClassLoader(), -1);
-               
-               OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
-               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON(op);
+               Optimizer opt = new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), new Configuration());
+               OptimizedPlan optPlan = opt.compile(plan);
+               return new 
PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan);
        }
-       
+
+       @Override
+       public void endSession(JobID jobID) throws Exception {
+               if (jobID == null) {
+                       throw new NullPointerException("The supplied jobID must 
not be null.");
+               }
+
+               synchronized (this.lock) {
+                       // check if we start a session dedicated for this 
execution
+                       final boolean shutDownAtEnd;
+
+                       if (client == null) {
+                               shutDownAtEnd = true;
+                               // start the executor for us
+                               start();
+                       }
+                       else {
+                               // we use the existing session
+                               shutDownAtEnd = false;
+                       }
+
+                       try {
+                               client.endSession(jobID);
+                       }
+                       finally {
+                               if (shutDownAtEnd) {
+                                       stop();
+                               }
+                       }
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //   Utilities
        // 
--------------------------------------------------------------------------------------------
@@ -168,5 +269,4 @@ public class RemoteExecutor extends PlanExecutor {
                }
                return new InetSocketAddress(host, port);
        }
-       
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index e7464c8..6c886fe 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -18,10 +18,9 @@
 
 package org.apache.flink.client.program;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -32,8 +31,6 @@ import 
org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -65,7 +62,6 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorSystem;
 
-import com.google.common.base.Preconditions;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote 
cluster.
@@ -78,62 +74,139 @@ public class Client {
         * The configuration to use for the client (optimizer, timeouts, ...) 
and to connect to the
         * JobManager.
         */
-       private final Configuration configuration;
-
        /** The optimizer used in the optimization of batch programs */
-       private final Optimizer compiler;
+       final Optimizer compiler;
+       
+       /** The actor system used to communicate with the JobManager */
+       private final ActorSystem actorSystem;
 
-       /** The class loader to use for classes from the user program (e.g., 
functions and data types) */
-       private final ClassLoader userCodeClassLoader;
+       /** The actor reference to the JobManager */
+       private final ActorGateway jobManagerGateway;
+
+       /** The timeout for communication between the client and the JobManager 
*/
+       private final FiniteDuration timeout;
+       
+       /**
+        * If != -1, this field specifies the total number of available slots 
on the cluster
+        * connected to the client.
+        */
+       private final int maxSlots;
 
        /** Flag indicating whether to sysout print execution updates */
        private boolean printStatusDuringExecution = true;
 
        /**
-        * If != -1, this field specifies the total number of available slots 
on the cluster
-        * connected to the client.
+        *  For interactive invocations, the Job ID is only available after the 
ContextEnvironment has
+        *  been run inside the user JAR. We pass the Client to every instance 
of the ContextEnvironment
+        *  which lets us access the last JobID here.
         */
-       private int maxSlots;
+       private JobID lastJobID;
 
-       /** ID of the last job submitted with this client. */
-       private JobID lastJobId = null;
-       
-       
        // 
------------------------------------------------------------------------
        //                            Construction
        // 
------------------------------------------------------------------------
 
        /**
         * Creates a instance that submits the programs to the JobManager 
defined in the
-        * configuration. It sets the maximum number of slots to unknown (= -1).
+        * configuration. This method will try to resolve the JobManager 
hostname and throw an exception
+        * if that is not possible.
         *
-        * @param config The config used to obtain the JobManager's address.
-        * @param userCodeClassLoader The class loader to use for loading user 
code classes.
+        * @param config The config used to obtain the job-manager's address, 
and used to configure the optimizer.
+        * 
+        * @throws java.io.IOException Thrown, if the client's actor system 
could not be started.
+        * @throws java.net.UnknownHostException Thrown, if the JobManager's 
hostname could not be resolved.
         */
-       public Client(Configuration config, ClassLoader userCodeClassLoader) {
-               this(config, userCodeClassLoader, -1);
+       public Client(Configuration config) throws IOException {
+               this(config, -1);
        }
 
        /**
-        * Creates a instance that submits the programs to the JobManager 
defined in the
-        * configuration.
+        * Creates a new instance of the class that submits the jobs to a 
job-manager
+        * at the given address using the default port.
         * 
-        * @param config The config used to obtain the JobManager's address.
-        * @param userCodeClassLoader The class loader to use for loading user 
code classes.
-        * @param maxSlots The number of maxSlots on the cluster if != -1
+        * @param config The configuration for the client-side processes, like 
the optimizer.
+        * @param maxSlots The number of maxSlots on the cluster if != -1.
+        *                    
+        * @throws java.io.IOException Thrown, if the client's actor system 
could not be started.
+        * @throws java.net.UnknownHostException Thrown, if the JobManager's 
hostname could not be resolved.   
         */
-       public Client(Configuration config, ClassLoader userCodeClassLoader, 
int maxSlots) {
-               Preconditions.checkNotNull(config, "Configuration is null");
-               Preconditions.checkNotNull(userCodeClassLoader, "User code 
ClassLoader is null");
-               
-               this.configuration = config;
-               this.userCodeClassLoader = userCodeClassLoader;
+       public Client(Configuration config, int maxSlots) throws IOException {
 
-               this.compiler = new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), configuration);
+               this.compiler = new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), config);
                this.maxSlots = maxSlots;
+
+               LOG.info("Starting client actor system");
+
+               try {
+                       this.actorSystem = 
JobClient.startJobClientActorSystem(config);
+               } catch (Exception e) {
+                       throw new IOException("Could start client actor 
system.", e);
+               }
+
+               // from here on, we need to make sure the actor system is shut 
down on error
+               boolean success = false;
+
+               try {
+
+                       FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(config);
+                       this.timeout = AkkaUtils.getTimeout(config);
+
+                       LOG.info("Looking up JobManager");
+                       LeaderRetrievalService leaderRetrievalService;
+
+                       try {
+                               leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(config);
+                       } catch (Exception e) {
+                               throw new IOException("Could not create the 
leader retrieval service.", e);
+                       }
+
+                       try {
+                               this.jobManagerGateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
+                                               leaderRetrievalService,
+                                               actorSystem,
+                                               lookupTimeout);
+                       } catch (LeaderRetrievalException e) {
+                               throw new IOException("Failed to retrieve 
JobManager gateway", e);
+                       }
+
+                       LOG.info("Leading JobManager actor system address is " 
+ this.jobManagerGateway.path());
+
+                       LOG.info("JobManager runs at " + 
this.jobManagerGateway.path());
+
+                       LOG.info("Communication between client and JobManager 
will have a timeout of " + this.timeout);
+                       success = true;
+               } finally {
+                       if (!success) {
+                               try {
+                                       this.actorSystem.shutdown();
+
+                                       // wait at most for 30 seconds, to work 
around an occasional akka problem
+                                       actorSystem.awaitTermination(new 
FiniteDuration(30, TimeUnit.SECONDS));
+                               } catch (Throwable t) {
+                                       LOG.error("Shutting down actor system 
after error caused another error", t);
+                               }
+                       }
+               }
        }
+       // 
------------------------------------------------------------------------
+       //  Startup & Shutdown
+       // 
------------------------------------------------------------------------
 
        /**
+        * Shuts down the client. This stops the internal actor system and 
actors.
+        */
+       public void shutdown() {
+               if (!this.actorSystem.isTerminated()) {
+                       this.actorSystem.shutdown();
+                       this.actorSystem.awaitTermination();
+               }
+       }
+       
+       // 
------------------------------------------------------------------------
+       //  Configuration
+       // 
------------------------------------------------------------------------
+       
+       /**
         * Configures whether the client should print progress updates during 
the execution to {@code System.out}.
         * All updates are logged via the SLF4J loggers regardless of this 
setting.
         * 
@@ -159,118 +232,84 @@ public class Client {
        }
        
        // 
------------------------------------------------------------------------
-       //                      Compilation and Submission
+       //  Access to the Program's Plan
        // 
------------------------------------------------------------------------
        
-       public String getOptimizedPlanAsJson(PackagedProgram prog, int 
parallelism) throws CompilerException, ProgramInvocationException {
+       public static String getOptimizedPlanAsJson(Optimizer compiler, 
PackagedProgram prog, int parallelism)
+                       throws CompilerException, ProgramInvocationException
+       {
                PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) 
getOptimizedPlan(prog, parallelism));
+               return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) 
getOptimizedPlan(compiler, prog, parallelism));
        }
-       
-       public FlinkPlan getOptimizedPlan(PackagedProgram prog, int 
parallelism) throws CompilerException, ProgramInvocationException {
+
+       public static FlinkPlan getOptimizedPlan(Optimizer compiler, 
PackagedProgram prog, int parallelism)
+                       throws CompilerException, ProgramInvocationException
+       {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
-                       return getOptimizedPlan(prog.getPlanWithJars(), 
parallelism);
-               }
-               else if (prog.isUsingInteractiveMode()) {
+                       return getOptimizedPlan(compiler, 
prog.getPlanWithJars(), parallelism);
+               } else if (prog.isUsingInteractiveMode()) {
                        // temporary hack to support the optimizer plan preview
-                       OptimizerPlanEnvironment env = new 
OptimizerPlanEnvironment(this.compiler);
+                       OptimizerPlanEnvironment env = new 
OptimizerPlanEnvironment(compiler);
                        if (parallelism > 0) {
                                env.setParallelism(parallelism);
                        }
-                       env.setAsContext();
-                       
-                       // temporarily write syserr and sysout to a byte array.
-                       PrintStream originalOut = System.out;
-                       PrintStream originalErr = System.err;
-                       ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
-                       System.setOut(new PrintStream(baos));
-                       ByteArrayOutputStream baes = new 
ByteArrayOutputStream();
-                       System.setErr(new PrintStream(baes));
-                       try {
-                               ContextEnvironment.enableLocalExecution(false);
-                               prog.invokeInteractiveModeForExecution();
-                       }
-                       catch (ProgramInvocationException e) {
-                               throw e;
-                       }
-                       catch (Throwable t) {
-                               // the invocation gets aborted with the preview 
plan
-                               if (env.optimizerPlan != null) {
-                                       return env.optimizerPlan;
-                               } else {
-                                       throw new 
ProgramInvocationException("The program caused an error: ", t);
-                               }
-                       }
-                       finally {
-                               ContextEnvironment.enableLocalExecution(true);
-                               System.setOut(originalOut);
-                               System.setErr(originalErr);
-                               System.err.println(baes);
-                               System.out.println(baos);
-                       }
-                       
-                       throw new ProgramInvocationException(
-                                       "The program plan could not be fetched 
- the program aborted pre-maturely.\n"
-                                       + "System.err: " + baes.toString() + 
'\n'
-                                       + "System.out: " + baos.toString() + 
'\n');
-               }
-               else {
-                       throw new RuntimeException();
+
+                       return env.getOptimizedPlan(prog);
+               } else {
+                       throw new RuntimeException("Couldn't determine program 
mode.");
                }
        }
-       
-       public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws 
CompilerException {
+
+       public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan 
p, int parallelism) throws CompilerException {
                if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-                       LOG.debug("Changing plan default parallelism from {} to 
{}",p.getDefaultParallelism(), parallelism);
+                       LOG.debug("Changing plan default parallelism from {} to 
{}", p.getDefaultParallelism(), parallelism);
                        p.setDefaultParallelism(parallelism);
                }
                LOG.debug("Set parallelism {}, plan default parallelism {}", 
parallelism, p.getDefaultParallelism());
 
-               return this.compiler.compile(p);
-       }
-       
-       
-       /**
-        * Creates the optimized plan for a given program, using this client's 
compiler.
-        *  
-        * @param prog The program to be compiled.
-        * @return The compiled and optimized plan, as returned by the compiler.
-        * @throws CompilerException Thrown, if the compiler encounters an 
illegal situation.
-        * @throws ProgramInvocationException Thrown, if the program could not 
be instantiated from its jar file.
-        */
-       public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) 
throws CompilerException, ProgramInvocationException {
-               return getOptimizedPlan(prog.getPlan(), parallelism);
-       }
-       
-       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) 
throws ProgramInvocationException {
-               return getJobGraph(optPlan, prog.getAllLibraries());
+               return compiler.compile(p);
        }
-       
-       private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
-               JobGraph job;
-               if (optPlan instanceof StreamingPlan) {
-                       job = ((StreamingPlan) optPlan).getJobGraph();
-               } else {
-                       JobGraphGenerator gen = new 
JobGraphGenerator(this.configuration);
-                       job = gen.compileJobGraph((OptimizedPlan) optPlan);
-               }
 
-               for (File jar : jarFiles) {
-                       job.addJar(new Path(jar.getAbsolutePath()));
+       // 
------------------------------------------------------------------------
+       //  Program submission / execution
+       // 
------------------------------------------------------------------------
+
+       public JobExecutionResult runBlocking(PackagedProgram prog, int 
parallelism) throws ProgramInvocationException {
+               
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
+               if (prog.isUsingProgramEntryPoint()) {
+                       return runBlocking(prog.getPlanWithJars(), parallelism);
                }
+               else if (prog.isUsingInteractiveMode()) {
+                       LOG.info("Starting program in interactive mode");
+                       ContextEnvironment.setAsContext(this, 
prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true);
+                       ContextEnvironment.enableLocalExecution(false);
 
-               return job;
+                       // invoke here
+                       try {
+                               prog.invokeInteractiveModeForExecution();
+                       }
+                       finally {
+                               ContextEnvironment.enableLocalExecution(true);
+                       }
+
+                       return JobExecutionResult.fromJobSubmissionResult(new 
JobSubmissionResult(lastJobID));
+               }
+               else {
+                       throw new RuntimeException();
+               }
        }
 
-       public JobSubmissionResult run(final PackagedProgram prog, int 
parallelism, boolean wait) throws ProgramInvocationException {
+       public JobSubmissionResult runDetached(PackagedProgram prog, int 
parallelism)
+                       throws ProgramInvocationException
+       {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
-                       return run(prog.getPlanWithJars(), parallelism, wait);
+                       return runDetached(prog.getPlanWithJars(), parallelism);
                }
                else if (prog.isUsingInteractiveMode()) {
                        LOG.info("Starting program in interactive mode");
-                       ContextEnvironment.setAsContext(this, 
prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
+                       ContextEnvironment.setAsContext(this, 
prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false);
                        ContextEnvironment.enableLocalExecution(false);
 
                        // invoke here
@@ -281,113 +320,108 @@ public class Client {
                                ContextEnvironment.enableLocalExecution(true);
                        }
 
-                       // Job id has been set in the Client passed to the 
ContextEnvironment
-                       return new JobSubmissionResult(lastJobId);
+                       return new JobSubmissionResult(lastJobID);
                }
                else {
-                       throw new RuntimeException();
+                       throw new RuntimeException("PackagedProgram does not 
have a valid invocation mode.");
                }
        }
-       
-       public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan 
optimizedPlan, boolean wait) throws ProgramInvocationException {
-               return run(optimizedPlan, prog.getAllLibraries(), wait);
 
-       }
-       
        /**
-        * Runs a program on Flink cluster whose job-manager is configured in 
this client's configuration.
-        * This method involves all steps, from compiling, job-graph generation 
to submission.
-        * 
-        * @param prog The program to be executed.
+        * Runs a program on the Flink cluster to which this client is 
connected. The call blocks until the
+        * execution is complete, and returns afterwards.
+        *
+        * @param program The program to be executed.
         * @param parallelism The default parallelism to use when running the 
program. The default parallelism is used
         *                    when the program does not set a parallelism by 
itself.
-        * @param wait A flag that indicates whether this function call should 
block until the program execution is done.
+        *
         * @throws CompilerException Thrown, if the compiler encounters an 
illegal situation.
         * @throws ProgramInvocationException Thrown, if the program could not 
be instantiated from its jar file,
         *                                    or if the submission failed. That 
might be either due to an I/O problem,
         *                                    i.e. the job-manager is 
unreachable, or due to the fact that the
         *                                    parallel execution failed.
         */
-       public JobSubmissionResult run(JobWithJars prog, int parallelism, 
boolean wait) throws CompilerException, ProgramInvocationException {
-               return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), 
prog.getJarFiles(), wait);
+       public JobExecutionResult runBlocking(JobWithJars program, int 
parallelism) 
+                       throws CompilerException, ProgramInvocationException
+       {
+               ClassLoader classLoader = program.getUserCodeClassLoader();
+               if (classLoader == null) {
+                       throw new IllegalArgumentException("The given 
JobWithJars does not provide a usercode class loader.");
+               }
+
+               OptimizedPlan optPlan = getOptimizedPlan(compiler, program, 
parallelism);
+               return runBlocking(optPlan, program.getJarFiles(), classLoader);
+       }
+
+       /**
+        * Submits a program to the Flink cluster to which this client is 
connected. The call returns after the
+        * program was submitted and does not wait for the program to complete.
+        *
+        * @param program The program to be executed.
+        * @param parallelism The default parallelism to use when running the 
program. The default parallelism is used
+        *                    when the program does not set a parallelism by 
itself.
+        *
+        * @throws CompilerException Thrown, if the compiler encounters an 
illegal situation.
+        * @throws ProgramInvocationException Thrown, if the program could not 
be instantiated from its jar file,
+        *                                    or if the submission failed. That 
might be due to an I/O problem,
+        *                                    i.e. the job-manager is 
unreachable.
+        */
+       public JobSubmissionResult runDetached(JobWithJars program, int 
parallelism)
+                       throws CompilerException, ProgramInvocationException
+       {
+               ClassLoader classLoader = program.getUserCodeClassLoader();
+               if (classLoader == null) {
+                       throw new IllegalArgumentException("The given 
JobWithJars does not provide a usercode class loader.");
+               }
+
+               OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, 
program, parallelism);
+               return runDetached(optimizedPlan, program.getJarFiles(), 
classLoader);
        }
        
 
-       public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> 
libraries, boolean wait) throws ProgramInvocationException {
+       public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, 
List<File> libraries, ClassLoader classLoader)
+                       throws ProgramInvocationException
+       {
                JobGraph job = getJobGraph(compiledPlan, libraries);
-               this.lastJobId = job.getJobID();
-               return run(job, wait);
+               return runBlocking(job, classLoader);
        }
 
-       public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws 
ProgramInvocationException {
-               this.lastJobId = jobGraph.getJobID();
-               
-               LOG.info("Starting client actor system");
-               final ActorSystem actorSystem;
+       public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, 
List<File> libraries, ClassLoader classLoader)
+                       throws ProgramInvocationException
+       {
+               JobGraph job = getJobGraph(compiledPlan, libraries);
+               return runDetached(job, classLoader);
+       }
+
+       public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+               LOG.info("Checking and uploading JAR files");
                try {
-                       actorSystem = 
JobClient.startJobClientActorSystem(configuration);
+                       JobClient.uploadJarFiles(jobGraph, jobManagerGateway, 
timeout);
+               } catch (IOException e) {
+                       throw new ProgramInvocationException("Could not upload 
the program's JAR files to the JobManager.", e);
                }
-               catch (Exception e) {
-                       throw new ProgramInvocationException("Could start 
client actor system.", e);
+               try {
+                       this.lastJobID = jobGraph.getJobID();
+                       return JobClient.submitJobAndWait(actorSystem, 
jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader);
+               } catch (JobExecutionException e) {
+                       throw new ProgramInvocationException("The program 
execution failed: " + e.getMessage(), e);
                }
+       }
 
+       public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+               LOG.info("Checking and uploading JAR files");
                try {
-                       FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
-                       FiniteDuration timeout = 
AkkaUtils.getTimeout(configuration);
-
-                       LOG.info("Looking up JobManager");
-                       ActorGateway jobManagerGateway;
-
-                       LeaderRetrievalService leaderRetrievalService;
-
-                       try {
-                               leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-                       } catch (Exception e) {
-                               throw new ProgramInvocationException("Could not 
create the leader retrieval service.", e);
-                       }
-
-                       try {
-                               jobManagerGateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
-                                               leaderRetrievalService,
-                                               actorSystem,
-                                               lookupTimeout);
-                       } catch (LeaderRetrievalException e) {
-                               throw new ProgramInvocationException("Failed to 
retrieve JobManager gateway", e);
-                       }
-
-                       LOG.info("Leading JobManager actor system address is " 
+ jobManagerGateway.path());
-
-                       LOG.info("JobManager runs at " + 
jobManagerGateway.path());
-
-                       LOG.info("Communication between client and JobManager 
will have a timeout of " + timeout);
-
-                       LOG.info("Checking and uploading JAR files");
-                       try {
-                               JobClient.uploadJarFiles(jobGraph, 
jobManagerGateway, timeout);
-                       } catch (IOException e) {
-                               throw new ProgramInvocationException("Could not 
upload the program's JAR files to the JobManager.", e);
-                       }
-
-                       try {
-                               if (wait) {
-                                       return 
JobClient.submitJobAndWait(actorSystem,
-                                               jobManagerGateway, jobGraph, 
timeout, printStatusDuringExecution, userCodeClassLoader);
-                               } else {
-                                       
JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, 
userCodeClassLoader);
-                                       // return a dummy execution result with 
the JobId
-                                       return new 
JobSubmissionResult(jobGraph.getJobID());
-                               }
-                       } catch (JobExecutionException e) {
+                       JobClient.uploadJarFiles(jobGraph, jobManagerGateway, 
timeout);
+               }
+               catch (IOException e) {
+                       throw new ProgramInvocationException("Could not upload 
the program's JAR files to the JobManager.", e);
+               }
+               try {
+                       this.lastJobID = jobGraph.getJobID();
+                       JobClient.submitJobDetached(jobManagerGateway, 
jobGraph, timeout, classLoader);
+                       return new JobSubmissionResult(jobGraph.getJobID());
+               } catch (JobExecutionException e) {
                                throw new ProgramInvocationException("The 
program execution failed: " + e.getMessage(), e);
-                       } catch (Exception e) {
-                               throw new ProgramInvocationException("Exception 
during program execution.", e);
-                       }
-               } finally {
-                       // shut down started actor system
-                       actorSystem.shutdown();
-                       
-                       // wait at most for 30 seconds, to work around an 
occasional akka problem
-                       actorSystem.awaitTermination(new FiniteDuration(30, 
TimeUnit.SECONDS));
                }
        }
 
@@ -397,62 +431,26 @@ public class Client {
         * @throws Exception In case an error occurred.
         */
        public void cancel(JobID jobId) throws Exception {
-               final FiniteDuration timeout = 
AkkaUtils.getTimeout(configuration);
-               final FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
-
-               ActorSystem actorSystem;
+               Future<Object> response;
                try {
-                       actorSystem = 
JobClient.startJobClientActorSystem(configuration);
+                       response = jobManagerGateway.ask(new 
JobManagerMessages.CancelJob(jobId), timeout);
                } catch (Exception e) {
-                       throw new ProgramInvocationException("Could start 
client actor system.", e);
+                       throw new ProgramInvocationException("Failed to query 
the job manager gateway.", e);
                }
 
-               try {
-                       ActorGateway jobManagerGateway;
-
-                       LeaderRetrievalService leaderRetrievalService;
+               Object result = Await.result(response, timeout);
 
-                       try {
-                               leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-                       } catch (Exception e) {
-                               throw new ProgramInvocationException("Could not 
create the leader retrieval service.", e);
-                       }
-
-                       try {
-                               jobManagerGateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
-                                               leaderRetrievalService,
-                                               actorSystem,
-                                               lookupTimeout);
-                       } catch (LeaderRetrievalException e) {
-                               throw new ProgramInvocationException("Failed to 
retrieve JobManager gateway", e);
-                       }
-
-                       Future<Object> response;
-                       try {
-                               response = jobManagerGateway.ask(new 
JobManagerMessages.CancelJob(jobId), timeout);
-                       } catch (Exception e) {
-                               throw new ProgramInvocationException("Failed to 
query the job manager gateway.", e);
-                       }
-                       
-                       Object result = Await.result(response, timeout);
-
-                       if (result instanceof 
JobManagerMessages.CancellationSuccess) {
-                               LOG.debug("Job cancellation with ID " + jobId + 
" succeeded.");
-                       } else if (result instanceof 
JobManagerMessages.CancellationFailure) {
-                               Throwable t = 
((JobManagerMessages.CancellationFailure) result).cause();
-                               LOG.debug("Job cancellation with ID " + jobId + 
" failed.", t);
-                               throw new Exception("Failed to cancel the job 
because of \n" + t.getMessage());
-                       } else {
-                               throw new Exception("Unknown message received 
while cancelling.");
-                       }
-               } finally {
-                       // shut down started actor system
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+               if (result instanceof JobManagerMessages.CancellationSuccess) {
+                       LOG.debug("Job cancellation with ID " + jobId + " 
succeeded.");
+               } else if (result instanceof 
JobManagerMessages.CancellationFailure) {
+                       Throwable t = ((JobManagerMessages.CancellationFailure) 
result).cause();
+                       LOG.debug("Job cancellation with ID " + jobId + " 
failed.", t);
+                       throw new Exception("Failed to cancel the job because 
of \n" + t.getMessage());
+               } else {
+                       throw new Exception("Unknown message received while 
cancelling.");
                }
        }
 
-
        /**
         * Requests and returns the accumulators for the given job identifier. 
Accumulators can be
         * requested while a job is running or after it has finished. The default 
class loader is used
@@ -473,117 +471,98 @@ public class Client {
         */
        public Map<String, Object> getAccumulators(JobID jobID, ClassLoader 
loader) throws Exception {
 
-               final FiniteDuration timeout = 
AkkaUtils.getTimeout(configuration);
-               final FiniteDuration lookupTimeout = 
AkkaUtils.getLookupTimeout(configuration);
-
-               ActorSystem actorSystem;
+               Future<Object> response;
                try {
-                       actorSystem = 
JobClient.startJobClientActorSystem(configuration);
+                       response = jobManagerGateway.ask(new 
RequestAccumulatorResults(jobID), timeout);
                } catch (Exception e) {
-                       throw new Exception("Could start client actor system.", 
e);
+                       throw new Exception("Failed to query the job manager 
gateway for accumulators.", e);
                }
 
-               try {
-                       ActorGateway jobManagerGateway;
-
-                       LeaderRetrievalService leaderRetrievalService;
-
-                       try {
-                               leaderRetrievalService = 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
-                       } catch (Exception e) {
-                               throw new ProgramInvocationException("Could not 
create the leader retrieval service.", e);
-                       }
-
-                       try {
-                               jobManagerGateway = 
LeaderRetrievalUtils.retrieveLeaderGateway(
-                                               leaderRetrievalService,
-                                               actorSystem,
-                                               lookupTimeout);
-                       } catch (LeaderRetrievalException e) {
-                               throw new ProgramInvocationException("Failed to 
retrieve JobManager gateway", e);
-                       }
-
-                       Future<Object> response;
-                       try {
-                               response = jobManagerGateway.ask(new 
RequestAccumulatorResults(jobID), timeout);
-                       } catch (Exception e) {
-                               throw new Exception("Failed to query the job 
manager gateway for accumulators.", e);
-                       }
-
-                       Object result = Await.result(response, timeout);
+               Object result = Await.result(response, timeout);
 
-                       if (result instanceof AccumulatorResultsFound) {
-                               Map<String, SerializedValue<Object>> 
serializedAccumulators =
-                                               ((AccumulatorResultsFound) 
result).result();
+               if (result instanceof AccumulatorResultsFound) {
+                       Map<String, SerializedValue<Object>> 
serializedAccumulators =
+                                       ((AccumulatorResultsFound) 
result).result();
 
-                               return 
AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+                       return 
AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
 
-                       } else if (result instanceof 
AccumulatorResultsErroneous) {
-                               throw ((AccumulatorResultsErroneous) 
result).cause();
-                       } else {
-                               throw new Exception("Failed to fetch 
accumulators for the job " + jobID + ".");
-                       }
-               } finally {
-                       actorSystem.shutdown();
-                       actorSystem.awaitTermination();
+               } else if (result instanceof AccumulatorResultsErroneous) {
+                       throw ((AccumulatorResultsErroneous) result).cause();
+               } else {
+                       throw new Exception("Failed to fetch accumulators for 
the job " + jobID + ".");
                }
        }
 
 
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  Sessions
+       // 
------------------------------------------------------------------------
        
-       public static final class OptimizerPlanEnvironment extends 
ExecutionEnvironment {
-               
-               private final Optimizer compiler;
-               
-               private FlinkPlan optimizerPlan;
-               
-               
-               private OptimizerPlanEnvironment(Optimizer compiler) {
-                       this.compiler = compiler;
-               }
-               
-               @Override
-               public JobExecutionResult execute(String jobName) throws 
Exception {
-                       Plan plan = createProgramPlan(jobName);
-                       this.optimizerPlan = compiler.compile(plan);
-                       
-                       // do not go on with anything now!
-                       throw new ProgramAbortException();
+       /**
+        * Tells the JobManager to finish the session (job) defined by the 
given ID.
+        * 
+        * @param jobId The ID that identifies the session.
+        */
+       public void endSession(JobID jobId) throws Exception {
+               if (jobId == null) {
+                       throw new IllegalArgumentException("The JobID must not 
be null.");
                }
+               endSessions(Collections.singletonList(jobId));
+       }
 
-               @Override
-               public String getExecutionPlan() throws Exception {
-                       Plan plan = createProgramPlan(null, false);
-                       this.optimizerPlan = compiler.compile(plan);
-                       
-                       // do not go on with anything now!
-                       throw new ProgramAbortException();
-               }
-               
-               private void setAsContext() {
-                       ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
-                               
-                               @Override
-                               public ExecutionEnvironment 
createExecutionEnvironment() {
-                                       return OptimizerPlanEnvironment.this;
-                               }
-                       };
-                       initializeContextEnvironment(factory);
+       /**
+        * Tells the JobManager to finish the sessions (jobs) defined by the 
given IDs.
+        *
+        * @param jobIds The IDs that identify the sessions.
+        */
+       public void endSessions(List<JobID> jobIds) throws Exception {
+               if (jobIds == null) {
+                       throw new IllegalArgumentException("The JobIDs must not 
be null");
                }
                
-               public void setPlan(FlinkPlan plan){
-                       this.optimizerPlan = plan;
+               for (JobID jid : jobIds) {
+                       if (jid != null) {
+                               LOG.info("Telling job manager to end the 
session {}.", jid);
+                               jobManagerGateway.tell(new 
JobManagerMessages.RemoveCachedJob(jid));
+                       }
                }
        }
 
-       // 
--------------------------------------------------------------------------------------------
+       // 
------------------------------------------------------------------------
+       //  Internal translation methods
+       // 
------------------------------------------------------------------------
 
        /**
-        * A special exception used to abort programs when the caller is only 
interested in the
-        * program plan, rather than in the full execution.
+        * Creates the optimized plan for a given program, using the given 
compiler.
+        *
+        * @param prog The program to be compiled.
+        * @return The compiled and optimized plan, as returned by the compiler.
+        * @throws CompilerException Thrown, if the compiler encounters an 
illegal situation.
+        * @throws ProgramInvocationException Thrown, if the program could not 
be instantiated from its jar file.
         */
-       public static final class ProgramAbortException extends Error {
-               private static final long serialVersionUID = 1L;
+       private static OptimizedPlan getOptimizedPlan(Optimizer compiler, 
JobWithJars prog, int parallelism) throws CompilerException,
+                                                                               
                                                                                
        ProgramInvocationException {
+               return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
        }
+
+       public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan 
optPlan) throws ProgramInvocationException {
+               return getJobGraph(optPlan, prog.getAllLibraries());
+       }
+
+       private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> 
jarFiles) {
+               JobGraph job;
+               if (optPlan instanceof StreamingPlan) {
+                       job = ((StreamingPlan) optPlan).getJobGraph();
+               } else {
+                       JobGraphGenerator gen = new JobGraphGenerator();
+                       job = gen.compileJobGraph((OptimizedPlan) optPlan);
+               }
+
+               for (File jar : jarFiles) {
+                       job.addJar(new Path(jar.getAbsolutePath()));
+               }
+
+               return job;
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9287017..ad14a06 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -39,15 +40,14 @@ public class ContextEnvironment extends ExecutionEnvironment {
        private static final Logger LOG = 
LoggerFactory.getLogger(ContextEnvironment.class);
 
        private final Client client;
-       
+
        private final List<File> jarFilesToAttach;
-       
+
        private final ClassLoader userCodeClassLoader;
 
        private final boolean wait;
-       
-       
-       
+
+
        public ContextEnvironment(Client remoteConnection, List<File> jarFiles, 
ClassLoader userCodeClassLoader, boolean wait) {
                this.client = remoteConnection;
                this.jarFilesToAttach = jarFiles;
@@ -60,27 +60,33 @@ public class ContextEnvironment extends ExecutionEnvironment {
                Plan p = createProgramPlan(jobName);
                JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, 
this.userCodeClassLoader);
 
-               JobSubmissionResult result = this.client.run(toRun, 
getParallelism(), wait);
-               if(result instanceof JobExecutionResult) {
-                       this.lastJobExecutionResult = (JobExecutionResult) 
result;
-                       return (JobExecutionResult) result;
-               } else {
-                       LOG.warn("The Client didn't return a 
JobExecutionResult");
-                       this.lastJobExecutionResult = new 
JobExecutionResult(result.getJobID(), -1, null);
+               if (wait) {
+                       this.lastJobExecutionResult = client.runBlocking(toRun, 
getParallelism());
+                       return this.lastJobExecutionResult;
+               }
+               else {
+                       JobSubmissionResult result = client.runDetached(toRun, 
getParallelism());
+                       LOG.warn("Job was executed in detached mode, the 
results will be available on completion.");
+                       this.lastJobExecutionResult = 
JobExecutionResult.fromJobSubmissionResult(result);
                        return this.lastJobExecutionResult;
                }
        }
 
        @Override
        public String getExecutionPlan() throws Exception {
-               Plan p = createProgramPlan("unnamed job");
-               
-               OptimizedPlan op = (OptimizedPlan) 
this.client.getOptimizedPlan(p, getParallelism());
+               Plan plan = createProgramPlan("unnamed job");
 
+               OptimizedPlan op = Client.getOptimizedPlan(client.compiler, 
plan, getParallelism());
                PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
                return gen.getOptimizerPlanAsJSON(op);
        }
 
+       @Override
+       public void startNewSession() throws Exception {
+               client.endSession(jobID);
+               jobID = JobID.generate();
+       }
+
        public boolean isWait() {
                return wait;
        }
@@ -104,7 +110,9 @@ public class ContextEnvironment extends ExecutionEnvironment {
        static void setAsContext(Client client, List<File> jarFilesToAttach, 
                                ClassLoader userCodeClassLoader, int 
defaultParallelism, boolean wait)
        {
-               initializeContextEnvironment(new 
ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, 
defaultParallelism, wait));
+               ContextEnvironmentFactory factory =
+                               new ContextEnvironmentFactory(client, 
jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait);
+               initializeContextEnvironment(factory);
        }
        
        protected static void enableLocalExecution(boolean enabled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
index b86487f..9e84e2d 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client.program;
 
 import java.io.File;
@@ -30,6 +29,10 @@ import java.util.List;
 
 import org.apache.flink.api.common.Plan;
 
+/**
+ * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain
+ * the classes of the functions and libraries necessary for the execution.
+ */
 public class JobWithJars {
        
        private Plan plan;

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
new file mode 100644
index 0000000..c9c3b45
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+public class OptimizerPlanEnvironment extends ExecutionEnvironment {
+
+       private final Optimizer compiler;
+
+       private FlinkPlan optimizerPlan;
+
+       public OptimizerPlanEnvironment(Optimizer compiler) {
+               this.compiler = compiler;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Execution Environment methods
+       // ------------------------------------------------------------------------
+       
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               Plan plan = createProgramPlan(jobName);
+               this.optimizerPlan = compiler.compile(plan);
+
+               // do not go on with anything now!
+               throw new ProgramAbortException();
+       }
+
+       @Override
+       public String getExecutionPlan() throws Exception {
+               Plan plan = createProgramPlan(null, false);
+               this.optimizerPlan = compiler.compile(plan);
+
+               // do not go on with anything now!
+               throw new ProgramAbortException();
+       }
+
+       @Override
+       public void startNewSession() {
+               // do nothing
+       }
+
+       public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws 
ProgramInvocationException {
+               setAsContext();
+
+               // temporarily write syserr and sysout to a byte array.
+               PrintStream originalOut = System.out;
+               PrintStream originalErr = System.err;
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               System.setOut(new PrintStream(baos));
+               ByteArrayOutputStream baes = new ByteArrayOutputStream();
+               System.setErr(new PrintStream(baes));
+               try {
+                       ContextEnvironment.enableLocalExecution(false);
+                       prog.invokeInteractiveModeForExecution();
+               }
+               catch (ProgramInvocationException e) {
+                       throw e;
+               }
+               catch (Throwable t) {
+                       // the invocation gets aborted with the preview plan
+                       if (optimizerPlan != null) {
+                               return optimizerPlan;
+                       } else {
+                               throw new ProgramInvocationException("The 
program caused an error: ", t);
+                       }
+               }
+               finally {
+                       ContextEnvironment.enableLocalExecution(true);
+                       System.setOut(originalOut);
+                       System.setErr(originalErr);
+                       System.err.println(baes);
+                       System.out.println(baos);
+               }
+
+               throw new ProgramInvocationException(
+                               "The program plan could not be fetched - the 
program aborted pre-maturely.\n"
+                                               + "System.err: " + 
baes.toString() + '\n'
+                                               + "System.out: " + 
baos.toString() + '\n');
+       }
+       // ------------------------------------------------------------------------
+
+       private void setAsContext() {
+               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
+
+                       @Override
+                       public ExecutionEnvironment 
createExecutionEnvironment() {
+                               return OptimizerPlanEnvironment.this;
+                       }
+               };
+               initializeContextEnvironment(factory);
+       }
+
+       // ------------------------------------------------------------------------
+       
+       public void setPlan(FlinkPlan plan){
+               this.optimizerPlan = plan;
+       }
+
+       /**
+        * A special exception used to abort programs when the caller is only interested in the
+        * program plan, rather than in the full execution.
+        */
+       public static final class ProgramAbortException extends Error {
+               private static final long serialVersionUID = 1L;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 10096da..091a959 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -40,12 +40,9 @@ import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
@@ -166,7 +163,7 @@ public class PackagedProgram {
                }
        }
        
-       public PackagedProgram(Class<?> entryPointClass, String... args) throws 
ProgramInvocationException {
+       PackagedProgram(Class<?> entryPointClass, String... args) throws 
ProgramInvocationException {
                this.jarFile = null;
                this.args = args == null ? new String[0] : args;
                
@@ -685,51 +682,5 @@ public class PackagedProgram {
                        throw new ProgramInvocationException("Cannot access jar 
file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
                }
        }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       public static final class PreviewPlanEnvironment extends 
ExecutionEnvironment {
-
-               private List<DataSinkNode> previewPlan;
-               private Plan plan;
-               
-               private String preview = null;
-               
-               @Override
-               public JobExecutionResult execute(String jobName) throws 
Exception {
-                       this.plan = createProgramPlan(jobName);
-                       this.previewPlan = 
Optimizer.createPreOptimizedPlan((Plan) plan);
-                       
-                       // do not go on with anything now!
-                       throw new Client.ProgramAbortException();
-               }
 
-               @Override
-               public String getExecutionPlan() throws Exception {
-                       Plan plan = createProgramPlan("unused");
-                       this.previewPlan = 
Optimizer.createPreOptimizedPlan(plan);
-                       
-                       // do not go on with anything now!
-                       throw new Client.ProgramAbortException();
-               }
-               
-               public void setAsContext() {
-                       ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
-                               @Override
-                               public ExecutionEnvironment 
createExecutionEnvironment() {
-                                       return PreviewPlanEnvironment.this;
-                               }
-                       };
-                       initializeContextEnvironment(factory);
-               }
-
-               public Plan getPlan() {
-                       return this.plan;
-               }
-               
-               public void setPreview(String preview) {
-                       this.preview = preview;
-               }
-
-       }
 }

Reply via email to