[FLINK-1631] [client] Overhaul of the client. - Fix bugs with non-serializable messages - Separate parser and action logic - Clean up tests - Vastly improve logging in CLI client - Additional tests for parsing / config setup in the command line client
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5385e48d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5385e48d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5385e48d Branch: refs/heads/master Commit: 5385e48d94a2df81c8fd6102a889cf42dd93fe2f Parents: 0333109 Author: Stephan Ewen <[email protected]> Authored: Tue Mar 3 21:49:37 2015 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Mar 4 18:20:36 2015 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 1211 ++++++++---------- .../apache/flink/client/cli/CancelOptions.java | 37 + .../flink/client/cli/CliArgsException.java | 30 + .../flink/client/cli/CliFrontendParser.java | 284 ++++ .../flink/client/cli/CommandLineOptions.java | 57 + .../apache/flink/client/cli/InfoOptions.java | 30 + .../apache/flink/client/cli/ListOptions.java | 46 + .../apache/flink/client/cli/ProgramOptions.java | 97 ++ .../org/apache/flink/client/cli/RunOptions.java | 30 + .../org/apache/flink/client/program/Client.java | 19 +- .../CliFrontendAddressConfigurationTest.java | 180 +++ .../flink/client/CliFrontendInfoTest.java | 39 +- .../CliFrontendJobManagerConnectionTest.java | 166 --- .../flink/client/CliFrontendListCancelTest.java | 61 +- .../client/CliFrontendPackageProgramTest.java | 223 ++-- .../apache/flink/client/CliFrontendRunTest.java | 14 +- .../flink/client/CliFrontendTestUtils.java | 15 +- .../ExecutionPlanAfterExecutionTest.java | 7 + .../main/flink-bin/conf/log4j-cli.properties | 1 + .../flink/runtime/client/JobStatusMessage.java | 59 + .../flink/runtime/jobmanager/JobManager.scala | 29 +- .../jobmanager/JobManagerCLIConfiguration.scala | 2 +- .../runtime/messages/JobManagerMessages.scala | 33 +- .../org/apache/flink/yarn/YarnTestBase.java | 9 +- .../apache/flink/yarn/ApplicationMaster.scala | 12 +- 25 files changed, 1627 insertions(+), 1064 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index e438de0..1d9d956 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -24,6 +24,8 @@ import java.io.FileNotFoundException; import java.io.FileInputStream; import java.io.InputStream; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -38,19 +40,20 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.pattern.Patterns; import akka.util.Timeout; + import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.MissingOptionException; -import org.apache.commons.cli.MissingArgumentException; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.cli.UnrecognizedOptionException; +import org.apache.flink.client.cli.CancelOptions; +import org.apache.flink.client.cli.CliArgsException; +import org.apache.flink.client.cli.CliFrontendParser; + import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.client.cli.InfoOptions; +import org.apache.flink.client.cli.ListOptions; +import org.apache.flink.client.cli.ProgramOptions; +import org.apache.flink.client.cli.RunOptions; import org.apache.flink.client.program.Client; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; @@ -58,18 +61,23 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob; -import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs; +import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; @@ -80,201 +88,133 @@ import scala.concurrent.duration.FiniteDuration; */ public class CliFrontend { - // run job by deploying Flink into a YARN cluster, if this string is specified as the jobmanager address - public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster"; - - // command line interface of the YARN session, with a special initialization here to prefix all options with y/yarn. - private static FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn"); - - //actions + // actions private static final String ACTION_RUN = "run"; private static final String ACTION_INFO = "info"; private static final String ACTION_LIST = "list"; private static final String ACTION_CANCEL = "cancel"; - // general options - private static final Option HELP_OPTION = new Option("h", "help", false, "Show the help message for the CLI Frontend or the action."); - - // program (jar file) specific options - private static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file."); - private static final Option CLASS_OPTION = new Option("c", "class", true, "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the JAR file does not specify the class in its manifest."); - private static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, "The parallelism with which to run the program. Optional flag to override the default value specified in the configuration."); - private static final Option ARGS_OPTION = new Option("a", "arguments", true, "Program arguments. Arguments can also be added without -a, simply as trailing parameters."); - - private static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, "Address of the JobManager (master) to which to connect. Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a different JobManager than the one specified in the configuration."); - - // info specific options - - // list specific options - private static final Option RUNNING_OPTION = new Option("r", "running", false, "Show only running programs and their JobIDs"); - private static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false, "Show only scheduled prorgrams and their JobIDs"); - - // canceling options - - static { - initOptions(); - } - - // action options all include the general options - private static final Options RUN_OPTIONS = getRunOptions(createGeneralOptions()); - private static final Options INFO_OPTIONS = getInfoOptions(createGeneralOptions()); - private static final Options LIST_OPTIONS = getListOptions(createGeneralOptions()); - private static final Options CANCEL_OPTIONS = getCancelOptions(createGeneralOptions()); - // config dir parameters private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR"; private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf"; private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf"; - /** - * YARN-session related constants - */ + // YARN-session related constants public static final String YARN_PROPERTIES_FILE = ".yarn-properties"; public static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager"; public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism"; public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; - // this has to be a regex for String.split() - public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; - - private CommandLineParser parser; - - private boolean printHelp; - - private boolean globalConfigurationLoaded; + public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // this has to be a regex for String.split() - private boolean yarnPropertiesLoaded = false; + /** + * A special host name used to run a job by deploying Flink into a YARN cluster, + * if this string is specified as the JobManager address + */ + public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster"; - private Properties yarnProperties; - // this flag indicates if the given Job is executed using a YARN cluster, - // started for this purpose. - private boolean runInYarnCluster = false; + // -------------------------------------------------------------------------------------------- + // -------------------------------------------------------------------------------------------- - private AbstractFlinkYarnCluster yarnCluster = null; + private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); - protected String configurationDirectory = null; + private final File configDirectory; + private final Configuration config; - /** - * Initializes the class - */ - public CliFrontend() { - parser = new PosixParser(); - } - - // -------------------------------------------------------------------------------------------- - // Setup of options - // -------------------------------------------------------------------------------------------- + private final FiniteDuration askTimeout; + + private final FiniteDuration lookupTimeout; + + private InetSocketAddress jobManagerAddress; + + private ActorSystem actorSystem; + + private AbstractFlinkYarnCluster yarnCluster; - private static void initOptions() { - HELP_OPTION.setRequired(false); - JAR_OPTION.setRequired(false); - JAR_OPTION.setArgName("jarfile"); - - CLASS_OPTION.setRequired(false); - CLASS_OPTION.setArgName("classname"); - - ADDRESS_OPTION.setRequired(false); - ADDRESS_OPTION.setArgName("host:port"); - - PARALLELISM_OPTION.setRequired(false); - PARALLELISM_OPTION.setArgName("parallelism"); - - ARGS_OPTION.setRequired(false); - ARGS_OPTION.setArgName("programArgs"); - ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES); - RUNNING_OPTION.setRequired(false); - SCHEDULED_OPTION.setRequired(false); - } - - static Options createGeneralOptions() { - Options options = new Options(); - options.addOption(HELP_OPTION); - // backwards compatibility: ignore verbose flag (-v) - options.addOption(new Option("v", "verbose", false, "This option is deprecated.")); - return options; - } - - // gets the program options with the old flags for jar file and arguments - static Options getProgramSpecificOptions(Options options) { - options.addOption(JAR_OPTION); - options.addOption(CLASS_OPTION); - options.addOption(PARALLELISM_OPTION); - options.addOption(ARGS_OPTION); - - // also add the YARN options so that the parser can parse them - yarnSessionCLi.getYARNSessionCLIOptions(options); - return options; - } - - // gets the program options without the old flags for jar file and arguments - static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) { - options.addOption(CLASS_OPTION); - options.addOption(PARALLELISM_OPTION); - return options; - } - - /** - * Builds command line options for the run action. - * - * @return Command line options for the run action. - */ - static Options getRunOptions(Options options) { - Options o = getProgramSpecificOptions(options); - return getJobManagerAddressOption(o); - } - - static Options getRunOptionsWithoutDeprecatedOptions(Options options) { - Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options); - return getJobManagerAddressOption(o); - } - - static Options getJobManagerAddressOption(Options options) { - options.addOption(ADDRESS_OPTION); - return options; - } - - /** - * Builds command line options for the info action. - * - * @return Command line options for the info action. - */ - static Options getInfoOptions(Options options) { - options = getProgramSpecificOptions(options); - options = getJobManagerAddressOption(options); - return options; - } - - static Options getInfoOptionsWithoutDeprecatedOptions(Options options) { - options = getProgramSpecificOptionsWithoutDeprecatedOptions(options); - options = getJobManagerAddressOption(options); - return options; - } - /** - * Builds command line options for the list action. - * - * @return Command line options for the list action. + * + * @throws Exception Thrown if teh configuration directory was not found, the configuration could not + * be loaded, or the YARN properties could not be parsed. */ - static Options getListOptions(Options options) { - options.addOption(RUNNING_OPTION); - options.addOption(SCHEDULED_OPTION); - options = getJobManagerAddressOption(options); - return options; + public CliFrontend() throws Exception { + this(getConfigurationDirectoryFromEnv()); } - - /** - * Builds command line options for the cancel action. - * - * @return Command line options for the cancel action. - */ - static Options getCancelOptions(Options options) { - options = getJobManagerAddressOption(options); - return options; + + public CliFrontend(String configDir) throws Exception { + + // configure the config directory + this.configDirectory = new File(configDir); + LOG.info("Using configuration directory " + this.configDirectory.getAbsolutePath()); + + // load the configuration + LOG.info("Trying to load configuration file"); + GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath()); + this.config = GlobalConfiguration.getConfiguration(); + + // load the YARN properties + File propertiesFile = new File(configDirectory, YARN_PROPERTIES_FILE); + if (propertiesFile.exists()) { + + logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); + + Properties yarnProperties = new Properties(); + try { + InputStream is = new FileInputStream(propertiesFile); + try { + yarnProperties.load(is); + } + finally { + is.close(); + } + } + catch (IOException e) { + throw new Exception("Cannot read the YARN properties file", e); + } + + // configure the default degree of parallelism from YARN + String propDegree = yarnProperties.getProperty(YARN_PROPERTIES_DOP); + if (propDegree != null) { // maybe the property is not set + try { + int paraDegree = Integer.parseInt(propDegree); + this.config.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree); + + logAndSysout("YARN properties set default parallelism to " + paraDegree); + } + catch (NumberFormatException e) { + throw new Exception("Error while parsing the YARN properties: " + + "Property " + YARN_PROPERTIES_DOP + " is not an integer."); + } + } + + // get the JobManager address from the YARN properties + String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); + if (address != null) { + try { + jobManagerAddress = parseJobManagerAddress(address); + } + catch (Exception e) { + throw new Exception("YARN properties contain an invalid entry for JobManager address.", e); + } + + logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); + } + + // handle the YARN client's dynamic properties + String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); + List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Tuple2<String, String> dynamicProperty : dynamicProperties) { + this.config.setString(dynamicProperty.f0, dynamicProperty.f1); + } + } + + this.askTimeout = AkkaUtils.getTimeout(config); + this.lookupTimeout = AkkaUtils.getLookupTimeout(config); } + // -------------------------------------------------------------------------------------------- // Execute Actions @@ -286,182 +226,137 @@ public class CliFrontend { * @param args Command line arguments for the run action. */ protected int run(String[] args) { - // Parse command line options - CommandLine line; + LOG.info("Running 'run' command."); + + RunOptions options; try { - line = parser.parse(RUN_OPTIONS, args, true); - evaluateGeneralOptions(line); + options = CliFrontendParser.parseRunCommand(args); } - catch (MissingOptionException e) { + catch (CliArgsException e) { return handleArgException(e); } - catch (MissingArgumentException e) { - return handleArgException(e); - } - catch (UnrecognizedOptionException e) { - return handleArgException(e); - } - catch (Exception e) { - return handleError(e); + catch (Throwable t) { + return handleError(t); } - - // ------------ check for help first -------------- - - if (printHelp) { - printHelpForRun(); + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForRun(); return 0; } + if (options.getJarFilePath() == null) { + return handleArgException(new CliArgsException("The program JAR file was not specified.")); + } + PackagedProgram program; - Client client; try { - program = buildProgram(line); - client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName()); - } catch (FileNotFoundException e) { + LOG.info("Building program from JAR file"); + program = buildProgram(options); + } + catch (FileNotFoundException e) { return handleArgException(e); - } catch (ProgramInvocationException e) { + } + catch (ProgramInvocationException e) { return handleError(e); - } catch (Throwable t) { + } + catch (Throwable t) { return handleError(t); } - int parallelism = -1; - if (line.hasOption(PARALLELISM_OPTION.getOpt())) { - String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt()); - try { - parallelism = Integer.parseInt(parString); - } catch (NumberFormatException e) { - System.out.println("The value " + parString + " is invalid for the degree of parallelism."); - return 1; - } + try { + Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName()); - if (parallelism <= 0) { - System.out.println("Invalid value for the degree-of-parallelism. Parallelism must be greater than zero."); - return 1; - } - } + int parallelism = options.getParallelism(); + int exitCode = executeProgram(program, client, parallelism); - int exitCode = executeProgram(program, client, parallelism); + if (yarnCluster != null) { + List<String> msgs = yarnCluster.getNewMessages(); + if (msgs != null && msgs.size() > 1) { - if(runInYarnCluster) { - List<String> msgs = yarnCluster.getNewMessages(); - if(msgs != null && msgs.size() > 1) { - System.out.println("The following messages were created by the YARN cluster while running the Job:"); - for(String msg : msgs) { - System.out.println(msg); + logAndSysout("The following messages were created by the YARN cluster while running the Job:"); + for (String msg : msgs) { + logAndSysout(msg); + } + } + if (yarnCluster.hasFailed()) { + logAndSysout("YARN cluster is in failed state!"); + logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics()); } } - if(yarnCluster.hasFailed()) { - System.out.println("YARN cluster is in failed state!"); - System.out.println("YARN Diagnostics: " + yarnCluster.getDiagnostics()); - } - System.out.println("Shutting down YARN cluster"); - yarnCluster.shutdown(); - } - - return exitCode; - } - // -------------------------------------------------------------------------------------------- - - protected int executeProgram(PackagedProgram program, Client client, int parallelism) { - JobExecutionResult execResult; - try { - client.setPrintStatusDuringExecution(true); - execResult = client.run(program, parallelism, true); + return exitCode; } - catch (ProgramInvocationException e) { - return handleError(e); + catch (Throwable t) { + return handleError(t); } finally { - program.deleteExtractedLibraries(); - } - - // we come here after the job has finished - if (execResult != null) { - System.out.println("Job Runtime: " + execResult.getNetRuntime()); - Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); - if (accumulatorsResult.size() > 0) { - System.out.println("Accumulator Results: "); - System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); + if (yarnCluster != null) { + logAndSysout("Shutting down YARN cluster"); + yarnCluster.shutdown(); + } + if (program != null) { + program.deleteExtractedLibraries(); } } - return 0; } - - // -------------------------------------------------------------------------------------------- - + /** * Executes the info action. * * @param args Command line arguments for the info action. */ protected int info(String[] args) { + LOG.info("Running 'info' command."); + // Parse command line options - CommandLine line; + InfoOptions options; try { - line = parser.parse(INFO_OPTIONS, args, false); - evaluateGeneralOptions(line); - } - catch (MissingOptionException e) { - return handleArgException(e); + options = CliFrontendParser.parseInfoCommand(args); } - catch (MissingArgumentException e) { + catch (CliArgsException e) { return handleArgException(e); } - catch (UnrecognizedOptionException e) { - return handleArgException(e); - } - catch (Exception e) { - return handleError(e); + catch (Throwable t) { + return handleError(t); } - if (printHelp) { - printHelpForInfo(); + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForInfo(); return 0; } + if (options.getJarFilePath() == null) { + return handleArgException(new CliArgsException("The program JAR file was not specified.")); + } + // -------- build the packaged program ------------- PackagedProgram program; try { - program = buildProgram(line); - } catch (FileNotFoundException e) { - return handleError(e); - } catch (ProgramInvocationException e) { - return handleError(e); - } catch (Throwable t) { - return handleError(t); + LOG.info("Building program from JAR file"); + program = buildProgram(options); } - - int parallelism = -1; - if (line.hasOption(PARALLELISM_OPTION.getOpt())) { - String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt()); - try { - parallelism = Integer.parseInt(parString); - } catch (NumberFormatException e) { - System.out.println("The value " + parString + " is invalid for the degree of parallelism."); - return 1; - } - - if (parallelism <= 0) { - System.out.println("Invalid value for the degree-of-parallelism. Parallelism must be greater than zero."); - return 1; - } + catch (Throwable t) { + return handleError(t); } try { - Client client = getClient(line, program.getUserCodeClassLoader(), program.getMainClassName()); + int parallelism = options.getParallelism(); + + LOG.info("Creating program plan dump"); + Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName()); String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism); if (jsonPlan != null) { System.out.println("----------------------- Execution Plan -----------------------"); System.out.println(jsonPlan); System.out.println("--------------------------------------------------------------"); - } else { - System.out.println("JSON plan could not be compiled."); } - + else { + System.out.println("JSON plan could not be generated."); + } return 0; } catch (Throwable t) { @@ -478,33 +373,27 @@ public class CliFrontend { * @param args Command line arguments for the list action. */ protected int list(String[] args) { - // Parse command line options - CommandLine line; + LOG.info("Running 'list' command."); + + ListOptions options; try { - line = parser.parse(LIST_OPTIONS, args, false); - evaluateGeneralOptions(line); - } - catch (MissingOptionException e) { - return handleArgException(e); + options = CliFrontendParser.parseListCommand(args); } - catch (MissingArgumentException e) { + catch (CliArgsException e) { return handleArgException(e); } - catch (UnrecognizedOptionException e) { - return handleArgException(e); - } - catch (Exception e) { - return handleError(e); + catch (Throwable t) { + return handleError(t); } - - if (printHelp) { - printHelpForList(); + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForList(); return 0; } - - // get list options - boolean running = line.hasOption(RUNNING_OPTION.getOpt()); - boolean scheduled = line.hasOption(SCHEDULED_OPTION.getOpt()); + + boolean running = options.getRunning(); + boolean scheduled = options.getScheduled(); // print running and scheduled jobs if not option supplied if (!running && !scheduled) { @@ -513,87 +402,87 @@ public class CliFrontend { } try { - ActorRef jobManager = getJobManager(line, getGlobalConfiguration()); - if (jobManager == null) { - return 1; - } + ActorRef jobManager = getJobManager(options); - final Future<Object> response = Patterns.ask(jobManager, - JobManagerMessages.getRequestRunningJobs(), new Timeout(getAkkaTimeout())); + LOG.info("Connecting to JobManager to retrieve list of jobs"); + Future<Object> response = Patterns.ask(jobManager, + JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout)); Object result; try { - result = Await.result(response, getAkkaTimeout()); - } catch (Exception exception) { - throw new IOException("Could not retrieve running jobs from job manager.", - exception); + result = Await.result(response, askTimeout); + } + catch (Exception e) { + throw new Exception("Could not retrieve running jobs from the JobManager.", e); } - if (!(result instanceof RunningJobs)) { - throw new RuntimeException("ReqeustRunningJobs requires a response of type " + - "RunningJobs. Instead the response is of type " + result.getClass() + "."); - } else { - Iterable<ExecutionGraph> jobs = ((RunningJobs) result).asJavaIterable(); + if (result instanceof RunningJobsStatus) { + LOG.info("Successfully retrieved list of jobs"); + + List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages(); - ArrayList<ExecutionGraph> runningJobs = null; - ArrayList<ExecutionGraph> scheduledJobs = null; + ArrayList<JobStatusMessage> runningJobs = null; + ArrayList<JobStatusMessage> scheduledJobs = null; if (running) { - runningJobs = new ArrayList<ExecutionGraph>(); + runningJobs = new ArrayList<JobStatusMessage>(); } if (scheduled) { - scheduledJobs = new ArrayList<ExecutionGraph>(); + scheduledJobs = new ArrayList<JobStatusMessage>(); } - for (ExecutionGraph rj : jobs) { - - if (running && rj.getState().equals(JobStatus.RUNNING)) { + for (JobStatusMessage rj : jobs) { + if (running && rj.getJobState().equals(JobStatus.RUNNING)) { runningJobs.add(rj); } - if (scheduled && rj.getState().equals(JobStatus.CREATED)) { + if (scheduled && rj.getJobState().equals(JobStatus.CREATED)) { scheduledJobs.add(rj); } } SimpleDateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss"); - Comparator<ExecutionGraph> njec = new Comparator<ExecutionGraph>(){ - + Comparator<JobStatusMessage> njec = new Comparator<JobStatusMessage>(){ @Override - public int compare(ExecutionGraph o1, ExecutionGraph o2) { - return (int)(o1.getStatusTimestamp(o1.getState())-o2.getStatusTimestamp(o2 - .getState())); + public int compare(JobStatusMessage o1, JobStatusMessage o2) { + return (int)(o1.getStartTime()-o2.getStartTime()); } }; if (running) { if(runningJobs.size() == 0) { System.out.println("No running jobs."); - } else { + } + else { Collections.sort(runningJobs, njec); System.out.println("------------------------ Running Jobs ------------------------"); - for(ExecutionGraph rj : runningJobs) { - System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState()))) - +" : "+rj.getJobID().toString()+" : "+rj.getJobName()); + for (JobStatusMessage rj : runningJobs) { + System.out.println(df.format(new Date(rj.getStartTime())) + + " : " + rj.getJobId() + " : " + rj.getJobName()); } System.out.println("--------------------------------------------------------------"); } } if (scheduled) { - if(scheduledJobs.size() == 0) { + if (scheduledJobs.size() == 0) { System.out.println("No scheduled jobs."); - } else { + } + else { Collections.sort(scheduledJobs, njec); System.out.println("----------------------- Scheduled Jobs -----------------------"); - for(ExecutionGraph rj : scheduledJobs) { - System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState()))) - +" : "+rj.getJobID().toString()+" : "+rj.getJobName()); + for(JobStatusMessage rj : scheduledJobs) { + System.out.println(df.format(new Date(rj.getStartTime())) + + " : " + rj.getJobId() + " : " + rj.getJobName()); } System.out.println("--------------------------------------------------------------"); } } return 0; } + else { + throw new Exception("ReqeustRunningJobs requires a response of type " + + "RunningJobs. Instead the response is of type " + result.getClass() + "."); + } } catch (Throwable t) { return handleError(t); @@ -601,452 +490,299 @@ public class CliFrontend { } /** - * Executes the cancel action. + * Executes the CANCEL action. * * @param args Command line arguments for the cancel action. */ protected int cancel(String[] args) { - // Parse command line options - CommandLine line; + LOG.info("Running 'cancel' command."); + + CancelOptions options; try { - line = parser.parse(CANCEL_OPTIONS, args, false); - evaluateGeneralOptions(line); - } - catch (MissingOptionException e) { - return handleArgException(e); + options = CliFrontendParser.parseCancelCommand(args); } - catch (MissingArgumentException e) { + catch (CliArgsException e) { return handleArgException(e); } - catch (UnrecognizedOptionException e) { - return handleArgException(e); - } - catch (Exception e) { - return handleError(e); + catch (Throwable t) { + return handleError(t); } - - if (printHelp) { - printHelpForCancel(); + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForCancel(); return 0; } - String[] cleanedArgs = line.getArgs(); + String[] cleanedArgs = options.getArgs(); JobID jobId; if (cleanedArgs.length > 0) { String jobIdString = cleanedArgs[0]; try { jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); - } catch (Exception e) { + } + catch (Exception e) { + LOG.error("Error: The value for the Job ID is not a valid ID."); System.out.println("Error: The value for the Job ID is not a valid ID."); return 1; } - } else { + } + else { + LOG.error("Missing JobID in the command line arguments."); System.out.println("Error: Specify a Job ID to cancel a job."); return 1; } try { - ActorRef jobManager = getJobManager(line, getGlobalConfiguration()); - - if (jobManager == null) { - return 1; - } - - final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), - new Timeout(getAkkaTimeout())); + ActorRef jobManager = getJobManager(options); + Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout)); try { - Await.ready(response, getAkkaTimeout()); - } catch (Exception exception) { - throw new IOException("Canceling the job with job ID " + jobId + " failed.", - exception); + Await.result(response, askTimeout); + return 0; + } + catch (Exception e) { + throw new Exception("Canceling the job with ID " + jobId + " failed.", e); } - - return 0; } catch (Throwable t) { return handleError(t); } } + // -------------------------------------------------------------------------------------------- + // Interaction with programs and JobManager + // -------------------------------------------------------------------------------------------- + + protected int executeProgram(PackagedProgram program, Client client, int parallelism) { + LOG.info("Starting execution or program"); + JobExecutionResult execResult; + try { + client.setPrintStatusDuringExecution(true); + execResult = client.run(program, parallelism, true); + } + catch (ProgramInvocationException e) { + return handleError(e); + } + finally { + program.deleteExtractedLibraries(); + } + + LOG.info("Program execution finished"); + + // we come here after the job has finished + if (execResult != null) { + System.out.println("Job Runtime: " + execResult.getNetRuntime()); + Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults(); + if (accumulatorsResult.size() > 0) { + System.out.println("Accumulator Results: "); + System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult)); + } + } + return 0; + } + /** - * @param line - * + * Creates a Packaged program from the given command line options. + * * @return A PackagedProgram (upon success) * @throws java.io.FileNotFoundException, org.apache.flink.client.program.ProgramInvocationException, java.lang.Throwable */ - protected PackagedProgram buildProgram(CommandLine line) throws FileNotFoundException, ProgramInvocationException { - String[] programArgs = line.hasOption(ARGS_OPTION.getOpt()) ? - line.getOptionValues(ARGS_OPTION.getOpt()) : - line.getArgs(); - - // take the jar file from the option, or as the first trailing parameter (if available) - String jarFilePath; - if (line.hasOption(JAR_OPTION.getOpt())) { - jarFilePath = line.getOptionValue(JAR_OPTION.getOpt()); - } - else if (programArgs.length > 0) { - jarFilePath = programArgs[0]; - programArgs = Arrays.copyOfRange(programArgs, 1, programArgs.length); - } - else { - throw new FileNotFoundException("Error: Jar file was not specified."); + protected PackagedProgram buildProgram(ProgramOptions options) + throws FileNotFoundException, ProgramInvocationException + { + String[] programArgs = options.getProgramArgs(); + String jarFilePath = options.getJarFilePath(); + + if (jarFilePath == null) { + throw new IllegalArgumentException("The program JAR file was not specified."); } - + File jarFile = new File(jarFilePath); // Check if JAR file exists if (!jarFile.exists()) { - throw new FileNotFoundException("Error: Jar file does not exist: " + jarFile); + throw new FileNotFoundException("JAR file does not exist: " + jarFile); } else if (!jarFile.isFile()) { - throw new FileNotFoundException("Error: Jar file is not a file: " + jarFile); + throw new FileNotFoundException("JAR file is not a file: " + jarFile); } // Get assembler class - String entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ? - line.getOptionValue(CLASS_OPTION.getOpt()) : - null; + String entryPointClass = options.getEntryPointClassName(); return entryPointClass == null ? new PackagedProgram(jarFile, programArgs) : new PackagedProgram(jarFile, entryPointClass, programArgs); } - - protected String getJobManagerAddressString(CommandLine line) throws IOException { - Configuration configuration = getGlobalConfiguration(); - - // first, check if the address comes from the command line option - if (line.hasOption(ADDRESS_OPTION.getOpt())) { + + + protected InetSocketAddress getJobManagerAddress(CommandLineOptions options) throws Exception { + + // first, check if the address is specified as an option + if (options.getJobManagerAddress() != null) { + return parseJobManagerAddress(options.getJobManagerAddress()); + } + + // second, check whether the address was already parsed, or configured through the YARN properties + if (jobManagerAddress == null) { + // config file must have the address + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + + // verify that there is a jobmanager address and port in the configuration + if (jobManagerHost == null) { + throw new Exception("Found no configuration in the config directory '" + configDirectory + + "' that specifies the JobManager address."); + } + + int jobManagerPort; try { - return line.getOptionValue(ADDRESS_OPTION.getOpt()); + jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); } - catch (Exception e) { - System.out.println("Error: The JobManager address has an invalid format. " + e.getMessage()); - return null; + catch (NumberFormatException e) { + throw new Exception("Invalid value for the JobManager port (" + + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + ") in the configuration."); } - } - else { - Properties yarnProps = getYarnProperties(); - if(yarnProps != null) { - try { - String address = yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); - System.out.println("Found a yarn properties file (" + YARN_PROPERTIES_FILE + ") file, " - + "using \""+address+"\" to connect to the JobManager"); - return address; - } catch (Exception e) { - System.out.println("Found a yarn properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager address from the file. " - + e.getMessage()); - return null; - } - } else { - // regular config file gives the address - String jobManagerAddress = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - - // verify that there is a jobmanager address and port in the configuration - if (jobManagerAddress == null) { - System.out.println("Error: Found no configuration in the config directory '" + - getConfigurationDirectory() + "' that specifies the JobManager address."); - return null; - } - - int jobManagerPort; - try { - jobManagerPort = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - } catch (NumberFormatException e) { - System.out.println("Invalid value for the JobManager IPC port (" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + - ") in the configuration."); - return null; - } - - if (jobManagerPort == -1) { - System.out.println("Error: Found no configuration in the config directory '" + - getConfigurationDirectory() + "' that specifies the JobManager port."); - return null; - } - - return jobManagerAddress + ":" + jobManagerPort; + + if (jobManagerPort == -1) { + throw new Exception("Found no configuration in the config directory '" + configDirectory + + "' that specifies the JobManager port."); } - } - } - - protected ActorRef getJobManager(CommandLine line, Configuration config) throws IOException { - //TODO: Get ActorRef from YarnCluster if we are in YARN mode. - String jobManagerAddressStr = getJobManagerAddressString(line); - if (jobManagerAddressStr == null) { - return null; - } - final ActorSystem actorSystem; - try { - scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0); - actorSystem = AkkaUtils.createActorSystem(config, new Some<scala.Tuple2<String, Object>>(systemEndpoint)); - } - catch (Exception e) { - throw new IOException("Could not start actor system to communicate with JobManager", e); + jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort); } - try { - InetSocketAddress address = RemoteExecutor.getInetFromHostport(jobManagerAddressStr); - return JobManager.getJobManagerRemoteReference(address, actorSystem, config); - } - finally { - actorSystem.shutdown(); - } + return jobManagerAddress; } + protected ActorRef getJobManager(CommandLineOptions options) throws Exception { + //TODO: Get ActorRef from YarnCluster if we are in YARN mode. - public String getConfigurationDirectory() { - if (configurationDirectory == null) { - configurationDirectory = getConfigurationDirectoryFromEnv(); - } - return configurationDirectory; - } + InetSocketAddress address = getJobManagerAddress(options); - /** - * Reads configuration settings. The default path can be overridden - * by setting the ENV variable "FLINK_CONF_DIR". - * - * @return Flink's global configuration - */ - protected Configuration getGlobalConfiguration() { - if (!globalConfigurationLoaded) { - String location = getConfigurationDirectory(); - GlobalConfiguration.loadConfiguration(location); - // set default parallelization degree - Properties yarnProps; + // start an actor system if needed + if (this.actorSystem == null) { + LOG.info("Starting actor system to communicate with JobManager"); try { - yarnProps = getYarnProperties(); - if(yarnProps != null) { - String propDegree = yarnProps.getProperty(YARN_PROPERTIES_DOP); - int paraDegree = -1; - if(propDegree != null) { // maybe the property is not set - paraDegree = Integer.valueOf(propDegree); - } - Configuration c = GlobalConfiguration.getConfiguration(); - if(paraDegree != -1) { - c.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree); - } - // handle the YARN client's dynamic properties - String dynamicPropertiesEncoded = yarnProps.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); - List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); - for(Tuple2<String, String> dynamicProperty : dynamicProperties) { - c.setString(dynamicProperty.f0, dynamicProperty.f1); - } - GlobalConfiguration.includeConfiguration(c); // update config - } - } catch (IOException e) { - e.printStackTrace(); - System.err.println("Error while loading YARN properties: " + e.getMessage()); + scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0); + this.actorSystem = AkkaUtils.createActorSystem(config, + new Some<scala.Tuple2<String, Object>>(systemEndpoint)); + } + catch (Exception e) { + throw new IOException("Could not start actor system to communicate with JobManager", e); } - globalConfigurationLoaded = true; + LOG.info("Actor system successfully started"); } - return GlobalConfiguration.getConfiguration(); - } - public static String getConfigurationDirectoryFromEnv() { - String location; - if (System.getenv(ENV_CONFIG_DIRECTORY) != null) { - location = System.getenv(ENV_CONFIG_DIRECTORY); - } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) { - location = CONFIG_DIRECTORY_FALLBACK_1; - } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { - location = CONFIG_DIRECTORY_FALLBACK_2; - } else { - throw new RuntimeException("The configuration directory was not found. Please configure the '" + - ENV_CONFIG_DIRECTORY + "' environment variable properly."); - } - return location; - } - protected FiniteDuration getAkkaTimeout(){ - Configuration config = getGlobalConfiguration(); - return AkkaUtils.getTimeout(config); + LOG.info("Trying to lookup JobManager"); + ActorRef jmActor = JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout); + LOG.info("JobManager is at " + jmActor.path()); + return jmActor; } - public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) { - List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>(); - if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { - String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); - for(String propLine : propertyLines) { - if(propLine == null) { - continue; - } - String[] kv = propLine.split("="); - if(kv != null && kv[0] != null && kv[1] != null && kv[0].length() > 0) { - ret.add(new Tuple2<String, String>(kv[0], kv[1])); - } - } - } - return ret; - } - - protected Properties getYarnProperties() throws IOException { - if(!yarnPropertiesLoaded) { - String loc = getConfigurationDirectory(); - File propertiesFile = new File(loc + '/' + YARN_PROPERTIES_FILE); - if (propertiesFile.exists()) { - Properties props = new Properties(); - InputStream is = new FileInputStream( propertiesFile ); - props.load(is); - yarnProperties = props; - is.close(); - } else { - yarnProperties = null; - } - yarnPropertiesLoaded = true; - } - return yarnProperties; - } + - protected Client getClient(CommandLine line, ClassLoader classLoader, String programName) throws IOException { - String jmAddrString = getJobManagerAddressString(line); - InetSocketAddress jobManagerAddress = null; - if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) { - System.out.println("YARN cluster mode detected. Switching Log4j output to console"); + protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName) throws Exception { + + InetSocketAddress jobManagerAddress; + + if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { + logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - this.runInYarnCluster = true; // user wants to run Flink in YARN cluster. - AbstractFlinkYarnClient flinkYarnClient = yarnSessionCLi.createFlinkYarnClient(line); - if(flinkYarnClient == null) { + CommandLine commandLine = options.getCommandLine(); + AbstractFlinkYarnClient flinkYarnClient = + CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine); + + if (flinkYarnClient == null) { throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); } try { - yarnCluster = flinkYarnClient.deploy("Flink Application: "+programName); - } catch(Exception e) { + yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName); + } + catch(Exception e) { throw new RuntimeException("Error deploying the YARN cluster", e); } + jobManagerAddress = yarnCluster.getJobManagerAddress(); - System.out.println("YARN cluster started"); - System.out.println("JobManager web interface address "+yarnCluster.getWebInterfaceURL()); - System.out.println("Waiting until all TaskManagers have connected"); + + logAndSysout("YARN cluster started"); + logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL()); + logAndSysout("Waiting until all TaskManagers have connected"); + while(true) { FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); - if(status != null) { + if (status != null) { if (status.getNumberOfTaskManagers() < flinkYarnClient.getTaskManagerCount()) { - System.out.println("TaskManager status (" + status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")"); + logAndSysout("TaskManager status (" + status.getNumberOfTaskManagers() + "/" + flinkYarnClient.getTaskManagerCount() + ")"); } else { - System.out.println("Enough TaskManagers are connected"); + logAndSysout("All TaskManagers are connected"); break; } } else { - System.out.println("No status updates from YARN cluster received so far. Waiting ..."); + logAndSysout("No status updates from YARN cluster received so far. Waiting ..."); } + try { Thread.sleep(500); - } catch (InterruptedException e) { - System.err.println("Thread as interrupted"); Thread.currentThread().interrupt(); + } + catch (InterruptedException e) { + LOG.error("Interrupted while waiting for TaskManagers"); + System.err.println("Thread is interrupted"); + Thread.currentThread().interrupt(); } } - } else { - jobManagerAddress = RemoteExecutor.getInetFromHostport(jmAddrString); } - return new Client(jobManagerAddress, getGlobalConfiguration(), classLoader); + else { + jobManagerAddress = getJobManagerAddress(options); + } + return new Client(jobManagerAddress, config, classLoader); } + // -------------------------------------------------------------------------------------------- + // Logging and Exception Handling + // -------------------------------------------------------------------------------------------- + /** - * Prints the help for the client. + * Displays an exception message for incorrect command line arguments. + * + * @param e The exception to display. + * @return The return code for the process. */ - private void printHelp() { - System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]"); - System.out.println(); - System.out.println("The following actions are available:"); - - /* The only general option is -h and the help messages are always printed on errors. - HelpFormatter formatter = new HelpFormatter(); - formatter.setWidth(80); - formatter.setLeftPadding(5); - formatter.setSyntaxPrefix(" general options:"); - formatter.printHelp(" ", GENERAL_OPTIONS); - */ - - printHelpForRun(); - printHelpForInfo(); - printHelpForList(); - printHelpForCancel(); - - System.out.println(); - } - - private void printHelpForRun() { - HelpFormatter formatter = new HelpFormatter(); - formatter.setLeftPadding(5); - formatter.setWidth(80); - - System.out.println("\nAction \"run\" compiles and runs a program."); - System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>"); - formatter.setSyntaxPrefix(" \"run\" action options:"); - formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options())); - formatter.setSyntaxPrefix(" Additional arguments if -m "+YARN_DEPLOY_JOBMANAGER+" is set:"); - Options yarnOpts = new Options(); - yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); - formatter.printHelp(" ", yarnOpts); - System.out.println(); - } - - private void printHelpForInfo() { - HelpFormatter formatter = new HelpFormatter(); - formatter.setLeftPadding(5); - formatter.setWidth(80); - - System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON)."); - System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>"); - formatter.setSyntaxPrefix(" \"info\" action options:"); - formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options())); - System.out.println(); - } - - private void printHelpForList() { - HelpFormatter formatter = new HelpFormatter(); - formatter.setLeftPadding(5); - formatter.setWidth(80); - - System.out.println("\nAction \"list\" lists running and scheduled programs."); - System.out.println("\n Syntax: list [OPTIONS]"); - formatter.setSyntaxPrefix(" \"list\" action options:"); - formatter.printHelp(" ", getListOptions(new Options())); - System.out.println(); - } - - private void printHelpForCancel() { - HelpFormatter formatter = new HelpFormatter(); - formatter.setLeftPadding(5); - formatter.setWidth(80); - - System.out.println("\nAction \"cancel\" cancels a running program."); - System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>"); - formatter.setSyntaxPrefix(" \"cancel\" action options:"); - formatter.printHelp(" ", getCancelOptions(new Options())); - System.out.println(); - } - private int handleArgException(Exception e) { + LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage())); + System.out.println(e.getMessage()); System.out.println(); - System.out.println("Specify the help option (-h or --help) to get help on the command."); + System.out.println("Use the help option (-h or --help) to get help on the command."); return 1; } /** - * Displays exceptions. + * Displays an exception message. * * @param t The exception to display. + * @return The return code for the process. */ private int handleError(Throwable t) { + LOG.error("Error while running the command.", t); + t.printStackTrace(); System.err.println(); System.err.println("The exception above occurred while trying to run your command."); return 1; } + private void logAndSysout(String message) { + LOG.info(message); + System.out.println(message); + } - + // -------------------------------------------------------------------------------------------- + // Entry point for executable + // -------------------------------------------------------------------------------------------- - private void evaluateGeneralOptions(CommandLine line) { - // check help flag - this.printHelp = line.hasOption(HELP_OPTION.getOpt()); - } - /** * Parses the command line arguments and starts the requested action. * @@ -1057,7 +793,7 @@ public class CliFrontend { // check for action if (args.length < 1) { - printHelp(); + CliFrontendParser.printHelp(); System.out.println("Please specify an action."); return 1; } @@ -1071,8 +807,11 @@ public class CliFrontend { // do action if (action.equals(ACTION_RUN)) { // run() needs to run in a secured environment for the optimizer. - if(SecurityUtils.isSecurityEnabled()) { - System.out.println("Secure Hadoop setup detected."); + if (SecurityUtils.isSecurityEnabled()) { + String message = "Secure Hadoop environment setup detected. Running in secure context."; + LOG.info(message); + System.out.println(message); + try { return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() { @Override @@ -1085,16 +824,21 @@ public class CliFrontend { } } return run(params); - } else if (action.equals(ACTION_LIST)) { + } + else if (action.equals(ACTION_LIST)) { return list(params); - } else if (action.equals(ACTION_INFO)) { + } + else if (action.equals(ACTION_INFO)) { return info(params); - } else if (action.equals(ACTION_CANCEL)) { + } + else if (action.equals(ACTION_CANCEL)) { return cancel(params); - } else if (action.equals("-h") || action.equals("--help")) { - printHelp(); + } + else if (action.equals("-h") || action.equals("--help")) { + CliFrontendParser.printHelp(); return 0; - } else { + } + else { System.out.printf("\"%s\" is not a valid action.\n", action); System.out.println(); System.out.println("Valid actions are \"run\", \"list\", \"info\", or \"cancel\"."); @@ -1104,15 +848,92 @@ public class CliFrontend { } } + public void shutdown() { + ActorSystem sys = this.actorSystem; + if (sys != null) { + this.actorSystem = null; + sys.shutdown(); + } + } /** * Submits the job based on the arguments */ - public static void main(String[] args) throws ParseException { + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args); + EnvironmentInformation.checkJavaVersion(); + + try { + CliFrontend cli = new CliFrontend(); + int retCode = cli.parseParameters(args); + System.exit(retCode); + } + catch (Throwable t) { + LOG.error("Fatal error while running command line interface.", t); + t.printStackTrace(); + System.exit(31); + } + } + + // -------------------------------------------------------------------------------------------- + // Miscellaneous Utilities + // -------------------------------------------------------------------------------------------- + + private static InetSocketAddress parseJobManagerAddress(String hostAndPort) { + URI uri; + try { + uri = new URI("my://" + hostAndPort); + } catch (URISyntaxException e) { + throw new RuntimeException("Malformed address " + hostAndPort, e); + } + String host = uri.getHost(); + int port = uri.getPort(); + if (host == null || port == -1) { + throw new RuntimeException("Address is missing hostname or port " + hostAndPort); + } + return new InetSocketAddress(host, port); + } - CliFrontend cli = new CliFrontend(); - int retCode = cli.parseParameters(args); - System.exit(retCode); + public static String getConfigurationDirectoryFromEnv() { + String location = System.getenv(ENV_CONFIG_DIRECTORY); + + if (location != null) { + if (new File(location).exists()) { + return location; + } + else { + throw new RuntimeException("The config directory '" + location + "', specified in the '" + + ENV_CONFIG_DIRECTORY + "' environment variable, does not exist."); + } + } + else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) { + location = CONFIG_DIRECTORY_FALLBACK_1; + } + else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { + location = CONFIG_DIRECTORY_FALLBACK_2; + } + else { + throw new RuntimeException("The configuration directory was not specified. " + + "Please specify the directory containing the configuration file through the '" + + ENV_CONFIG_DIRECTORY + "' environment variable."); + } + return location; } + public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) { + List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>(); + if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { + String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); + for(String propLine : propertyLines) { + if(propLine == null) { + continue; + } + String[] kv = propLine.split("="); + if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { + ret.add(new Tuple2<String, String>(kv[0], kv[1])); + } + } + } + return ret; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java new file mode 100644 index 0000000..22e9ece --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +/** + * Command line options for the CANCEL command + */ +public class CancelOptions extends CommandLineOptions { + + private final String[] args; + + public CancelOptions(CommandLine line) { + super(line); + this.args = line.getArgs(); + } + + public String[] getArgs() { + return args == null ? new String[0] : args; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java new file mode 100644 index 0000000..932c66d --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +/** + * Special exception that is thrown when the command line parsing fails. + */ +public class CliArgsException extends Exception { + + private static final long serialVersionUID = 1L; + + public CliArgsException(String message) { + super(message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java new file mode 100644 index 0000000..0f6ad24 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.client.FlinkYarnSessionCli; + +/** + * A simple command line parser (based on Apache Commons CLI) that extracts command + * line options. + */ +public class CliFrontendParser { + + /** command line interface of the YARN session, with a special initialization here + * to prefix all options with y/yarn. */ + private static final FlinkYarnSessionCli yarnSessionCLi = new FlinkYarnSessionCli("y", "yarn"); + + + static final Option HELP_OPTION = new Option("h", "help", false, + "Show the help message for the CLI Frontend or the action."); + + static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file."); + + static final Option CLASS_OPTION = new Option("c", "class", true, + "Class with the program entry point (\"main\" method or \"getPlan()\" method. Only needed if the " + + "JAR file does not specify the class in its manifest."); + + static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true, + "The parallelism with which to run the program. Optional flag to override the default value " + + "specified in the configuration."); + + static final Option ARGS_OPTION = new Option("a", "arguments", true, + "Program arguments. Arguments can also be added without -a, simply as trailing parameters."); + + static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true, + "Address of the JobManager (master) to which to connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER + + "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " + + "different JobManager than the one specified in the configuration."); + + // list specific options + static final Option RUNNING_OPTION = new Option("r", "running", false, + "Show only running programs and their JobIDs"); + + static final Option SCHEDULED_OPTION = new Option("s", "scheduled", false, + "Show only scheduled programs and their JobIDs"); + + static { + HELP_OPTION.setRequired(false); + + JAR_OPTION.setRequired(false); + JAR_OPTION.setArgName("jarfile"); + + CLASS_OPTION.setRequired(false); + CLASS_OPTION.setArgName("classname"); + + ADDRESS_OPTION.setRequired(false); + ADDRESS_OPTION.setArgName("host:port"); + + PARALLELISM_OPTION.setRequired(false); + PARALLELISM_OPTION.setArgName("parallelism"); + + ARGS_OPTION.setRequired(false); + ARGS_OPTION.setArgName("programArgs"); + ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES); + + RUNNING_OPTION.setRequired(false); + SCHEDULED_OPTION.setRequired(false); + } + + private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options())); + private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options())); + private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options())); + private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options())); + + + private static Options buildGeneralOptions(Options options) { + options.addOption(HELP_OPTION); + // backwards compatibility: ignore verbose flag (-v) + options.addOption(new Option("v", "verbose", false, "This option is deprecated.")); + return options; + } + + public static Options getProgramSpecificOptions(Options options) { + options.addOption(JAR_OPTION); + options.addOption(CLASS_OPTION); + options.addOption(PARALLELISM_OPTION); + options.addOption(ARGS_OPTION); + + // also add the YARN options so that the parser can parse them + yarnSessionCLi.getYARNSessionCLIOptions(options); + return options; + } + + private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) { + options.addOption(CLASS_OPTION); + options.addOption(PARALLELISM_OPTION); + return options; + } + + private static Options getRunOptions(Options options) { + Options o = getProgramSpecificOptions(options); + return getJobManagerAddressOption(o); + } + + private static Options getRunOptionsWithoutDeprecatedOptions(Options options) { + Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options); + return getJobManagerAddressOption(o); + } + + private static Options getJobManagerAddressOption(Options options) { + options.addOption(ADDRESS_OPTION); + return options; + } + + private static Options getInfoOptions(Options options) { + options = getProgramSpecificOptions(options); + options = getJobManagerAddressOption(options); + return options; + } + + private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) { + options = getProgramSpecificOptionsWithoutDeprecatedOptions(options); + options = getJobManagerAddressOption(options); + return options; + } + + private static Options getListOptions(Options options) { + options.addOption(RUNNING_OPTION); + options.addOption(SCHEDULED_OPTION); + options = getJobManagerAddressOption(options); + return options; + } + + private static Options getCancelOptions(Options options) { + options = getJobManagerAddressOption(options); + return options; + } + + // -------------------------------------------------------------------------------------------- + // Help + // -------------------------------------------------------------------------------------------- + + /** + * Prints the help for the client. + */ + public static void printHelp() { + System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]"); + System.out.println(); + System.out.println("The following actions are available:"); + + printHelpForRun(); + printHelpForInfo(); + printHelpForList(); + printHelpForCancel(); + + System.out.println(); + } + + public static void printHelpForRun() { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction \"run\" compiles and runs a program."); + System.out.println("\n Syntax: run [OPTIONS] <jar-file> <arguments>"); + formatter.setSyntaxPrefix(" \"run\" action options:"); + formatter.printHelp(" ", getRunOptionsWithoutDeprecatedOptions(new Options())); + formatter.setSyntaxPrefix(" Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:"); + Options yarnOpts = new Options(); + yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts); + formatter.printHelp(" ", yarnOpts); + System.out.println(); + } + + public static void printHelpForInfo() { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON)."); + System.out.println("\n Syntax: info [OPTIONS] <jar-file> <arguments>"); + formatter.setSyntaxPrefix(" \"info\" action options:"); + formatter.printHelp(" ", getInfoOptionsWithoutDeprecatedOptions(new Options())); + System.out.println(); + } + + public static void printHelpForList() { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction \"list\" lists running and scheduled programs."); + System.out.println("\n Syntax: list [OPTIONS]"); + formatter.setSyntaxPrefix(" \"list\" action options:"); + formatter.printHelp(" ", getListOptions(new Options())); + System.out.println(); + } + + public static void printHelpForCancel() { + HelpFormatter formatter = new HelpFormatter(); + formatter.setLeftPadding(5); + formatter.setWidth(80); + + System.out.println("\nAction \"cancel\" cancels a running program."); + System.out.println("\n Syntax: cancel [OPTIONS] <Job ID>"); + formatter.setSyntaxPrefix(" \"cancel\" action options:"); + formatter.printHelp(" ", getCancelOptions(new Options())); + System.out.println(); + } + + // -------------------------------------------------------------------------------------------- + // Line Parsing + // -------------------------------------------------------------------------------------------- + + public static RunOptions parseRunCommand(String[] args) throws CliArgsException { + try { + PosixParser parser = new PosixParser(); + CommandLine line = parser.parse(RUN_OPTIONS, args, true); + return new RunOptions(line); + } + catch (ParseException e) { + throw new CliArgsException(e.getMessage()); + } + } + + public static ListOptions parseListCommand(String[] args) throws CliArgsException { + try { + PosixParser parser = new PosixParser(); + CommandLine line = parser.parse(LIST_OPTIONS, args, false); + return new ListOptions(line); + } + catch (ParseException e) { + throw new CliArgsException(e.getMessage()); + } + } + + public static CancelOptions parseCancelCommand(String[] args) throws CliArgsException { + try { + PosixParser parser = new PosixParser(); + CommandLine line = parser.parse(CANCEL_OPTIONS, args, false); + return new CancelOptions(line); + } + catch (ParseException e) { + throw new CliArgsException(e.getMessage()); + } + } + + public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException { + try { + PosixParser parser = new PosixParser(); + CommandLine line = parser.parse(INFO_OPTIONS, args, false); + return new InfoOptions(line); + } + catch (ParseException e) { + throw new CliArgsException(e.getMessage()); + } + } + + public static FlinkYarnSessionCli getFlinkYarnSessionCli() { + return yarnSessionCLi; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java new file mode 100644 index 0000000..f6f6319 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION; + +/** + * Base class for all options parsed from the command line. + * Contains options for printing help and the JobManager address. + */ +public abstract class CommandLineOptions { + + private final CommandLine commandLine; + + private final String jobManagerAddress; + + private final boolean printHelp; + + + protected CommandLineOptions(CommandLine line) { + this.commandLine = line; + this.printHelp = line.hasOption(HELP_OPTION.getOpt()); + this.jobManagerAddress = line.hasOption(ADDRESS_OPTION.getOpt()) ? + line.getOptionValue(ADDRESS_OPTION.getOpt()) : null; + } + + public CommandLine getCommandLine() { + return commandLine; + } + + public boolean isPrintHelp() { + return printHelp; + } + + public String getJobManagerAddress() { + return jobManagerAddress; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java new file mode 100644 index 0000000..83f5c38 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +/** + * Command line options for the INFO command + */ +public class InfoOptions extends ProgramOptions { + + public InfoOptions(CommandLine line) throws CliArgsException { + super(line); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java new file mode 100644 index 0000000..45f39a4 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION; + +/** + * Command line options for the LIST command + */ +public class ListOptions extends CommandLineOptions { + + private final boolean running; + private final boolean scheduled; + + public ListOptions(CommandLine line) { + super(line); + this.running = line.hasOption(RUNNING_OPTION.getOpt()); + this.scheduled = line.hasOption(SCHEDULED_OPTION.getOpt()); + } + + public boolean getRunning() { + return running; + } + + public boolean getScheduled() { + return scheduled; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java new file mode 100644 index 0000000..5b24a41 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +import java.util.Arrays; + +import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION; + +/** + * Base class for command line options that refer to a JAR file program. + */ +public abstract class ProgramOptions extends CommandLineOptions { + + private final String jarFilePath; + + private final String entryPointClass; + + private final String[] programArgs; + + private final int parallelism; + + protected ProgramOptions(CommandLine line) throws CliArgsException { + super(line); + + String[] args = line.hasOption(ARGS_OPTION.getOpt()) ? + line.getOptionValues(ARGS_OPTION.getOpt()) : + line.getArgs(); + + if (line.hasOption(JAR_OPTION.getOpt())) { + this.jarFilePath = line.getOptionValue(JAR_OPTION.getOpt()); + } + else if (args.length > 0) { + jarFilePath = args[0]; + args = Arrays.copyOfRange(args, 1, args.length); + } + else { + jarFilePath = null; + } + + this.programArgs = args; + + this.entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ? + line.getOptionValue(CLASS_OPTION.getOpt()) : null; + + if (line.hasOption(PARALLELISM_OPTION.getOpt())) { + String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt()); + try { + parallelism = Integer.parseInt(parString); + if (parallelism <= 0) { + throw new NumberFormatException(); + } + } + catch (NumberFormatException e) { + throw new CliArgsException("The parallelism must be a positive number: " + parString); + } + } + else { + parallelism = -1; + } + } + + public String getJarFilePath() { + return jarFilePath; + } + + public String getEntryPointClassName() { + return entryPointClass; + } + + public String[] getProgramArgs() { + return programArgs; + } + + public int getParallelism() { + return parallelism; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java new file mode 100644 index 0000000..2e4eb31 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; + +/** + * Command line options for the RUN command. + */ +public class RunOptions extends ProgramOptions { + + public RunOptions(CommandLine line) throws CliArgsException { + super(line); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 5a032a0..f4a2dc9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -25,7 +25,6 @@ import java.io.PrintStream; import java.net.InetSocketAddress; import java.util.List; -import akka.remote.AssociationErrorEvent; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; @@ -219,8 +218,7 @@ public class Client { } private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) { - JobGraph job = null; - + JobGraph job; if (optPlan instanceof StreamingPlan) { job = ((StreamingPlan) optPlan).getJobGraph(); } else { @@ -356,21 +354,6 @@ public class Client { return new JobExecutionResult(-1, null); } - private Throwable getAssociationError(List<AssociationErrorEvent> eventLog) { - int len = eventLog.size(); - if (len > 0) { - AssociationErrorEvent e = eventLog.get(len - 1); - Throwable cause = e.getCause(); - if (cause instanceof akka.remote.InvalidAssociation) { - return cause.getCause(); - } else { - return cause; - } - } else { - return null; - } - } - // -------------------------------------------------------------------------------------------- public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
