[FLINK-2097][core] Implement job session management. Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it around so that further operations can be attached to the graph. This is the basis for interactive sessions.
This pull request implements rudimentary session management. Together with backtracking (#640), this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are eventually cleared. ExecutionGraphs are kept until either - a timeout occurs, or - the session is explicitly ended. The following changes have also been made in this pull request: - The Job ID is created through the ExecutionEnvironment and passed through. - Sessions can be terminated by the ExecutionEnvironment or directly through the executor. - The environments use reapers (local) and shutdown hooks (remote) to ensure session termination when the environment goes out of scope. - The Client manages only connections to the JobManager; it is not job-specific. This closes #858. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/71bf2f57 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/71bf2f57 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/71bf2f57 Branch: refs/heads/master Commit: 71bf2f570861daae53b24bfcf1d06aedb85311b9 Parents: 7984acc Author: Maximilian Michels <m...@apache.org> Authored: Fri Sep 4 17:34:44 2015 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Tue Sep 22 19:55:46 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 182 +++--- .../org/apache/flink/client/LocalExecutor.java | 222 ++++--- .../org/apache/flink/client/RemoteExecutor.java | 188 ++++-- .../org/apache/flink/client/program/Client.java | 653 +++++++++---------- .../client/program/ContextEnvironment.java | 40 +- .../flink/client/program/JobWithJars.java | 5 +- .../program/OptimizerPlanEnvironment.java | 132 ++++ .../flink/client/program/PackagedProgram.java | 51 +- .../client/program/PreviewPlanEnvironment.java | 80 +++ .../flink/client/web/JobSubmissionServlet.java | 30 +- 
.../flink/client/CliFrontendInfoTest.java | 81 +-- .../client/CliFrontendPackageProgramTest.java | 10 +- .../apache/flink/client/CliFrontendRunTest.java | 30 +- .../RemoteExecutorHostnameResolutionTest.java | 6 +- .../client/program/ClientConnectionTest.java | 8 +- .../apache/flink/client/program/ClientTest.java | 142 ++-- .../ExecutionPlanAfterExecutionTest.java | 15 +- .../program/ExecutionPlanCreationTest.java | 9 +- .../client/program/PackagedProgramTest.java | 1 - .../stormcompatibility/api/FlinkClient.java | 22 +- .../flink/api/common/JobExecutionResult.java | 17 +- .../java/org/apache/flink/api/common/JobID.java | 46 +- .../flink/api/common/JobSubmissionResult.java | 5 +- .../java/org/apache/flink/api/common/Plan.java | 71 +- .../apache/flink/api/common/PlanExecutor.java | 85 ++- .../flink/api/java/CollectionEnvironment.java | 4 + .../flink/api/java/ExecutionEnvironment.java | 97 ++- .../apache/flink/api/java/LocalEnvironment.java | 180 ++++- .../flink/api/java/RemoteEnvironment.java | 153 ++++- .../flink/optimizer/plan/OptimizedPlan.java | 15 +- .../plantranslate/JobGraphGenerator.java | 24 +- .../optimizer/postpass/JavaApiPostPass.java | 2 +- .../apache/flink/runtime/client/JobClient.java | 11 +- .../runtime/client/JobExecutionException.java | 2 +- .../runtime/executiongraph/ExecutionGraph.java | 8 + .../apache/flink/runtime/jobgraph/JobGraph.java | 58 +- .../runtime/taskmanager/TaskExecutionState.java | 2 +- .../flink/runtime/jobmanager/JobInfo.scala | 18 +- .../flink/runtime/jobmanager/JobManager.scala | 98 ++- .../runtime/jobmanager/MemoryArchivist.scala | 11 +- .../runtime/messages/JobManagerMessages.scala | 6 + .../runtime/minicluster/FlinkMiniCluster.scala | 9 +- .../PartialConsumePipelinedResultTest.java | 3 +- .../TaskManagerProcessReapingTest.java | 2 +- .../runtime/jobmanager/JobManagerITCase.scala | 126 +++- .../flink/api/scala/ExecutionEnvironment.scala | 31 +- .../api/avro/AvroExternalJarProgramITCase.java | 38 +- 
.../environment/RemoteStreamEnvironment.java | 24 +- .../environment/StreamContextEnvironment.java | 28 +- .../environment/StreamExecutionEnvironment.java | 11 +- .../api/environment/StreamPlanEnvironment.java | 7 +- .../flink/tez/client/LocalTezEnvironment.java | 5 + .../flink/tez/client/RemoteTezEnvironment.java | 5 + .../apache/flink/tez/client/TezExecutor.java | 21 + .../apache/flink/test/util/TestEnvironment.java | 4 + .../clients/examples/LocalExecutorITCase.java | 3 +- .../RemoteEnvironmentITCase.java | 2 +- .../jsonplan/DumpCompiledPlanTest.java | 6 +- .../jsonplan/JsonJobGraphGenerationTest.java | 4 + .../jobmanager/JobManagerFailsITCase.scala | 1 - .../org/apache/flink/yarn/YarnTestBase.java | 2 +- .../org/apache/flink/yarn/FlinkYarnCluster.java | 2 + 62 files changed, 2114 insertions(+), 1040 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index fc4d98a..f0e6c4f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -57,10 +57,13 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.FlinkPlan; import org.apache.flink.optimizer.plan.OptimizedPlan; 
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -297,44 +300,51 @@ public class CliFrontend { int userParallelism = options.getParallelism(); LOG.debug("User parallelism is set to {}", userParallelism); - Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism); + Client client = getClient(options, program.getMainClassName(), userParallelism); client.setPrintStatusDuringExecution(options.getStdoutLogging()); LOG.debug("Client slots is set to {}", client.getMaxSlots()); - if(client.getMaxSlots() != -1 && userParallelism == -1) { - logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " + - "To use another parallelism, set it at the ./bin/flink client."); - userParallelism = client.getMaxSlots(); - } - // check if detached per job yarn cluster is used to start flink - if(yarnCluster != null && yarnCluster.isDetached()) { - logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " + - "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + - "yarn application -kill " + yarnCluster.getApplicationId() + "\n" + - "Please also note that the temporary files of the YARN session in the home directoy will not be removed."); - exitCode = executeProgram(program, client, userParallelism, false); - } else { - // regular (blocking) execution. - exitCode = executeProgram(program, client, userParallelism, true); - } + try { + if (client.getMaxSlots() != -1 && userParallelism == -1) { + logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). 
" + + "To use another parallelism, set it at the ./bin/flink client."); + userParallelism = client.getMaxSlots(); + } - // show YARN cluster status if its not a detached YARN cluster. - if (yarnCluster != null && !yarnCluster.isDetached()) { - List<String> msgs = yarnCluster.getNewMessages(); - if (msgs != null && msgs.size() > 1) { + // check if detached per job yarn cluster is used to start flink + if (yarnCluster != null && yarnCluster.isDetached()) { + logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " + + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + + "yarn application -kill " + yarnCluster.getApplicationId() + "\n" + + "Please also note that the temporary files of the YARN session in the home directoy will not be removed."); + exitCode = executeProgramDetached(program, client, userParallelism); + } + else { + // regular (blocking) execution. + exitCode = executeProgramBlocking(program, client, userParallelism); + } + + // show YARN cluster status if its not a detached YARN cluster. 
+ if (yarnCluster != null && !yarnCluster.isDetached()) { + List<String> msgs = yarnCluster.getNewMessages(); + if (msgs != null && msgs.size() > 1) { - logAndSysout("The following messages were created by the YARN cluster while running the Job:"); - for (String msg : msgs) { - logAndSysout(msg); + logAndSysout("The following messages were created by the YARN cluster while running the Job:"); + for (String msg : msgs) { + logAndSysout(msg); + } + } + if (yarnCluster.hasFailed()) { + logAndSysout("YARN cluster is in failed state!"); + logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics()); } } - if (yarnCluster.hasFailed()) { - logAndSysout("YARN cluster is in failed state!"); - logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics()); - } - } - return exitCode; + return exitCode; + } + finally { + client.shutdown(); + } } catch (Throwable t) { return handleError(t); @@ -395,8 +405,10 @@ public class CliFrontend { int parallelism = options.getParallelism(); LOG.info("Creating program plan dump"); - Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism); - FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism); + + Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); + + FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism); if (webFrontend) { this.optimizedPlan = flinkPlan; @@ -425,6 +437,8 @@ public class CliFrontend { } } return 0; + + } catch (Throwable t) { return handleError(t); @@ -623,52 +637,65 @@ public class CliFrontend { // Interaction with programs and JobManager // -------------------------------------------------------------------------------------------- - protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) { - LOG.info("Starting execution of program"); - JobSubmissionResult execResult; + protected int executeProgramDetached(PackagedProgram program, 
Client client, int parallelism) { + JobSubmissionResult result; try { - execResult = client.run(program, parallelism, wait); - } - catch (ProgramInvocationException e) { + result = client.runDetached(program, parallelism); + } catch (ProgramInvocationException e) { return handleError(e); - } - finally { + } finally { program.deleteExtractedLibraries(); } - if(wait) { - LOG.info("Program execution finished"); - } - - // we come here after the job has finished (or the job has been submitted) - if (execResult != null) { + if (result != null) { // if the job has been submitted to a detached YARN cluster, there won't be any // exec results, but the object will be set (for the job id) if (yarnCluster != null && yarnCluster.isDetached()) { - if(execResult.getJobID() == null) { - throw new RuntimeException("Error while starting job. No Job ID set."); - } - yarnCluster.stopAfterJob(execResult.getJobID()); + + yarnCluster.stopAfterJob(result.getJobID()); yarnCluster.disconnect(); - if(!webFrontend) { - System.out.println("The Job has been submitted with JobID "+execResult.getJobID()); + if (!webFrontend) { + System.out.println("The Job has been submitted with JobID " + result.getJobID()); } return 0; - } - if (execResult instanceof JobExecutionResult) { - JobExecutionResult result = (JobExecutionResult) execResult; - if(!webFrontend) { - System.out.println("Job Runtime: " + result.getNetRuntime() + " ms"); - } - Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults(); - if (accumulatorsResult.size() > 0 && !webFrontend) { - System.out.println("Accumulator Results: "); - System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); - } } else { - LOG.info("The Job did not return an execution result"); + throw new RuntimeException("Error while starting job. 
No Job ID set."); + } + } + + return 0; + } + + protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) { + LOG.info("Starting execution of program"); + + JobExecutionResult result; + try { + client.setPrintStatusDuringExecution(true); + result = client.runBlocking(program, parallelism); + } + catch (ProgramInvocationException e) { + return handleError(e); + } + finally { + program.deleteExtractedLibraries(); + } + + LOG.info("Program execution finished"); + + if (result != null) { + if (!webFrontend) { + System.out.println("Job Runtime: " + result.getNetRuntime() + " ms"); + } + Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults(); + if (accumulatorsResult.size() > 0 && !webFrontend) { + System.out.println("Accumulator Results: "); + System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); } + } else { + LOG.info("The Job did not return an execution result"); } + return 0; } @@ -767,7 +794,6 @@ public class CliFrontend { * Retrieves a {@link Client} object from the given command line options and other parameters. 
* * @param options Command line options which contain JobManager address - * @param classLoader Class loader to use by the Client * @param programName Program name * @param userParallelism Given user parallelism * @return @@ -775,12 +801,10 @@ public class CliFrontend { */ protected Client getClient( CommandLineOptions options, - ClassLoader classLoader, String programName, int userParallelism) throws Exception { - InetSocketAddress jobManagerAddress = null; - + InetSocketAddress jobManagerAddress; int maxSlots = -1; if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { @@ -796,14 +820,16 @@ public class CliFrontend { // the number of slots available from YARN: int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if(yarnTmSlots == -1) { + if (yarnTmSlots == -1) { yarnTmSlots = 1; } maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if(userParallelism != -1) { + if (userParallelism != -1) { int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " + - "Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots."); + logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + + "but the user requested a parallelism of " + userParallelism + " on YARN. 
" + + "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + + "will get "+slotsPerTM+" slots."); flinkYarnClient.setTaskManagerSlots(slotsPerTM); } @@ -811,11 +837,12 @@ public class CliFrontend { yarnCluster = flinkYarnClient.deploy(); yarnCluster.connectToCluster(); } - catch(Exception e) { + catch (Exception e) { throw new RuntimeException("Error deploying the YARN cluster", e); } jobManagerAddress = yarnCluster.getJobManagerAddress(); + writeJobManagerAddressToConfig(jobManagerAddress); logAndSysout("YARN cluster started"); logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL()); @@ -847,14 +874,11 @@ public class CliFrontend { else { if(options.getJobManagerAddress() != null) { jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress()); + writeJobManagerAddressToConfig(jobManagerAddress); } } - if(jobManagerAddress != null) { - writeJobManagerAddressToConfig(jobManagerAddress); - } - - return new Client(config, classLoader, maxSlots); + return new Client(config, maxSlots); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index cf08e0a..7928e53 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -22,11 +22,14 @@ import java.util.List; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import 
org.apache.flink.api.common.Program; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.optimizer.DataStatistics; @@ -35,52 +38,66 @@ import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; /** - * A class for executing a {@link Plan} on a local embedded Flink runtime instance. + * A PlanExecutor that runs Flink programs on a local embedded Flink runtime instance. + * + * <p>By simply calling the {@link #executePlan(org.apache.flink.api.common.Plan)} method, + * this executor still start up and shut down again immediately after the program finished.</p> + * + * <p>To use this executor to execute many dataflow programs that constitute one job together, + * then this executor needs to be explicitly started, to keep running across several executions.</p> */ public class LocalExecutor extends PlanExecutor { - private static boolean DEFAULT_OVERWRITE = false; + private static final boolean DEFAULT_OVERWRITE = false; private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1; - private final Object lock = new Object(); // we lock to ensure singleton execution - + /** we lock to ensure singleton execution */ + private final Object lock = new Object(); + + /** The mini cluster on which to execute the local programs */ private LocalFlinkMiniCluster flink; + /** Custom user configuration for the execution */ private Configuration configuration; - // 
---------------------------------- config options ------------------------------------------ - + /** Config value for how many slots to provide in the local cluster */ private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS; + /** Config flag whether to overwrite existing files by default */ private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE; - - // -------------------------------------------------------------------------------------------- - + + // ------------------------------------------------------------------------ + public LocalExecutor() { - if (!ExecutionEnvironment.localExecutionIsAllowed()) { - throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client."); - } + this(null); } public LocalExecutor(Configuration conf) { - this(); - this.configuration = conf; + if (!ExecutionEnvironment.localExecutionIsAllowed()) { + throw new InvalidProgramException( + "The LocalEnvironment cannot be used when submitting a program through a client."); + } + + this.configuration = conf != null ? 
conf : new Configuration(); } + // ------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------ - public boolean isDefaultOverwriteFiles() { return defaultOverwriteFiles; } - + public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) { this.defaultOverwriteFiles = defaultOverwriteFiles; } - + public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; } @@ -88,51 +105,48 @@ public class LocalExecutor extends PlanExecutor { public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; } - - // -------------------------------------------------------------------------------------------- - public static Configuration createConfiguration(LocalExecutor le) { - Configuration configuration = new Configuration(); - configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots()); - configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles()); - return configuration; - } + // -------------------------------------------------------------------------------------------- + @Override public void start() throws Exception { - synchronized (this.lock) { - if (this.flink == null) { - + synchronized (lock) { + if (flink == null) { // create the embedded runtime - Configuration configuration = createConfiguration(this); - if(this.configuration != null) { + Configuration configuration = createConfiguration(); + if (this.configuration != null) { configuration.addAll(this.configuration); } // start it up - this.flink = new LocalFlinkMiniCluster(configuration, true); + flink = new LocalFlinkMiniCluster(configuration, true); this.flink.start(); } else { throw new IllegalStateException("The local executor was already started."); } } } - - /** - * Stop the local executor instance. You should not call executePlan after this. 
- */ + + @Override public void stop() throws Exception { - synchronized (this.lock) { - if (this.flink != null) { - this.flink.stop(); - this.flink = null; - } else { - throw new IllegalStateException("The local executor was not started."); + synchronized (lock) { + if (flink != null) { + flink.stop(); + flink = null; } } } + @Override + public boolean isRunning() { + return flink != null; + } + /** - * Execute the given plan on the local Nephele instance, wait for the job to - * finish and return the runtime in milliseconds. + * Executes the given program on a local runtime and waits for the job to finish. + * + * <p>If the executor has not been started before, this starts the executor and shuts it down + * after the job finished. If the job runs in session mode, the executor is kept alive until + * no more references to the executor exist.</p> * * @param plan The plan of the program to execute. * @return The net runtime of the program, in milliseconds. @@ -145,15 +159,15 @@ public class LocalExecutor extends PlanExecutor { if (plan == null) { throw new IllegalArgumentException("The plan may not be null."); } - + synchronized (this.lock) { - + // check if we start a session dedicated for this execution final boolean shutDownAtEnd; - if (this.flink == null) { - // we start a session just for us now + + if (flink == null) { shutDownAtEnd = true; - + // configure the number of local slots equal to the parallelism of the local plan if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) { int maxParallelism = plan.getMaximumParallelism(); @@ -161,9 +175,11 @@ public class LocalExecutor extends PlanExecutor { this.taskManagerNumSlots = maxParallelism; } } - + + // start the cluster for us start(); - } else { + } + else { // we use the existing session shutDownAtEnd = false; } @@ -173,10 +189,10 @@ public class LocalExecutor extends PlanExecutor { Optimizer pc = new Optimizer(new DataStatistics(), configuration); OptimizedPlan op = pc.compile(plan); - + 
JobGraphGenerator jgg = new JobGraphGenerator(configuration); - JobGraph jobGraph = jgg.compileJobGraph(op); - + JobGraph jobGraph = jgg.compileJobGraph(op, plan.getJobId()); + boolean sysoutPrint = isPrintingStatusDuringExecution(); return flink.submitJobAndWait(jobGraph, sysoutPrint); } @@ -189,32 +205,50 @@ public class LocalExecutor extends PlanExecutor { } /** - * Returns a JSON dump of the optimized plan. - * - * @param plan - * The program's plan. - * @return JSON dump of the optimized plan. - * @throws Exception + * Creates a JSON representation of the given dataflow's execution plan. + * + * @param plan The dataflow plan. + * @return The dataflow's execution plan, as a JSON string. + * @throws Exception Thrown, if the optimization process that creates the execution plan failed. */ @Override public String getOptimizerPlanAsJSON(Plan plan) throws Exception { - Optimizer pc = new Optimizer(new DataStatistics(), createConfiguration(this)); + final int parallelism = plan.getDefaultParallelism() == -1 ? 
1 : plan.getDefaultParallelism(); + + Optimizer pc = new Optimizer(new DataStatistics(), this.configuration); + pc.setDefaultParallelism(parallelism); OptimizedPlan op = pc.compile(plan); - PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); - - return gen.getOptimizerPlanAsJSON(op); + + return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op); } - + + @Override + public void endSession(JobID jobID) throws Exception { + LocalFlinkMiniCluster flink = this.flink; + if (flink != null) { + ActorGateway leaderGateway = flink.getLeaderGateway(AkkaUtils.getDefaultTimeout()); + leaderGateway.tell(new JobManagerMessages.RemoveCachedJob(jobID)); + } + } + + private Configuration createConfiguration() { + Configuration configuration = new Configuration(); + configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots()); + configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles()); + return configuration; + } + + // -------------------------------------------------------------------------------------------- // Static variants that internally bring up an instance and shut it down after the execution // -------------------------------------------------------------------------------------------- - + /** - * Executes the program described by the given plan assembler. + * Executes the given program. * - * @param pa The program's plan assembler. + * @param pa The program. * @param args The parameters. - * @return The net runtime of the program, in milliseconds. + * @return The execution result of the program. * * @throws Exception Thrown, if either the startup of the local execution context, or the execution * caused an exception. @@ -222,57 +256,45 @@ public class LocalExecutor extends PlanExecutor { public static JobExecutionResult execute(Program pa, String... args) throws Exception { return execute(pa.getPlan(args)); } - + /** - * Executes the program represented by the given Pact plan. 
+ * Executes the given dataflow plan. * - * @param plan The program's plan. - * @return The net runtime of the program, in milliseconds. + * @param plan The dataflow plan. + * @return The execution result. * * @throws Exception Thrown, if either the startup of the local execution context, or the execution * caused an exception. */ public static JobExecutionResult execute(Plan plan) throws Exception { - LocalExecutor exec = new LocalExecutor(); - try { - exec.start(); - return exec.executePlan(plan); - } finally { - exec.stop(); - } + return new LocalExecutor().executePlan(plan); } /** - * Returns a JSON dump of the optimized plan. + * Creates a JSON representation of the given dataflow's execution plan. * - * @param plan - * The program's plan. - * @return JSON dump of the optimized plan. - * @throws Exception + * @param plan The dataflow plan. + * @return The dataflow's execution plan, as a JSON string. + * @throws Exception Thrown, if the optimization process that creates the execution plan failed. */ public static String optimizerPlanAsJSON(Plan plan) throws Exception { - LocalExecutor exec = new LocalExecutor(); - try { - exec.start(); - Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration()); - OptimizedPlan op = pc.compile(plan); - PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); - - return gen.getOptimizerPlanAsJSON(op); - } finally { - exec.stop(); - } + final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism(); + + Optimizer pc = new Optimizer(new DataStatistics(), new Configuration()); + pc.setDefaultParallelism(parallelism); + OptimizedPlan op = pc.compile(plan); + + return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op); } /** - * Return unoptimized plan as JSON. + * Creates a JSON representation of the given dataflow plan. * - * @param plan The program plan. - * @return The plan as a JSON object. + * @param plan The dataflow plan. 
+ * @return The dataflow plan (prior to optimization) as a JSON string. */ public static String getPlanAsJSON(Plan plan) { - PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); List<DataSinkNode> sinks = Optimizer.createPreOptimizedPlan(plan); - return gen.getPactPlanAsJSON(sinks); + return new PlanJSONDumpGenerator().getPactPlanAsJSON(sinks); } } http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index 20169f6..e8e9ade 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -18,7 +18,6 @@ package org.apache.flink.client; -import java.io.File; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -26,36 +25,41 @@ import java.util.Collections; import java.util.List; import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.JobWithJars; -import org.apache.flink.client.program.PackagedProgram; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.configuration.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * The 
RemoteExecutor is a {@link org.apache.flink.api.common.PlanExecutor} that takes the program * and ships it to a remote Flink cluster for execution. * - * The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the - * set of libraries that need to be shipped together with the program. + * <p>The RemoteExecutor is pointed at the JobManager and gets the program and (if necessary) the + * set of libraries that need to be shipped together with the program.</p> * - * The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to - * remotely execute program parts. + * <p>The RemoteExecutor is used in the {@link org.apache.flink.api.java.RemoteEnvironment} to + * remotely execute program parts.</p> */ public class RemoteExecutor extends PlanExecutor { + + private final Object lock = new Object(); - private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); - private final List<String> jarFiles; private final Configuration clientConfiguration; + + private Client client; + + private int defaultParallelism = 1; + public RemoteExecutor(String hostname, int port) { this(hostname, port, Collections.<String>emptyList(), new Configuration()); @@ -97,51 +101,148 @@ public class RemoteExecutor extends PlanExecutor { clientConfiguration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, inet.getPort()); } + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + /** + * Sets the parallelism that will be used when neither the program does not define + * any parallelism at all. + * + * @param defaultParallelism The default parallelism for the executor. 
+ */ + public void setDefaultParallelism(int defaultParallelism) { + if (defaultParallelism < 1) { + throw new IllegalArgumentException("The default parallelism must be at least one"); + } + this.defaultParallelism = defaultParallelism; + } + + /** + * Gets the parallelism that will be used when neither the program does not define + * any parallelism at all. + * + * @return The default parallelism for the executor. + */ + public int getDefaultParallelism() { + return defaultParallelism; + } + + // ------------------------------------------------------------------------ + // Startup & Shutdown + // ------------------------------------------------------------------------ + + + @Override + public void start() throws Exception { + synchronized (lock) { + if (client == null) { + client = new Client(clientConfiguration); + client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); + } + else { + throw new IllegalStateException("The remote executor was already started."); + } + } + } + + @Override + public void stop() throws Exception { + synchronized (lock) { + if (client != null) { + client.shutdown(); + client = null; + } + } + } + + @Override + public boolean isRunning() { + return client != null; + } + + // ------------------------------------------------------------------------ + // Executing programs + // ------------------------------------------------------------------------ + @Override public JobExecutionResult executePlan(Plan plan) throws Exception { + if (plan == null) { + throw new IllegalArgumentException("The plan may not be null."); + } + JobWithJars p = new JobWithJars(plan, this.jarFiles); return executePlanWithJars(p); } - - public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception { - Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1); - c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); - - JobSubmissionResult result = c.run(p, -1, true); - if (result instanceof 
JobExecutionResult) { - return (JobExecutionResult) result; - } else { - LOG.warn("The Client didn't return a JobExecutionResult"); - return new JobExecutionResult(result.getJobID(), -1, null); + + public JobExecutionResult executePlanWithJars(JobWithJars program) throws Exception { + if (program == null) { + throw new IllegalArgumentException("The job may not be null."); } - } - public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception { - File jarFile = new File(jarPath); - PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args); - - Client c = new Client(clientConfiguration, program.getUserCodeClassLoader(), -1); - c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); - - JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true); - if(result instanceof JobExecutionResult) { - return (JobExecutionResult) result; - } else { - LOG.warn("The Client didn't return a JobExecutionResult"); - return new JobExecutionResult(result.getJobID(), -1, null); + synchronized (this.lock) { + // check if we start a session dedicated for this execution + final boolean shutDownAtEnd; + + if (client == null) { + shutDownAtEnd = true; + // start the executor for us + start(); + } + else { + // we use the existing session + shutDownAtEnd = false; + } + + try { + return client.runBlocking(program, defaultParallelism); + } + finally { + if (shutDownAtEnd) { + stop(); + } + } } } @Override public String getOptimizerPlanAsJSON(Plan plan) throws Exception { - JobWithJars p = new JobWithJars(plan, this.jarFiles); - Client c = new Client(clientConfiguration, p.getUserCodeClassLoader(), -1); - - OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1); - PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); - return jsonGen.getOptimizerPlanAsJSON(op); + Optimizer opt = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration()); + OptimizedPlan optPlan = 
opt.compile(plan); + return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(optPlan); } - + + @Override + public void endSession(JobID jobID) throws Exception { + if (jobID == null) { + throw new NullPointerException("The supplied jobID must not be null."); + } + + synchronized (this.lock) { + // check if we start a session dedicated for this execution + final boolean shutDownAtEnd; + + if (client == null) { + shutDownAtEnd = true; + // start the executor for us + start(); + } + else { + // we use the existing session + shutDownAtEnd = false; + } + + try { + client.endSession(jobID); + } + finally { + if (shutDownAtEnd) { + stop(); + } + } + } + } + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -168,5 +269,4 @@ public class RemoteExecutor extends PlanExecutor { } return new InetSocketAddress(host, port); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index e7464c8..6c886fe 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -18,10 +18,9 @@ package org.apache.flink.client.program; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.io.PrintStream; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -32,8 +31,6 @@ import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.runtime.akka.AkkaUtils; import 
org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -65,7 +62,6 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import akka.actor.ActorSystem; -import com.google.common.base.Preconditions; /** * Encapsulates the functionality necessary to submit a program to a remote cluster. @@ -78,62 +74,139 @@ public class Client { * The configuration to use for the client (optimizer, timeouts, ...) and to connect to the * JobManager. */ - private final Configuration configuration; - /** The optimizer used in the optimization of batch programs */ - private final Optimizer compiler; + final Optimizer compiler; + + /** The actor system used to communicate with the JobManager */ + private final ActorSystem actorSystem; - /** The class loader to use for classes from the user program (e.g., functions and data types) */ - private final ClassLoader userCodeClassLoader; + /** The actor reference to the JobManager */ + private final ActorGateway jobManagerGateway; + + /** The timeout for communication between the client and the JobManager */ + private final FiniteDuration timeout; + + /** + * If != -1, this field specifies the total number of available slots on the cluster + * connected to the client. + */ + private final int maxSlots; /** Flag indicating whether to sysout print execution updates */ private boolean printStatusDuringExecution = true; /** - * If != -1, this field specifies the total number of available slots on the cluster - * connected to the client. + * For interactive invocations, the Job ID is only available after the ContextEnvironment has + * been run inside the user JAR. 
We pass the Client to every instance of the ContextEnvironment + * which lets us access the last JobID here. */ - private int maxSlots; + private JobID lastJobID; - /** ID of the last job submitted with this client. */ - private JobID lastJobId = null; - - // ------------------------------------------------------------------------ // Construction // ------------------------------------------------------------------------ /** * Creates a instance that submits the programs to the JobManager defined in the - * configuration. It sets the maximum number of slots to unknown (= -1). + * configuration. This method will try to resolve the JobManager hostname and throw an exception + * if that is not possible. * - * @param config The config used to obtain the JobManager's address. - * @param userCodeClassLoader The class loader to use for loading user code classes. + * @param config The config used to obtain the job-manager's address, and used to configure the optimizer. + * + * @throws java.io.IOException Thrown, if the client's actor system could not be started. + * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved. */ - public Client(Configuration config, ClassLoader userCodeClassLoader) { - this(config, userCodeClassLoader, -1); + public Client(Configuration config) throws IOException { + this(config, -1); } /** - * Creates a instance that submits the programs to the JobManager defined in the - * configuration. + * Creates a new instance of the class that submits the jobs to a job-manager. + * at the given address using the default port. * - * @param config The config used to obtain the JobManager's address. - * @param userCodeClassLoader The class loader to use for loading user code classes. - * @param maxSlots The number of maxSlots on the cluster if != -1 + * @param config The configuration for the client-side processes, like the optimizer. + * @param maxSlots maxSlots The number of maxSlots on the cluster if != -1. 
+ * + * @throws java.io.IOException Thrown, if the client's actor system could not be started. + * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved. */ - public Client(Configuration config, ClassLoader userCodeClassLoader, int maxSlots) { - Preconditions.checkNotNull(config, "Configuration is null"); - Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null"); - - this.configuration = config; - this.userCodeClassLoader = userCodeClassLoader; + public Client(Configuration config, int maxSlots) throws IOException { - this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); + this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); this.maxSlots = maxSlots; + + LOG.info("Starting client actor system"); + + try { + this.actorSystem = JobClient.startJobClientActorSystem(config); + } catch (Exception e) { + throw new IOException("Could start client actor system.", e); + } + + // from here on, we need to make sure the actor system is shut down on error + boolean success = false; + + try { + + FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(config); + this.timeout = AkkaUtils.getTimeout(config); + + LOG.info("Looking up JobManager"); + LeaderRetrievalService leaderRetrievalService; + + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config); + } catch (Exception e) { + throw new IOException("Could not create the leader retrieval service.", e); + } + + try { + this.jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway( + leaderRetrievalService, + actorSystem, + lookupTimeout); + } catch (LeaderRetrievalException e) { + throw new IOException("Failed to retrieve JobManager gateway", e); + } + + LOG.info("Leading JobManager actor system address is " + this.jobManagerGateway.path()); + + LOG.info("JobManager runs at " + this.jobManagerGateway.path()); + + LOG.info("Communication 
between client and JobManager will have a timeout of " + this.timeout); + success = true; + } finally { + if (!success) { + try { + this.actorSystem.shutdown(); + + // wait at most for 30 seconds, to work around an occasional akka problem + actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS)); + } catch (Throwable t) { + LOG.error("Shutting down actor system after error caused another error", t); + } + } + } } + // ------------------------------------------------------------------------ + // Startup & Shutdown + // ------------------------------------------------------------------------ /** + * Shuts down the client. This stops the internal actor system and actors. + */ + public void shutdown() { + if (!this.actorSystem.isTerminated()) { + this.actorSystem.shutdown(); + this.actorSystem.awaitTermination(); + } + } + + // ------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------ + + /** * Configures whether the client should print progress updates during the execution to {@code System.out}. * All updates are logged via the SLF4J loggers regardless of this setting. 
* @@ -159,118 +232,84 @@ public class Client { } // ------------------------------------------------------------------------ - // Compilation and Submission + // Access to the Program's Plan // ------------------------------------------------------------------------ - public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { + public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism) + throws CompilerException, ProgramInvocationException + { PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); - return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism)); + return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(compiler, prog, parallelism)); } - - public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { + + public static FlinkPlan getOptimizedPlan(Optimizer compiler, PackagedProgram prog, int parallelism) + throws CompilerException, ProgramInvocationException + { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { - return getOptimizedPlan(prog.getPlanWithJars(), parallelism); - } - else if (prog.isUsingInteractiveMode()) { + return getOptimizedPlan(compiler, prog.getPlanWithJars(), parallelism); + } else if (prog.isUsingInteractiveMode()) { // temporary hack to support the optimizer plan preview - OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler); + OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(compiler); if (parallelism > 0) { env.setParallelism(parallelism); } - env.setAsContext(); - - // temporarily write syserr and sysout to a byte array. 
- PrintStream originalOut = System.out; - PrintStream originalErr = System.err; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - System.setOut(new PrintStream(baos)); - ByteArrayOutputStream baes = new ByteArrayOutputStream(); - System.setErr(new PrintStream(baes)); - try { - ContextEnvironment.enableLocalExecution(false); - prog.invokeInteractiveModeForExecution(); - } - catch (ProgramInvocationException e) { - throw e; - } - catch (Throwable t) { - // the invocation gets aborted with the preview plan - if (env.optimizerPlan != null) { - return env.optimizerPlan; - } else { - throw new ProgramInvocationException("The program caused an error: ", t); - } - } - finally { - ContextEnvironment.enableLocalExecution(true); - System.setOut(originalOut); - System.setErr(originalErr); - System.err.println(baes); - System.out.println(baos); - } - - throw new ProgramInvocationException( - "The program plan could not be fetched - the program aborted pre-maturely.\n" - + "System.err: " + baes.toString() + '\n' - + "System.out: " + baos.toString() + '\n'); - } - else { - throw new RuntimeException(); + + return env.getOptimizedPlan(prog); + } else { + throw new RuntimeException("Couldn't determine program mode."); } } - - public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException { + + public static OptimizedPlan getOptimizedPlan(Optimizer compiler, Plan p, int parallelism) throws CompilerException { if (parallelism > 0 && p.getDefaultParallelism() <= 0) { - LOG.debug("Changing plan default parallelism from {} to {}",p.getDefaultParallelism(), parallelism); + LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism); p.setDefaultParallelism(parallelism); } LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism()); - return this.compiler.compile(p); - } - - - /** - * Creates the optimized plan for a given program, using this client's compiler. 
- * - * @param prog The program to be compiled. - * @return The compiled and optimized plan, as returned by the compiler. - * @throws CompilerException Thrown, if the compiler encounters an illegal situation. - * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file. - */ - public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException { - return getOptimizedPlan(prog.getPlan(), parallelism); - } - - public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { - return getJobGraph(optPlan, prog.getAllLibraries()); + return compiler.compile(p); } - - private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) { - JobGraph job; - if (optPlan instanceof StreamingPlan) { - job = ((StreamingPlan) optPlan).getJobGraph(); - } else { - JobGraphGenerator gen = new JobGraphGenerator(this.configuration); - job = gen.compileJobGraph((OptimizedPlan) optPlan); - } - for (File jar : jarFiles) { - job.addJar(new Path(jar.getAbsolutePath())); + // ------------------------------------------------------------------------ + // Program submission / execution + // ------------------------------------------------------------------------ + + public JobExecutionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException { + Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); + if (prog.isUsingProgramEntryPoint()) { + return runBlocking(prog.getPlanWithJars(), parallelism); } + else if (prog.isUsingInteractiveMode()) { + LOG.info("Starting program in interactive mode"); + ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, true); + ContextEnvironment.enableLocalExecution(false); - return job; + // invoke here + try { + prog.invokeInteractiveModeForExecution(); + } + finally { + ContextEnvironment.enableLocalExecution(true); 
+ } + + return JobExecutionResult.fromJobSubmissionResult(new JobSubmissionResult(lastJobID)); + } + else { + throw new RuntimeException(); + } } - public JobSubmissionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException { + public JobSubmissionResult runDetached(PackagedProgram prog, int parallelism) + throws ProgramInvocationException + { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { - return run(prog.getPlanWithJars(), parallelism, wait); + return runDetached(prog.getPlanWithJars(), parallelism); } else if (prog.isUsingInteractiveMode()) { LOG.info("Starting program in interactive mode"); - ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait); + ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, false); ContextEnvironment.enableLocalExecution(false); // invoke here @@ -281,113 +320,108 @@ public class Client { ContextEnvironment.enableLocalExecution(true); } - // Job id has been set in the Client passed to the ContextEnvironment - return new JobSubmissionResult(lastJobId); + return new JobSubmissionResult(lastJobID); } else { - throw new RuntimeException(); + throw new RuntimeException("PackagedProgram does not have a valid invocation mode."); } } - - public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException { - return run(optimizedPlan, prog.getAllLibraries(), wait); - } - /** - * Runs a program on Flink cluster whose job-manager is configured in this client's configuration. - * This method involves all steps, from compiling, job-graph generation to submission. - * - * @param prog The program to be executed. + * Runs a program on the Flink cluster to which this client is connected. The call blocks until the + * execution is complete, and returns afterwards. 
+ * + * @param program The program to be executed. * @param parallelism The default parallelism to use when running the program. The default parallelism is used * when the program does not set a parallelism by itself. - * @param wait A flag that indicates whether this function call should block until the program execution is done. + * * @throws CompilerException Thrown, if the compiler encounters an illegal situation. * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file, * or if the submission failed. That might be either due to an I/O problem, * i.e. the job-manager is unreachable, or due to the fact that the * parallel execution failed. */ - public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException { - return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait); + public JobExecutionResult runBlocking(JobWithJars program, int parallelism) + throws CompilerException, ProgramInvocationException + { + ClassLoader classLoader = program.getUserCodeClassLoader(); + if (classLoader == null) { + throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); + } + + OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism); + return runBlocking(optPlan, program.getJarFiles(), classLoader); + } + + /** + * Submits a program to the Flink cluster to which this client is connected. The call returns after the + * program was submitted and does not wait for the program to complete. + * + * @param program The program to be executed. + * @param parallelism The default parallelism to use when running the program. The default parallelism is used + * when the program does not set a parallelism by itself. + * + * @throws CompilerException Thrown, if the compiler encounters an illegal situation. 
+ * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file, + * or if the submission failed. That might be either due to an I/O problem, + * i.e. the job-manager is unreachable. + */ + public JobSubmissionResult runDetached(JobWithJars program, int parallelism) + throws CompilerException, ProgramInvocationException + { + ClassLoader classLoader = program.getUserCodeClassLoader(); + if (classLoader == null) { + throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader."); + } + + OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism); + return runDetached(optimizedPlan, program.getJarFiles(), classLoader); } - public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException { + public JobExecutionResult runBlocking(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader) + throws ProgramInvocationException + { JobGraph job = getJobGraph(compiledPlan, libraries); - this.lastJobId = job.getJobID(); - return run(job, wait); + return runBlocking(job, classLoader); } - public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException { - this.lastJobId = jobGraph.getJobID(); - - LOG.info("Starting client actor system"); - final ActorSystem actorSystem; + public JobSubmissionResult runDetached(OptimizedPlan compiledPlan, List<File> libraries, ClassLoader classLoader) + throws ProgramInvocationException + { + JobGraph job = getJobGraph(compiledPlan, libraries); + return runDetached(job, classLoader); + } + + public JobExecutionResult runBlocking(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + LOG.info("Checking and uploading JAR files"); try { - actorSystem = JobClient.startJobClientActorSystem(configuration); + JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout); + } catch (IOException e) { + throw new 
ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e); } - catch (Exception e) { - throw new ProgramInvocationException("Could start client actor system.", e); + try { + this.lastJobID = jobGraph.getJobID(); + return JobClient.submitJobAndWait(actorSystem, jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, classLoader); + } catch (JobExecutionException e) { + throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); } + } + public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + LOG.info("Checking and uploading JAR files"); try { - FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); - FiniteDuration timeout = AkkaUtils.getTimeout(configuration); - - LOG.info("Looking up JobManager"); - ActorGateway jobManagerGateway; - - LeaderRetrievalService leaderRetrievalService; - - try { - leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - } catch (Exception e) { - throw new ProgramInvocationException("Could not create the leader retrieval service.", e); - } - - try { - jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway( - leaderRetrievalService, - actorSystem, - lookupTimeout); - } catch (LeaderRetrievalException e) { - throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e); - } - - LOG.info("Leading JobManager actor system address is " + jobManagerGateway.path()); - - LOG.info("JobManager runs at " + jobManagerGateway.path()); - - LOG.info("Communication between client and JobManager will have a timeout of " + timeout); - - LOG.info("Checking and uploading JAR files"); - try { - JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout); - } catch (IOException e) { - throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e); - } - - try { - if (wait) { - return 
JobClient.submitJobAndWait(actorSystem, - jobManagerGateway, jobGraph, timeout, printStatusDuringExecution, userCodeClassLoader); - } else { - JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, userCodeClassLoader); - // return a dummy execution result with the JobId - return new JobSubmissionResult(jobGraph.getJobID()); - } - } catch (JobExecutionException e) { + JobClient.uploadJarFiles(jobGraph, jobManagerGateway, timeout); + } + catch (IOException e) { + throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e); + } + try { + this.lastJobID = jobGraph.getJobID(); + JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, classLoader); + return new JobSubmissionResult(jobGraph.getJobID()); + } catch (JobExecutionException e) { throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); - } catch (Exception e) { - throw new ProgramInvocationException("Exception during program execution.", e); - } - } finally { - // shut down started actor system - actorSystem.shutdown(); - - // wait at most for 30 seconds, to work around an occasional akka problem - actorSystem.awaitTermination(new FiniteDuration(30, TimeUnit.SECONDS)); } } @@ -397,62 +431,26 @@ public class Client { * @throws Exception In case an error occurred. 
*/ public void cancel(JobID jobId) throws Exception { - final FiniteDuration timeout = AkkaUtils.getTimeout(configuration); - final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); - - ActorSystem actorSystem; + Future<Object> response; try { - actorSystem = JobClient.startJobClientActorSystem(configuration); + response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout); } catch (Exception e) { - throw new ProgramInvocationException("Could start client actor system.", e); + throw new ProgramInvocationException("Failed to query the job manager gateway.", e); } - try { - ActorGateway jobManagerGateway; - - LeaderRetrievalService leaderRetrievalService; + Object result = Await.result(response, timeout); - try { - leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - } catch (Exception e) { - throw new ProgramInvocationException("Could not create the leader retrieval service.", e); - } - - try { - jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway( - leaderRetrievalService, - actorSystem, - lookupTimeout); - } catch (LeaderRetrievalException e) { - throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e); - } - - Future<Object> response; - try { - response = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout); - } catch (Exception e) { - throw new ProgramInvocationException("Failed to query the job manager gateway.", e); - } - - Object result = Await.result(response, timeout); - - if (result instanceof JobManagerMessages.CancellationSuccess) { - LOG.debug("Job cancellation with ID " + jobId + " succeeded."); - } else if (result instanceof JobManagerMessages.CancellationFailure) { - Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); - LOG.debug("Job cancellation with ID " + jobId + " failed.", t); - throw new Exception("Failed to cancel the job because of \n" + t.getMessage()); - } else { - throw new 
Exception("Unknown message received while cancelling."); - } - } finally { - // shut down started actor system - actorSystem.shutdown(); - actorSystem.awaitTermination(); + if (result instanceof JobManagerMessages.CancellationSuccess) { + LOG.debug("Job cancellation with ID " + jobId + " succeeded."); + } else if (result instanceof JobManagerMessages.CancellationFailure) { + Throwable t = ((JobManagerMessages.CancellationFailure) result).cause(); + LOG.debug("Job cancellation with ID " + jobId + " failed.", t); + throw new Exception("Failed to cancel the job because of \n" + t.getMessage()); + } else { + throw new Exception("Unknown message received while cancelling."); } } - /** * Requests and returns the accumulators for the given job identifier. Accumulators can be * requested while a is running or after it has finished. The default class loader is used @@ -473,117 +471,98 @@ public class Client { */ public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws Exception { - final FiniteDuration timeout = AkkaUtils.getTimeout(configuration); - final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); - - ActorSystem actorSystem; + Future<Object> response; try { - actorSystem = JobClient.startJobClientActorSystem(configuration); + response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout); } catch (Exception e) { - throw new Exception("Could start client actor system.", e); + throw new Exception("Failed to query the job manager gateway for accumulators.", e); } - try { - ActorGateway jobManagerGateway; - - LeaderRetrievalService leaderRetrievalService; - - try { - leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); - } catch (Exception e) { - throw new ProgramInvocationException("Could not create the leader retrieval service.", e); - } - - try { - jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway( - leaderRetrievalService, - actorSystem, - 
lookupTimeout); - } catch (LeaderRetrievalException e) { - throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e); - } - - Future<Object> response; - try { - response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout); - } catch (Exception e) { - throw new Exception("Failed to query the job manager gateway for accumulators.", e); - } - - Object result = Await.result(response, timeout); + Object result = Await.result(response, timeout); - if (result instanceof AccumulatorResultsFound) { - Map<String, SerializedValue<Object>> serializedAccumulators = - ((AccumulatorResultsFound) result).result(); + if (result instanceof AccumulatorResultsFound) { + Map<String, SerializedValue<Object>> serializedAccumulators = + ((AccumulatorResultsFound) result).result(); - return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader); + return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader); - } else if (result instanceof AccumulatorResultsErroneous) { - throw ((AccumulatorResultsErroneous) result).cause(); - } else { - throw new Exception("Failed to fetch accumulators for the job " + jobID + "."); - } - } finally { - actorSystem.shutdown(); - actorSystem.awaitTermination(); + } else if (result instanceof AccumulatorResultsErroneous) { + throw ((AccumulatorResultsErroneous) result).cause(); + } else { + throw new Exception("Failed to fetch accumulators for the job " + jobID + "."); } } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Sessions + // ------------------------------------------------------------------------ - public static final class OptimizerPlanEnvironment extends ExecutionEnvironment { - - private final Optimizer compiler; - - private FlinkPlan optimizerPlan; - - - private OptimizerPlanEnvironment(Optimizer compiler) { - this.compiler = compiler; - 
} - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - Plan plan = createProgramPlan(jobName); - this.optimizerPlan = compiler.compile(plan); - - // do not go on with anything now! - throw new ProgramAbortException(); + /** + * Tells the JobManager to finish the session (job) defined by the given ID. + * + * @param jobId The ID that identifies the session. + */ + public void endSession(JobID jobId) throws Exception { + if (jobId == null) { + throw new IllegalArgumentException("The JobID must not be null."); } + endSessions(Collections.singletonList(jobId)); + } - @Override - public String getExecutionPlan() throws Exception { - Plan plan = createProgramPlan(null, false); - this.optimizerPlan = compiler.compile(plan); - - // do not go on with anything now! - throw new ProgramAbortException(); - } - - private void setAsContext() { - ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { - - @Override - public ExecutionEnvironment createExecutionEnvironment() { - return OptimizerPlanEnvironment.this; - } - }; - initializeContextEnvironment(factory); + /** + * Tells the JobManager to finish the sessions (jobs) defined by the given IDs. + * + * @param jobIds The IDs that identify the sessions. 
+ */ + public void endSessions(List<JobID> jobIds) throws Exception { + if (jobIds == null) { + throw new IllegalArgumentException("The JobIDs must not be null"); } - public void setPlan(FlinkPlan plan){ - this.optimizerPlan = plan; + for (JobID jid : jobIds) { + if (jid != null) { + LOG.info("Telling job manager to end the session {}.", jid); + jobManagerGateway.tell(new JobManagerMessages.RemoveCachedJob(jid)); + } } } - // -------------------------------------------------------------------------------------------- + // ------------------------------------------------------------------------ + // Internal translation methods + // ------------------------------------------------------------------------ /** - * A special exception used to abort programs when the caller is only interested in the - * program plan, rather than in the full execution. + * Creates the optimized plan for a given program, using this client's compiler. + * + * @param prog The program to be compiled. + * @return The compiled and optimized plan, as returned by the compiler. + * @throws CompilerException Thrown, if the compiler encounters an illegal situation. + * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file. 
*/ - public static final class ProgramAbortException extends Error { - private static final long serialVersionUID = 1L; + private static OptimizedPlan getOptimizedPlan(Optimizer compiler, JobWithJars prog, int parallelism) throws CompilerException, + ProgramInvocationException { + return getOptimizedPlan(compiler, prog.getPlan(), parallelism); } + + public static JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { + return getJobGraph(optPlan, prog.getAllLibraries()); + } + + private static JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) { + JobGraph job; + if (optPlan instanceof StreamingPlan) { + job = ((StreamingPlan) optPlan).getJobGraph(); + } else { + JobGraphGenerator gen = new JobGraphGenerator(); + job = gen.compileJobGraph((OptimizedPlan) optPlan); + } + + for (File jar : jarFiles) { + job.addJar(new Path(jar.getAbsolutePath())); + } + + return job; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 9287017..ad14a06 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -22,6 +22,7 @@ import java.io.File; import java.util.List; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; @@ -39,15 +40,14 @@ public class ContextEnvironment extends ExecutionEnvironment { private static final Logger LOG = 
LoggerFactory.getLogger(ContextEnvironment.class); private final Client client; - + private final List<File> jarFilesToAttach; - + private final ClassLoader userCodeClassLoader; private final boolean wait; - - - + + public ContextEnvironment(Client remoteConnection, List<File> jarFiles, ClassLoader userCodeClassLoader, boolean wait) { this.client = remoteConnection; this.jarFilesToAttach = jarFiles; @@ -60,27 +60,33 @@ public class ContextEnvironment extends ExecutionEnvironment { Plan p = createProgramPlan(jobName); JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader); - JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait); - if(result instanceof JobExecutionResult) { - this.lastJobExecutionResult = (JobExecutionResult) result; - return (JobExecutionResult) result; - } else { - LOG.warn("The Client didn't return a JobExecutionResult"); - this.lastJobExecutionResult = new JobExecutionResult(result.getJobID(), -1, null); + if (wait) { + this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism()); + return this.lastJobExecutionResult; + } + else { + JobSubmissionResult result = client.runDetached(toRun, getParallelism()); + LOG.warn("Job was executed in detached mode, the results will be available on completion."); + this.lastJobExecutionResult = JobExecutionResult.fromJobSubmissionResult(result); return this.lastJobExecutionResult; } } @Override public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan("unnamed job"); - - OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getParallelism()); + Plan plan = createProgramPlan("unnamed job"); + OptimizedPlan op = Client.getOptimizedPlan(client.compiler, plan, getParallelism()); PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); return gen.getOptimizerPlanAsJSON(op); } + @Override + public void startNewSession() throws Exception { + client.endSession(jobID); + jobID = JobID.generate(); + } + public boolean 
isWait() { return wait; } @@ -104,7 +110,9 @@ public class ContextEnvironment extends ExecutionEnvironment { static void setAsContext(Client client, List<File> jarFilesToAttach, ClassLoader userCodeClassLoader, int defaultParallelism, boolean wait) { - initializeContextEnvironment(new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait)); + ContextEnvironmentFactory factory = + new ContextEnvironmentFactory(client, jarFilesToAttach, userCodeClassLoader, defaultParallelism, wait); + initializeContextEnvironment(factory); } protected static void enableLocalExecution(boolean enabled) { http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java index b86487f..9e84e2d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/JobWithJars.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.client.program; import java.io.File; @@ -30,6 +29,10 @@ import java.util.List; import org.apache.flink.api.common.Plan; +/** + * A JobWithJars is a Flink dataflow plan, together with a bunch of JAR files that contain + * the classes of the functions and libraries necessary for the execution. 
+ */ public class JobWithJars { private Plan plan; http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java new file mode 100644 index 0000000..c9c3b45 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.plan.FlinkPlan; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; + +public class OptimizerPlanEnvironment extends ExecutionEnvironment { + + private final Optimizer compiler; + + private FlinkPlan optimizerPlan; + + public OptimizerPlanEnvironment(Optimizer compiler) { + this.compiler = compiler; + } + + // ------------------------------------------------------------------------ + // Execution Environment methods + // ------------------------------------------------------------------------ + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + Plan plan = createProgramPlan(jobName); + this.optimizerPlan = compiler.compile(plan); + + // do not go on with anything now! + throw new ProgramAbortException(); + } + + @Override + public String getExecutionPlan() throws Exception { + Plan plan = createProgramPlan(null, false); + this.optimizerPlan = compiler.compile(plan); + + // do not go on with anything now! + throw new ProgramAbortException(); + } + + @Override + public void startNewSession() { + // do nothing + } + + public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException { + setAsContext(); + + // temporarily write syserr and sysout to a byte array. 
+ PrintStream originalOut = System.out; + PrintStream originalErr = System.err; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + System.setOut(new PrintStream(baos)); + ByteArrayOutputStream baes = new ByteArrayOutputStream(); + System.setErr(new PrintStream(baes)); + try { + ContextEnvironment.enableLocalExecution(false); + prog.invokeInteractiveModeForExecution(); + } + catch (ProgramInvocationException e) { + throw e; + } + catch (Throwable t) { + // the invocation gets aborted with the preview plan + if (optimizerPlan != null) { + return optimizerPlan; + } else { + throw new ProgramInvocationException("The program caused an error: ", t); + } + } + finally { + ContextEnvironment.enableLocalExecution(true); + System.setOut(originalOut); + System.setErr(originalErr); + System.err.println(baes); + System.out.println(baos); + } + + throw new ProgramInvocationException( + "The program plan could not be fetched - the program aborted pre-maturely.\n" + + "System.err: " + baes.toString() + '\n' + + "System.out: " + baos.toString() + '\n'); + } + // ------------------------------------------------------------------------ + + private void setAsContext() { + ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { + + @Override + public ExecutionEnvironment createExecutionEnvironment() { + return OptimizerPlanEnvironment.this; + } + }; + initializeContextEnvironment(factory); + } + + // ------------------------------------------------------------------------ + + public void setPlan(FlinkPlan plan){ + this.optimizerPlan = plan; + } + + /** + * A special exception used to abort programs when the caller is only interested in the + * program plan, rather than in the full execution. 
+ */ + public static final class ProgramAbortException extends Error { + private static final long serialVersionUID = 1L; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 10096da..091a959 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -40,12 +40,9 @@ import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.Manifest; -import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.Program; import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.dag.DataSinkNode; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; @@ -166,7 +163,7 @@ public class PackagedProgram { } } - public PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException { + PackagedProgram(Class<?> entryPointClass, String... args) throws ProgramInvocationException { this.jarFile = null; this.args = args == null ? new String[0] : args; @@ -685,51 +682,5 @@ public class PackagedProgram { throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." 
: ": " + t.getMessage()), t); } } - - // -------------------------------------------------------------------------------------------- - - public static final class PreviewPlanEnvironment extends ExecutionEnvironment { - - private List<DataSinkNode> previewPlan; - private Plan plan; - - private String preview = null; - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - this.plan = createProgramPlan(jobName); - this.previewPlan = Optimizer.createPreOptimizedPlan((Plan) plan); - - // do not go on with anything now! - throw new Client.ProgramAbortException(); - } - @Override - public String getExecutionPlan() throws Exception { - Plan plan = createProgramPlan("unused"); - this.previewPlan = Optimizer.createPreOptimizedPlan(plan); - - // do not go on with anything now! - throw new Client.ProgramAbortException(); - } - - public void setAsContext() { - ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { - @Override - public ExecutionEnvironment createExecutionEnvironment() { - return PreviewPlanEnvironment.this; - } - }; - initializeContextEnvironment(factory); - } - - public Plan getPlan() { - return this.plan; - } - - public void setPreview(String preview) { - this.preview = preview; - } - - } }