[FLINK-1631] [client] Overhaul of the client.

 - Fix bugs with non-serializable messages
 - Separate parser and action logic
 - Clean up tests
 - Vastly improve logging in CLI client
 - Additional tests for parsing / config setup in the command line client


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5385e48d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5385e48d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5385e48d

Branch: refs/heads/master
Commit: 5385e48d94a2df81c8fd6102a889cf42dd93fe2f
Parents: 0333109
Author: Stephan Ewen <[email protected]>
Authored: Tue Mar 3 21:49:37 2015 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Mar 4 18:20:36 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 1211 ++++++++----------
 .../apache/flink/client/cli/CancelOptions.java  |   37 +
 .../flink/client/cli/CliArgsException.java      |   30 +
 .../flink/client/cli/CliFrontendParser.java     |  284 ++++
 .../flink/client/cli/CommandLineOptions.java    |   57 +
 .../apache/flink/client/cli/InfoOptions.java    |   30 +
 .../apache/flink/client/cli/ListOptions.java    |   46 +
 .../apache/flink/client/cli/ProgramOptions.java |   97 ++
 .../org/apache/flink/client/cli/RunOptions.java |   30 +
 .../org/apache/flink/client/program/Client.java |   19 +-
 .../CliFrontendAddressConfigurationTest.java    |  180 +++
 .../flink/client/CliFrontendInfoTest.java       |   39 +-
 .../CliFrontendJobManagerConnectionTest.java    |  166 ---
 .../flink/client/CliFrontendListCancelTest.java |   61 +-
 .../client/CliFrontendPackageProgramTest.java   |  223 ++--
 .../apache/flink/client/CliFrontendRunTest.java |   14 +-
 .../flink/client/CliFrontendTestUtils.java      |   15 +-
 .../ExecutionPlanAfterExecutionTest.java        |    7 +
 .../main/flink-bin/conf/log4j-cli.properties    |    1 +
 .../flink/runtime/client/JobStatusMessage.java  |   59 +
 .../flink/runtime/jobmanager/JobManager.scala   |   29 +-
 .../jobmanager/JobManagerCLIConfiguration.scala |    2 +-
 .../runtime/messages/JobManagerMessages.scala   |   33 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |    9 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   12 +-
 25 files changed, 1627 insertions(+), 1064 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index e438de0..1d9d956 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -24,6 +24,8 @@ import java.io.FileNotFoundException;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,19 +40,20 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.MissingOptionException;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-import org.apache.commons.cli.UnrecognizedOptionException;
+import org.apache.flink.client.cli.CancelOptions;
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.InfoOptions;
+import org.apache.flink.client.cli.ListOptions;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -58,18 +61,23 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobs;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
 import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
 import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -80,201 +88,133 @@ import scala.concurrent.duration.FiniteDuration;
  */
 public class CliFrontend {
 
-       // run job by deploying Flink into a YARN cluster, if this string is 
specified as the jobmanager address
-       public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
-
-       // command line interface of the YARN session, with a special 
initialization here to prefix all options with y/yarn.
-       private static FlinkYarnSessionCli yarnSessionCLi = new 
FlinkYarnSessionCli("y", "yarn");
-
-       //actions
+       // actions
        private static final String ACTION_RUN = "run";
        private static final String ACTION_INFO = "info";
        private static final String ACTION_LIST = "list";
        private static final String ACTION_CANCEL = "cancel";
 
-       // general options
-       private static final Option HELP_OPTION = new Option("h", "help", 
false, "Show the help message for the CLI Frontend or the action.");
-
-       // program (jar file) specific options
-       private static final Option JAR_OPTION = new Option("j", "jarfile", 
true, "Flink program JAR file.");
-       private static final Option CLASS_OPTION = new Option("c", "class", 
true, "Class with the program entry point (\"main\" method or \"getPlan()\" 
method. Only needed if the JAR file does not specify the class in its 
manifest.");
-       private static final Option PARALLELISM_OPTION = new Option("p", 
"parallelism", true, "The parallelism with which to run the program. Optional 
flag to override the default value specified in the configuration.");
-       private static final Option ARGS_OPTION = new Option("a", "arguments", 
true, "Program arguments. Arguments can also be added without -a, simply as 
trailing parameters.");
-
-       private static final Option ADDRESS_OPTION = new Option("m", 
"jobmanager", true, "Address of the JobManager (master) to which to connect. 
Specify '"+YARN_DEPLOY_JOBMANAGER+"' as the JobManager to deploy a YARN cluster 
for the job. Use this flag to connect to a different JobManager than the one 
specified in the configuration.");
-
-       // info specific options
-
-       // list specific options
-       private static final Option RUNNING_OPTION = new Option("r", "running", 
false, "Show only running programs and their JobIDs");
-       private static final Option SCHEDULED_OPTION = new Option("s", 
"scheduled", false, "Show only scheduled prorgrams and their JobIDs");
-
-       // canceling options
-
-       static {
-               initOptions();
-       }
-       
-       // action options all include the general options
-       private static final Options RUN_OPTIONS = 
getRunOptions(createGeneralOptions());
-       private static final Options INFO_OPTIONS = 
getInfoOptions(createGeneralOptions());
-       private static final Options LIST_OPTIONS = 
getListOptions(createGeneralOptions());
-       private static final Options CANCEL_OPTIONS = 
getCancelOptions(createGeneralOptions());
-       
        // config dir parameters
        private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
        private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
        private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
        
-       /**
-        * YARN-session related constants
-        */
+       // YARN-session related constants
        public static final String YARN_PROPERTIES_FILE = ".yarn-properties";
        public static final String YARN_PROPERTIES_JOBMANAGER_KEY = 
"jobManager";
        public static final String YARN_PROPERTIES_DOP = "degreeOfParallelism";
        public static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = 
"dynamicPropertiesString";
-       // this has to be a regex for String.split()
-       public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@";
-       
 
-       private CommandLineParser parser;
-       
-       private boolean printHelp;
-       
-       private boolean globalConfigurationLoaded;
+       public static final String YARN_DYNAMIC_PROPERTIES_SEPARATOR = "@@"; // 
this has to be a regex for String.split()
 
-       private boolean yarnPropertiesLoaded = false;
+       /**
+        * A special host name used to run a job by deploying Flink into a YARN 
cluster,
+        * if this string is specified as the JobManager address
+        */
+       public static final String YARN_DEPLOY_JOBMANAGER = "yarn-cluster";
        
-       private Properties yarnProperties;
 
-       // this flag indicates if the given Job is executed using a YARN 
cluster,
-       // started for this purpose.
-       private boolean runInYarnCluster = false;
+       // 
--------------------------------------------------------------------------------------------
+       // 
--------------------------------------------------------------------------------------------
 
-       private AbstractFlinkYarnCluster yarnCluster = null;
+       private static final Logger LOG = 
LoggerFactory.getLogger(CliFrontend.class);
 
-       protected String configurationDirectory = null;
+       private final File configDirectory;
 
+       private final Configuration config;
 
-       /**
-        * Initializes the class
-        */
-       public CliFrontend() {
-               parser = new PosixParser();
-       }
-       
-       // 
--------------------------------------------------------------------------------------------
-       //  Setup of options
-       // 
--------------------------------------------------------------------------------------------
+       private final FiniteDuration askTimeout;
+
+       private final FiniteDuration lookupTimeout;
+
+       private InetSocketAddress jobManagerAddress;
+
+       private ActorSystem actorSystem;
+
+       private AbstractFlinkYarnCluster yarnCluster;
 
-       private static void initOptions() {
-               HELP_OPTION.setRequired(false);
 
-               JAR_OPTION.setRequired(false);
-               JAR_OPTION.setArgName("jarfile");
-               
-               CLASS_OPTION.setRequired(false);
-               CLASS_OPTION.setArgName("classname");
-               
-               ADDRESS_OPTION.setRequired(false);
-               ADDRESS_OPTION.setArgName("host:port");
-               
-               PARALLELISM_OPTION.setRequired(false);
-               PARALLELISM_OPTION.setArgName("parallelism");
-               
-               ARGS_OPTION.setRequired(false);
-               ARGS_OPTION.setArgName("programArgs");
-               ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
 
-               RUNNING_OPTION.setRequired(false);
-               SCHEDULED_OPTION.setRequired(false);
-       }
-       
-       static Options createGeneralOptions() {
-               Options options = new Options();
-               options.addOption(HELP_OPTION);
-               // backwards compatibility: ignore verbose flag (-v)
-               options.addOption(new Option("v", "verbose", false, "This 
option is deprecated."));
-               return options;
-       }
-       
-       // gets the program options with the old flags for jar file and 
arguments
-       static Options getProgramSpecificOptions(Options options) {
-               options.addOption(JAR_OPTION);
-               options.addOption(CLASS_OPTION);
-               options.addOption(PARALLELISM_OPTION);
-               options.addOption(ARGS_OPTION);
-
-               // also add the YARN options so that the parser can parse them
-               yarnSessionCLi.getYARNSessionCLIOptions(options);
-               return options;
-       }
-       
-       // gets the program options without the old flags for jar file and 
arguments
-       static Options 
getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
-               options.addOption(CLASS_OPTION);
-               options.addOption(PARALLELISM_OPTION);
-               return options;
-       }
-       
-       /**
-        * Builds command line options for the run action.
-        * 
-        * @return Command line options for the run action.
-        */
-       static Options getRunOptions(Options options) {
-               Options o = getProgramSpecificOptions(options);
-               return getJobManagerAddressOption(o);
-       }
-       
-       static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
-               Options o = 
getProgramSpecificOptionsWithoutDeprecatedOptions(options);
-               return getJobManagerAddressOption(o);
-       }
-       
-       static Options getJobManagerAddressOption(Options options) {
-               options.addOption(ADDRESS_OPTION);
-               return options;
-       }
-       
-       /**
-        * Builds command line options for the info action.
-        * 
-        * @return Command line options for the info action.
-        */
-       static Options getInfoOptions(Options options) {
-               options = getProgramSpecificOptions(options);
-               options = getJobManagerAddressOption(options);
-               return options;
-       }
-       
-       static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
-               options = 
getProgramSpecificOptionsWithoutDeprecatedOptions(options);
-               options = getJobManagerAddressOption(options);
-               return options;
-       }
-       
        /**
-        * Builds command line options for the list action.
-        * 
-        * @return Command line options for the list action.
+        *
+        * @throws Exception Thrown if the configuration directory was not 
found, the configuration could not
+        *                   be loaded, or the YARN properties could not be 
parsed.
         */
-       static Options getListOptions(Options options) {
-               options.addOption(RUNNING_OPTION);
-               options.addOption(SCHEDULED_OPTION);
-               options = getJobManagerAddressOption(options);
-               return options;
+       public CliFrontend() throws Exception {
+               this(getConfigurationDirectoryFromEnv());
        }
-       
-       /**
-        * Builds command line options for the cancel action.
-        * 
-        * @return Command line options for the cancel action.
-        */
-       static Options getCancelOptions(Options options) {
-               options = getJobManagerAddressOption(options);
-               return options;
+
+       public CliFrontend(String configDir) throws Exception {
+
+               // configure the config directory
+               this.configDirectory = new File(configDir);
+               LOG.info("Using configuration directory " + 
this.configDirectory.getAbsolutePath());
+
+               // load the configuration
+               LOG.info("Trying to load configuration file");
+               
GlobalConfiguration.loadConfiguration(this.configDirectory.getAbsolutePath());
+               this.config = GlobalConfiguration.getConfiguration();
+
+               // load the YARN properties
+               File propertiesFile = new File(configDirectory, 
YARN_PROPERTIES_FILE);
+               if (propertiesFile.exists()) {
+
+                       logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
+
+                       Properties yarnProperties = new Properties();
+                       try {
+                               InputStream is = new 
FileInputStream(propertiesFile);
+                               try {
+                                       yarnProperties.load(is);
+                               }
+                               finally {
+                                       is.close();
+                               }
+                       }
+                       catch (IOException e) {
+                               throw new Exception("Cannot read the YARN 
properties file", e);
+                       }
+
+                       // configure the default degree of parallelism from YARN
+                       String propDegree = 
yarnProperties.getProperty(YARN_PROPERTIES_DOP);
+                       if (propDegree != null) { // maybe the property is not 
set
+                               try {
+                                       int paraDegree = 
Integer.parseInt(propDegree);
+                                       
this.config.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, 
paraDegree);
+
+                                       logAndSysout("YARN properties set 
default parallelism to " + paraDegree);
+                               }
+                               catch (NumberFormatException e) {
+                                       throw new Exception("Error while 
parsing the YARN properties: " +
+                                                       "Property " + 
YARN_PROPERTIES_DOP + " is not an integer.");
+                               }
+                       }
+
+                       // get the JobManager address from the YARN properties
+                       String address = 
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+                       if (address != null) {
+                               try {
+                                       jobManagerAddress = 
parseJobManagerAddress(address);
+                               }
+                               catch (Exception e) {
+                                       throw new Exception("YARN properties 
contain an invalid entry for JobManager address.", e);
+                               }
+
+                               logAndSysout("Using JobManager address from 
YARN properties " + jobManagerAddress);
+                       }
+
+                       // handle the YARN client's dynamic properties
+                       String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+                       List<Tuple2<String, String>> dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
+                       for (Tuple2<String, String> dynamicProperty : 
dynamicProperties) {
+                               this.config.setString(dynamicProperty.f0, 
dynamicProperty.f1);
+                       }
+               }
+
+               this.askTimeout = AkkaUtils.getTimeout(config);
+               this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
        }
+
        
        // 
--------------------------------------------------------------------------------------------
        //  Execute Actions
@@ -286,182 +226,137 @@ public class CliFrontend {
         * @param args Command line arguments for the run action.
         */
        protected int run(String[] args) {
-               // Parse command line options
-               CommandLine line;
+               LOG.info("Running 'run' command.");
+
+               RunOptions options;
                try {
-                       line = parser.parse(RUN_OPTIONS, args, true);
-                       evaluateGeneralOptions(line);
+                       options = CliFrontendParser.parseRunCommand(args);
                }
-               catch (MissingOptionException e) {
+               catch (CliArgsException e) {
                        return handleArgException(e);
                }
-               catch (MissingArgumentException e) {
-                       return handleArgException(e);
-               }
-               catch (UnrecognizedOptionException e) {
-                       return handleArgException(e);
-               }
-               catch (Exception e) {
-                       return handleError(e);
+               catch (Throwable t) {
+                       return handleError(t);
                }
-               
-               // ------------ check for help first --------------
-               
-               if (printHelp) {
-                       printHelpForRun();
+
+               // evaluate help flag
+               if (options.isPrintHelp()) {
+                       CliFrontendParser.printHelpForRun();
                        return 0;
                }
 
+               if (options.getJarFilePath() == null) {
+                       return handleArgException(new CliArgsException("The 
program JAR file was not specified."));
+               }
+
                PackagedProgram program;
-               Client client;
                try {
-                       program = buildProgram(line);
-                       client = getClient(line, 
program.getUserCodeClassLoader(), program.getMainClassName());
-               } catch (FileNotFoundException e) {
+                       LOG.info("Building program from JAR file");
+                       program = buildProgram(options);
+               }
+               catch (FileNotFoundException e) {
                        return handleArgException(e);
-               } catch (ProgramInvocationException e) {
+               }
+               catch (ProgramInvocationException e) {
                        return handleError(e);
-               } catch (Throwable t) {
+               }
+               catch (Throwable t) {
                        return handleError(t);
                }
 
-               int parallelism = -1;
-               if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
-                       String parString = 
line.getOptionValue(PARALLELISM_OPTION.getOpt());
-                       try {
-                               parallelism = Integer.parseInt(parString);
-                       } catch (NumberFormatException e) {
-                               System.out.println("The value " + parString + " 
is invalid for the degree of parallelism.");
-                               return 1;
-                       }
+               try {
+                       Client client = getClient(options, 
program.getUserCodeClassLoader(), program.getMainClassName());
 
-                       if (parallelism <= 0) {
-                               System.out.println("Invalid value for the 
degree-of-parallelism. Parallelism must be greater than zero.");
-                               return 1;
-                       }
-               }
+                       int parallelism = options.getParallelism();
+                       int exitCode = executeProgram(program, client, 
parallelism);
 
-               int exitCode = executeProgram(program, client, parallelism);
+                       if (yarnCluster != null) {
+                               List<String> msgs = 
yarnCluster.getNewMessages();
+                               if (msgs != null && msgs.size() > 1) {
 
-               if(runInYarnCluster) {
-                       List<String> msgs = yarnCluster.getNewMessages();
-                       if(msgs != null && msgs.size() > 1) {
-                               System.out.println("The following messages were 
created by the YARN cluster while running the Job:");
-                               for(String msg : msgs) {
-                                       System.out.println(msg);
+                                       logAndSysout("The following messages 
were created by the YARN cluster while running the Job:");
+                                       for (String msg : msgs) {
+                                               logAndSysout(msg);
+                                       }
+                               }
+                               if (yarnCluster.hasFailed()) {
+                                       logAndSysout("YARN cluster is in failed 
state!");
+                                       logAndSysout("YARN Diagnostics: " + 
yarnCluster.getDiagnostics());
                                }
                        }
-                       if(yarnCluster.hasFailed()) {
-                               System.out.println("YARN cluster is in failed 
state!");
-                               System.out.println("YARN Diagnostics: " + 
yarnCluster.getDiagnostics());
-                       }
-                       System.out.println("Shutting down YARN cluster");
-                       yarnCluster.shutdown();
-               }
-
-               return exitCode;
-       }
 
-       // 
--------------------------------------------------------------------------------------------
-
-       protected int executeProgram(PackagedProgram program, Client client, 
int parallelism) {
-               JobExecutionResult execResult;
-               try {
-                       client.setPrintStatusDuringExecution(true);
-                       execResult = client.run(program, parallelism, true);
+                       return exitCode;
                }
-               catch (ProgramInvocationException e) {
-                       return handleError(e);
+               catch (Throwable t) {
+                       return handleError(t);
                }
                finally {
-                       program.deleteExtractedLibraries();
-               }
-               
-               // we come here after the job has finished
-               if (execResult != null) {
-                       System.out.println("Job Runtime: " + 
execResult.getNetRuntime());
-                       Map<String, Object> accumulatorsResult = 
execResult.getAllAccumulatorResults();
-                       if (accumulatorsResult.size() > 0) {
-                               System.out.println("Accumulator Results: ");
-                               
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+                       if (yarnCluster != null) {
+                               logAndSysout("Shutting down YARN cluster");
+                               yarnCluster.shutdown();
+                       }
+                       if (program != null) {
+                               program.deleteExtractedLibraries();
                        }
                }
-               return 0;
        }
-       
-       // 
--------------------------------------------------------------------------------------------
-       
+
        /**
         * Executes the info action.
         * 
         * @param args Command line arguments for the info action. 
         */
        protected int info(String[] args) {
+               LOG.info("Running 'info' command.");
+
                // Parse command line options
-               CommandLine line;
+               InfoOptions options;
                try {
-                       line = parser.parse(INFO_OPTIONS, args, false);
-                       evaluateGeneralOptions(line);
-               }
-               catch (MissingOptionException e) {
-                       return handleArgException(e);
+                       options = CliFrontendParser.parseInfoCommand(args);
                }
-               catch (MissingArgumentException e) {
+               catch (CliArgsException e) {
                        return handleArgException(e);
                }
-               catch (UnrecognizedOptionException e) {
-                       return handleArgException(e);
-               }
-               catch (Exception e) {
-                       return handleError(e);
+               catch (Throwable t) {
+                       return handleError(t);
                }
 
-               if (printHelp) {
-                       printHelpForInfo();
+               // evaluate help flag
+               if (options.isPrintHelp()) {
+                       CliFrontendParser.printHelpForInfo();
                        return 0;
                }
 
+               if (options.getJarFilePath() == null) {
+                       return handleArgException(new CliArgsException("The 
program JAR file was not specified."));
+               }
+
                // -------- build the packaged program -------------
                
                PackagedProgram program;
                try {
-                       program = buildProgram(line);
-               } catch (FileNotFoundException e) {
-                       return handleError(e);
-               } catch (ProgramInvocationException e) {
-                       return handleError(e);
-               } catch (Throwable t) {
-                       return handleError(t);
+                       LOG.info("Building program from JAR file");
+                       program = buildProgram(options);
                }
-               
-               int parallelism = -1;
-               if (line.hasOption(PARALLELISM_OPTION.getOpt())) {
-                       String parString = 
line.getOptionValue(PARALLELISM_OPTION.getOpt());
-                       try {
-                               parallelism = Integer.parseInt(parString);
-                       } catch (NumberFormatException e) {
-                               System.out.println("The value " + parString + " 
is invalid for the degree of parallelism.");
-                               return 1;
-                       }
-                       
-                       if (parallelism <= 0) {
-                               System.out.println("Invalid value for the 
degree-of-parallelism. Parallelism must be greater than zero.");
-                               return 1;
-                       }
+               catch (Throwable t) {
+                       return handleError(t);
                }
                
                try {
-                       Client client = getClient(line, 
program.getUserCodeClassLoader(), program.getMainClassName());
+                       int parallelism = options.getParallelism();
+
+                       LOG.info("Creating program plan dump");
+                       Client client = getClient(options, 
program.getUserCodeClassLoader(), program.getMainClassName());
                        String jsonPlan = 
client.getOptimizedPlanAsJson(program, parallelism);
 
                        if (jsonPlan != null) {
                                System.out.println("----------------------- 
Execution Plan -----------------------");
                                System.out.println(jsonPlan);
                                
System.out.println("--------------------------------------------------------------");
-                       } else {
-                               System.out.println("JSON plan could not be 
compiled.");
                        }
-                       
+                       else {
+                               System.out.println("JSON plan could not be 
generated.");
+                       }
                        return 0;
                }
                catch (Throwable t) {
@@ -478,33 +373,27 @@ public class CliFrontend {
         * @param args Command line arguments for the list action.
         */
        protected int list(String[] args) {
-               // Parse command line options
-               CommandLine line;
+               LOG.info("Running 'list' command.");
+
+               ListOptions options;
                try {
-                       line = parser.parse(LIST_OPTIONS, args, false);
-                       evaluateGeneralOptions(line);
-               }
-               catch (MissingOptionException e) {
-                       return handleArgException(e);
+                       options = CliFrontendParser.parseListCommand(args);
                }
-               catch (MissingArgumentException e) {
+               catch (CliArgsException e) {
                        return handleArgException(e);
                }
-               catch (UnrecognizedOptionException e) {
-                       return handleArgException(e);
-               }
-               catch (Exception e) {
-                       return handleError(e);
+               catch (Throwable t) {
+                       return handleError(t);
                }
-               
-               if (printHelp) {
-                       printHelpForList();
+
+               // evaluate help flag
+               if (options.isPrintHelp()) {
+                       CliFrontendParser.printHelpForList();
                        return 0;
                }
-               
-               // get list options
-               boolean running = line.hasOption(RUNNING_OPTION.getOpt());
-               boolean scheduled = line.hasOption(SCHEDULED_OPTION.getOpt());
+
+               boolean running = options.getRunning();
+               boolean scheduled = options.getScheduled();
                
                // print running and scheduled jobs if not option supplied
                if (!running && !scheduled) {
@@ -513,87 +402,87 @@ public class CliFrontend {
                }
                
                try {
-                       ActorRef jobManager = getJobManager(line, 
getGlobalConfiguration());
-                       if (jobManager == null) {
-                               return 1;
-                       }
+                       ActorRef jobManager = getJobManager(options);
 
-                       final Future<Object> response = Patterns.ask(jobManager,
-                                       
JobManagerMessages.getRequestRunningJobs(), new Timeout(getAkkaTimeout()));
+                       LOG.info("Connecting to JobManager to retrieve list of 
jobs");
+                       Future<Object> response = Patterns.ask(jobManager,
+                                       
JobManagerMessages.getRequestRunningJobsStatus(), new Timeout(askTimeout));
 
                        Object result;
                        try {
-                               result = Await.result(response, 
getAkkaTimeout());
-                       } catch (Exception exception) {
-                               throw new IOException("Could not retrieve 
running jobs from job manager.",
-                                               exception);
+                               result = Await.result(response, askTimeout);
+                       }
+                       catch (Exception e) {
+                               throw new Exception("Could not retrieve running 
jobs from the JobManager.", e);
                        }
 
-                       if (!(result instanceof RunningJobs)) {
-                               throw new RuntimeException("ReqeustRunningJobs 
requires a response of type " +
-                                               "RunningJobs. Instead the 
response is of type " + result.getClass() + ".");
-                       } else {
-                               Iterable<ExecutionGraph> jobs = ((RunningJobs) 
result).asJavaIterable();
+                       if (result instanceof RunningJobsStatus) {
+                               LOG.info("Successfully retrieved list of jobs");
+
+                               List<JobStatusMessage> jobs = 
((RunningJobsStatus) result).getStatusMessages();
 
-                               ArrayList<ExecutionGraph> runningJobs = null;
-                               ArrayList<ExecutionGraph> scheduledJobs = null;
+                               ArrayList<JobStatusMessage> runningJobs = null;
+                               ArrayList<JobStatusMessage> scheduledJobs = 
null;
                                if (running) {
-                                       runningJobs = new 
ArrayList<ExecutionGraph>();
+                                       runningJobs = new 
ArrayList<JobStatusMessage>();
                                }
                                if (scheduled) {
-                                       scheduledJobs = new 
ArrayList<ExecutionGraph>();
+                                       scheduledJobs = new 
ArrayList<JobStatusMessage>();
                                }
 
-                               for (ExecutionGraph rj : jobs) {
-
-                                       if (running && 
rj.getState().equals(JobStatus.RUNNING)) {
+                               for (JobStatusMessage rj : jobs) {
+                                       if (running && 
rj.getJobState().equals(JobStatus.RUNNING)) {
                                                runningJobs.add(rj);
                                        }
-                                       if (scheduled && 
rj.getState().equals(JobStatus.CREATED)) {
+                                       if (scheduled && 
rj.getJobState().equals(JobStatus.CREATED)) {
                                                scheduledJobs.add(rj);
                                        }
                                }
 
                                SimpleDateFormat df = new 
SimpleDateFormat("dd.MM.yyyy HH:mm:ss");
-                               Comparator<ExecutionGraph> njec = new 
Comparator<ExecutionGraph>(){
-
+                               Comparator<JobStatusMessage> njec = new 
Comparator<JobStatusMessage>(){
                                        @Override
-                                       public int compare(ExecutionGraph o1, 
ExecutionGraph o2) {
-                                               return 
(int)(o1.getStatusTimestamp(o1.getState())-o2.getStatusTimestamp(o2
-                                                               .getState()));
+                                       public int compare(JobStatusMessage o1, 
JobStatusMessage o2) {
+                                               return 
(int)(o1.getStartTime()-o2.getStartTime());
                                        }
                                };
 
                                if (running) {
                                        if(runningJobs.size() == 0) {
                                                System.out.println("No running 
jobs.");
-                                       } else {
+                                       }
+                                       else {
                                                Collections.sort(runningJobs, 
njec);
 
                                                
System.out.println("------------------------ Running Jobs 
------------------------");
-                                               for(ExecutionGraph rj : 
runningJobs) {
-                                                       
System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
-                                                                       +" : 
"+rj.getJobID().toString()+" : "+rj.getJobName());
+                                               for (JobStatusMessage rj : 
runningJobs) {
+                                                       
System.out.println(df.format(new Date(rj.getStartTime()))
+                                                                       + " : " 
+ rj.getJobId() + " : " + rj.getJobName());
                                                }
                                                
System.out.println("--------------------------------------------------------------");
                                        }
                                }
                                if (scheduled) {
-                                       if(scheduledJobs.size() == 0) {
+                                       if (scheduledJobs.size() == 0) {
                                                System.out.println("No 
scheduled jobs.");
-                                       } else {
+                                       }
+                                       else {
                                                Collections.sort(scheduledJobs, 
njec);
 
                                                
System.out.println("----------------------- Scheduled Jobs 
-----------------------");
-                                               for(ExecutionGraph rj : 
scheduledJobs) {
-                                                       
System.out.println(df.format(new Date(rj.getStatusTimestamp(rj.getState())))
-                                                                       +" : 
"+rj.getJobID().toString()+" : "+rj.getJobName());
+                                               for(JobStatusMessage rj : 
scheduledJobs) {
+                                                       
System.out.println(df.format(new Date(rj.getStartTime()))
+                                                                       + " : " 
+ rj.getJobId() + " : " + rj.getJobName());
                                                }
                                                
System.out.println("--------------------------------------------------------------");
                                        }
                                }
                                return 0;
                        }
+                       else {
+                               throw new Exception("ReqeustRunningJobs 
requires a response of type " +
+                                               "RunningJobs. Instead the 
response is of type " + result.getClass() + ".");
+                       }
                }
                catch (Throwable t) {
                        return handleError(t);
@@ -601,452 +490,299 @@ public class CliFrontend {
        }
        
        /**
-        * Executes the cancel action.
+        * Executes the CANCEL action.
         * 
         * @param args Command line arguments for the cancel action.
         */
        protected int cancel(String[] args) {
-               // Parse command line options
-               CommandLine line;
+               LOG.info("Running 'cancel' command.");
+
+               CancelOptions options;
                try {
-                       line = parser.parse(CANCEL_OPTIONS, args, false);
-                       evaluateGeneralOptions(line);
-               }
-               catch (MissingOptionException e) {
-                       return handleArgException(e);
+                       options = CliFrontendParser.parseCancelCommand(args);
                }
-               catch (MissingArgumentException e) {
+               catch (CliArgsException e) {
                        return handleArgException(e);
                }
-               catch (UnrecognizedOptionException e) {
-                       return handleArgException(e);
-               }
-               catch (Exception e) {
-                       return handleError(e);
+               catch (Throwable t) {
+                       return handleError(t);
                }
-               
-               if (printHelp) {
-                       printHelpForCancel();
+
+               // evaluate help flag
+               if (options.isPrintHelp()) {
+                       CliFrontendParser.printHelpForCancel();
                        return 0;
                }
                
-               String[] cleanedArgs = line.getArgs();
+               String[] cleanedArgs = options.getArgs();
                JobID jobId;
 
                if (cleanedArgs.length > 0) {
                        String jobIdString = cleanedArgs[0];
                        try {
                                jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
-                       } catch (Exception e) {
+                       }
+                       catch (Exception e) {
+                               LOG.error("Error: The value for the Job ID is 
not a valid ID.");
                                System.out.println("Error: The value for the 
Job ID is not a valid ID.");
                                return 1;
                        }
-               } else {
+               }
+               else {
+                       LOG.error("Missing JobID in the command line 
arguments.");
                        System.out.println("Error: Specify a Job ID to cancel a 
job.");
                        return 1;
                }
                
                try {
-                       ActorRef jobManager = getJobManager(line, 
getGlobalConfiguration());
-
-                       if (jobManager == null) {
-                               return 1;
-                       }
-
-                       final Future<Object> response = 
Patterns.ask(jobManager, new CancelJob(jobId),
-                                       new Timeout(getAkkaTimeout()));
+                       ActorRef jobManager = getJobManager(options);
+                       Future<Object> response = Patterns.ask(jobManager, new 
CancelJob(jobId), new Timeout(askTimeout));
 
                        try {
-                               Await.ready(response, getAkkaTimeout());
-                       } catch (Exception exception) {
-                               throw new IOException("Canceling the job with 
job ID " + jobId + " failed.",
-                                               exception);
+                               Await.result(response, askTimeout);
+                               return 0;
+                       }
+                       catch (Exception e) {
+                               throw new Exception("Canceling the job with ID 
" + jobId + " failed.", e);
                        }
-
-                       return 0;
                }
                catch (Throwable t) {
                        return handleError(t);
                }
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       //  Interaction with programs and JobManager
+       // 
--------------------------------------------------------------------------------------------
+
+       protected int executeProgram(PackagedProgram program, Client client, 
int parallelism) {
+               LOG.info("Starting execution or program");
+               JobExecutionResult execResult;
+               try {
+                       client.setPrintStatusDuringExecution(true);
+                       execResult = client.run(program, parallelism, true);
+               }
+               catch (ProgramInvocationException e) {
+                       return handleError(e);
+               }
+               finally {
+                       program.deleteExtractedLibraries();
+               }
+
+               LOG.info("Program execution finished");
+
+               // we come here after the job has finished
+               if (execResult != null) {
+                       System.out.println("Job Runtime: " + 
execResult.getNetRuntime());
+                       Map<String, Object> accumulatorsResult = 
execResult.getAllAccumulatorResults();
+                       if (accumulatorsResult.size() > 0) {
+                               System.out.println("Accumulator Results: ");
+                               
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
+                       }
+               }
+               return 0;
+       }
+
        /**
-        * @param line
-        * 
+        * Creates a Packaged program from the given command line options.
+        *
         * @return A PackagedProgram (upon success)
         * @throws java.io.FileNotFoundException, 
org.apache.flink.client.program.ProgramInvocationException, java.lang.Throwable
         */
-       protected PackagedProgram buildProgram(CommandLine line) throws 
FileNotFoundException, ProgramInvocationException {
-               String[] programArgs = line.hasOption(ARGS_OPTION.getOpt()) ?
-                               line.getOptionValues(ARGS_OPTION.getOpt()) :
-                               line.getArgs();
-       
-               // take the jar file from the option, or as the first trailing 
parameter (if available)
-               String jarFilePath;
-               if (line.hasOption(JAR_OPTION.getOpt())) {
-                       jarFilePath = line.getOptionValue(JAR_OPTION.getOpt());
-               }
-               else if (programArgs.length > 0) {
-                       jarFilePath = programArgs[0];
-                       programArgs = Arrays.copyOfRange(programArgs, 1, 
programArgs.length);
-               }
-               else {
-                       throw new FileNotFoundException("Error: Jar file was 
not specified.");
+       protected PackagedProgram buildProgram(ProgramOptions options)
+                       throws FileNotFoundException, ProgramInvocationException
+       {
+               String[] programArgs = options.getProgramArgs();
+               String jarFilePath = options.getJarFilePath();
+
+               if (jarFilePath == null) {
+                       throw new IllegalArgumentException("The program JAR 
file was not specified.");
                }
-               
+
                File jarFile = new File(jarFilePath);
                
                // Check if JAR file exists
                if (!jarFile.exists()) {
-                       throw new FileNotFoundException("Error: Jar file does 
not exist: " + jarFile);
+                       throw new FileNotFoundException("JAR file does not 
exist: " + jarFile);
                }
                else if (!jarFile.isFile()) {
-                       throw new FileNotFoundException("Error: Jar file is not 
a file: " + jarFile);
+                       throw new FileNotFoundException("JAR file is not a 
file: " + jarFile);
                }
                
                // Get assembler class
-               String entryPointClass = line.hasOption(CLASS_OPTION.getOpt()) ?
-                               line.getOptionValue(CLASS_OPTION.getOpt()) :
-                               null;
+               String entryPointClass = options.getEntryPointClassName();
 
                return entryPointClass == null ?
                                new PackagedProgram(jarFile, programArgs) :
                                new PackagedProgram(jarFile, entryPointClass, 
programArgs);
        }
-       
-       protected String getJobManagerAddressString(CommandLine line) throws 
IOException {
-               Configuration configuration = getGlobalConfiguration();
-               
-               // first, check if the address comes from the command line 
option
-               if (line.hasOption(ADDRESS_OPTION.getOpt())) {
+
+
+       protected InetSocketAddress getJobManagerAddress(CommandLineOptions 
options) throws Exception {
+
+               // first, check if the address is specified as an option
+               if (options.getJobManagerAddress() != null) {
+                       return 
parseJobManagerAddress(options.getJobManagerAddress());
+               }
+
+               // second, check whether the address was already parsed, or 
configured through the YARN properties
+               if (jobManagerAddress == null) {
+                       // config file must have the address
+                       String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+
+                       // verify that there is a jobmanager address and port 
in the configuration
+                       if (jobManagerHost == null) {
+                               throw new Exception("Found no configuration in 
the config directory '" + configDirectory
+                                               + "' that specifies the 
JobManager address.");
+                       }
+
+                       int jobManagerPort;
                        try {
-                               return 
line.getOptionValue(ADDRESS_OPTION.getOpt());
+                               jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
                        }
-                       catch (Exception e) {
-                               System.out.println("Error: The JobManager 
address has an invalid format. " + e.getMessage());
-                               return null;
+                       catch (NumberFormatException e) {
+                               throw new Exception("Invalid value for the 
JobManager port (" +
+                                               
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + ") in the configuration.");
                        }
-               }
-               else {
-                       Properties yarnProps = getYarnProperties();
-                       if(yarnProps != null) {
-                               try {
-                                       String address = 
yarnProps.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-                                       System.out.println("Found a yarn 
properties file (" + YARN_PROPERTIES_FILE + ") file, "
-                                                       + "using 
\""+address+"\" to connect to the JobManager");
-                                       return address;
-                               } catch (Exception e) {
-                                       System.out.println("Found a yarn 
properties " + YARN_PROPERTIES_FILE + " file, but could not read the JobManager 
address from the file. "
-                                                               + 
e.getMessage());
-                                       return null;
-                               }
-                       } else {
-                               // regular config file gives the address
-                               String jobManagerAddress = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-                               
-                               // verify that there is a jobmanager address 
and port in the configuration
-                               if (jobManagerAddress == null) {
-                                       System.out.println("Error: Found no 
configuration in the config directory '" +
-                                                       
getConfigurationDirectory() + "' that specifies the JobManager address.");
-                                       return null;
-                               }
-                               
-                               int jobManagerPort;
-                               try {
-                                       jobManagerPort = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-                               } catch (NumberFormatException e) {
-                                       System.out.println("Invalid value for 
the JobManager IPC port (" + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY +
-                                                       ") in the 
configuration.");
-                                       return null;
-                               }
-                               
-                               if (jobManagerPort == -1) {
-                                       System.out.println("Error: Found no 
configuration in the config directory '" +
-                                                       
getConfigurationDirectory() + "' that specifies the JobManager port.");
-                                       return null;
-                               }
-                               
-                               return jobManagerAddress + ":" + jobManagerPort;
+
+                       if (jobManagerPort == -1) {
+                               throw new Exception("Found no configuration in 
the config directory '" + configDirectory
+                                               + "' that specifies the 
JobManager port.");
                        }
-               }
-       }
-       
-       protected ActorRef getJobManager(CommandLine line, Configuration 
config) throws IOException {
-               //TODO: Get ActorRef from YarnCluster if we are in YARN mode.
-               String jobManagerAddressStr = getJobManagerAddressString(line);
-               if (jobManagerAddressStr == null) {
-                       return null;
-               }
 
-               final ActorSystem actorSystem;
-               try {
-                       scala.Tuple2<String, Object> systemEndpoint = new 
scala.Tuple2<String, Object>("", 0);
-                       actorSystem = AkkaUtils.createActorSystem(config, new 
Some<scala.Tuple2<String, Object>>(systemEndpoint));
-               }
-               catch (Exception e) {
-                       throw new IOException("Could not start actor system to 
communicate with JobManager", e);
+                       jobManagerAddress = new 
InetSocketAddress(jobManagerHost, jobManagerPort);
                }
 
-               try {
-                       InetSocketAddress address = 
RemoteExecutor.getInetFromHostport(jobManagerAddressStr);
-                       return JobManager.getJobManagerRemoteReference(address, 
actorSystem, config);
-               }
-               finally {
-                       actorSystem.shutdown();
-               }
+               return jobManagerAddress;
        }
        
+       protected ActorRef getJobManager(CommandLineOptions options) throws 
Exception {
+               //TODO: Get ActorRef from YarnCluster if we are in YARN mode.
 
-       public String getConfigurationDirectory() {
-               if (configurationDirectory == null) {
-                       configurationDirectory = 
getConfigurationDirectoryFromEnv();
-               }
-               return configurationDirectory;
-       }
+               InetSocketAddress address = getJobManagerAddress(options);
 
-       /**
-        * Reads configuration settings. The default path can be overridden
-        * by setting the ENV variable "FLINK_CONF_DIR".
-        *
-        * @return Flink's global configuration
-        */
-       protected Configuration getGlobalConfiguration() {
-               if (!globalConfigurationLoaded) {
-                       String location = getConfigurationDirectory();
-                       GlobalConfiguration.loadConfiguration(location);
-                       // set default parallelization degree
-                       Properties yarnProps;
+               // start an actor system if needed
+               if (this.actorSystem == null) {
+                       LOG.info("Starting actor system to communicate with 
JobManager");
                        try {
-                               yarnProps = getYarnProperties();
-                               if(yarnProps != null) {
-                                       String propDegree = 
yarnProps.getProperty(YARN_PROPERTIES_DOP);
-                                       int paraDegree = -1;
-                                       if(propDegree != null) { // maybe the 
property is not set
-                                               paraDegree = 
Integer.valueOf(propDegree);
-                                       }
-                                       Configuration c = 
GlobalConfiguration.getConfiguration();
-                                       if(paraDegree != -1) {
-                                               
c.setInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, paraDegree);
-                                       }
-                                       // handle the YARN client's dynamic 
properties
-                                       String dynamicPropertiesEncoded = 
yarnProps.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
-                                       List<Tuple2<String, String>> 
dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
-                                       for(Tuple2<String, String> 
dynamicProperty : dynamicProperties) {
-                                               c.setString(dynamicProperty.f0, 
dynamicProperty.f1);
-                                       }
-                                       
GlobalConfiguration.includeConfiguration(c); // update config
-                               }
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                               System.err.println("Error while loading YARN 
properties: " + e.getMessage());
+                               scala.Tuple2<String, Object> systemEndpoint = 
new scala.Tuple2<String, Object>("", 0);
+                               this.actorSystem = 
AkkaUtils.createActorSystem(config,
+                                               new Some<scala.Tuple2<String, 
Object>>(systemEndpoint));
+                       }
+                       catch (Exception e) {
+                               throw new IOException("Could not start actor 
system to communicate with JobManager", e);
                        }
 
-                       globalConfigurationLoaded = true;
+                       LOG.info("Actor system successfully started");
                }
-               return GlobalConfiguration.getConfiguration();
-       }
-       public static String getConfigurationDirectoryFromEnv() {
-               String location;
-               if (System.getenv(ENV_CONFIG_DIRECTORY) != null) {
-                       location = System.getenv(ENV_CONFIG_DIRECTORY);
-               } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
-                       location = CONFIG_DIRECTORY_FALLBACK_1;
-               } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
-                       location = CONFIG_DIRECTORY_FALLBACK_2;
-               } else {
-                       throw new RuntimeException("The configuration directory 
was not found. Please configure the '" +
-                                       ENV_CONFIG_DIRECTORY + "' environment 
variable properly.");
-               }
-               return location;
-       }
 
-       protected FiniteDuration getAkkaTimeout(){
-               Configuration config = getGlobalConfiguration();
-               return AkkaUtils.getTimeout(config);
+               LOG.info("Trying to lookup JobManager");
+               ActorRef jmActor = 
JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout);
+               LOG.info("JobManager is at " + jmActor.path());
+               return jmActor;
        }
        
-       public static List<Tuple2<String, String>> getDynamicProperties(String 
dynamicPropertiesEncoded) {
-               List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, 
String>>();
-               if(dynamicPropertiesEncoded != null && 
dynamicPropertiesEncoded.length() > 0) {
-                       String[] propertyLines = 
dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-                       for(String propLine : propertyLines) {
-                               if(propLine == null) {
-                                       continue;
-                               }
-                               String[] kv = propLine.split("=");
-                               if(kv != null && kv[0] != null && kv[1] != null 
&& kv[0].length() > 0) {
-                                       ret.add(new Tuple2<String, 
String>(kv[0], kv[1]));
-                               }
-                       }
-               }
-               return ret;
-       }
-       
-       protected Properties getYarnProperties() throws IOException {
-               if(!yarnPropertiesLoaded) {
-                       String loc = getConfigurationDirectory();
-                       File propertiesFile = new File(loc + '/' + 
YARN_PROPERTIES_FILE);
-                       if (propertiesFile.exists()) {
-                               Properties props = new Properties();
-                               InputStream is = new FileInputStream( 
propertiesFile );
-                               props.load(is);
-                               yarnProperties = props;
-                               is.close();
-                       } else {
-                               yarnProperties = null;
-                       }
-                       yarnPropertiesLoaded = true;
-               }
-               return yarnProperties;
-       }
+
        
-       protected Client getClient(CommandLine line, ClassLoader classLoader, 
String programName) throws IOException {
-               String jmAddrString = getJobManagerAddressString(line);
-               InetSocketAddress jobManagerAddress = null;
-               if(jmAddrString.equals(YARN_DEPLOY_JOBMANAGER)) {
-                       System.out.println("YARN cluster mode detected. 
Switching Log4j output to console");
+       /**
+        * Creates the client used to talk to the JobManager.
+        *
+        * <p>If the JobManager address in the options equals YARN_DEPLOY_JOBMANAGER, a YARN
+        * cluster named {@code "Flink Application: " + programName} is deployed first. The
+        * method then polls the cluster status every 500 ms and returns once the number of
+        * connected TaskManagers is no longer below the count reported by the YARN client.
+        * Otherwise the address is obtained from {@code getJobManagerAddress(options)}.
+        *
+        * @param options The parsed command line options.
+        * @param classLoader The class loader handed to the created client.
+        * @param programName The name that becomes part of the YARN application name.
+        * @return A client for the determined JobManager address.
+        * @throws Exception A RuntimeException is thrown if the YARN client cannot be created
+        *                   or if deploying the YARN cluster fails.
+        */
+       protected Client getClient(CommandLineOptions options, ClassLoader 
classLoader, String programName) throws Exception {
+
+               InetSocketAddress jobManagerAddress;
+
+               if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
+                       logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
 
-                       this.runInYarnCluster = true;
                        // user wants to run Flink in YARN cluster.
-                       AbstractFlinkYarnClient flinkYarnClient = 
yarnSessionCLi.createFlinkYarnClient(line);
-                       if(flinkYarnClient == null) {
+                       CommandLine commandLine = options.getCommandLine();
+                       AbstractFlinkYarnClient flinkYarnClient =
+                                       
CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
+
+                       if (flinkYarnClient == null) {
                                throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
                        }
                        try {
-                               yarnCluster = flinkYarnClient.deploy("Flink 
Application: "+programName);
-                       } catch(Exception e) {
+                               yarnCluster = flinkYarnClient.deploy("Flink 
Application: " + programName);
+                       }
+                       catch(Exception e) {
                                throw new RuntimeException("Error deploying the 
YARN cluster", e);
                        }
+
                        jobManagerAddress = yarnCluster.getJobManagerAddress();
-                       System.out.println("YARN cluster started");
-                       System.out.println("JobManager web interface address 
"+yarnCluster.getWebInterfaceURL());
-                       System.out.println("Waiting until all TaskManagers have 
connected");
+
+                       logAndSysout("YARN cluster started");
+                       logAndSysout("JobManager web interface address " + 
yarnCluster.getWebInterfaceURL());
+                       logAndSysout("Waiting until all TaskManagers have 
connected");
+
+                       // poll the cluster status until enough TaskManagers have registered
                        while(true) {
                                FlinkYarnClusterStatus status = 
yarnCluster.getClusterStatus();
-                               if(status != null) {
+                               if (status != null) {
                                        if (status.getNumberOfTaskManagers() < 
flinkYarnClient.getTaskManagerCount()) {
-                                               System.out.println("TaskManager 
status  (" + 
status.getNumberOfTaskManagers()+"/"+flinkYarnClient.getTaskManagerCount()+")");
+                                               logAndSysout("TaskManager 
status (" + status.getNumberOfTaskManagers() + "/" + 
flinkYarnClient.getTaskManagerCount() + ")");
                                        } else {
-                                               System.out.println("Enough 
TaskManagers are connected");
+                                               logAndSysout("All TaskManagers 
are connected");
                                                break;
                                        }
                                } else {
-                                       System.out.println("No status updates 
from YARN cluster received so far. Waiting ...");
+                                       logAndSysout("No status updates from 
YARN cluster received so far. Waiting ...");
                                }
+
                                try {
                                        Thread.sleep(500);
-                               } catch (InterruptedException e) {
-                                       System.err.println("Thread as 
interrupted"); Thread.currentThread().interrupt();
+                               }
+                               catch (InterruptedException e) {
+                                       LOG.error("Interrupted while waiting 
for TaskManagers");
+                                       System.err.println("Thread is 
interrupted");
+                                       Thread.currentThread().interrupt();
+                                       // NOTE(review): the loop keeps running after the interrupt flag is restored, so
+                                       // the next Thread.sleep() presumably throws right away again and the loop polls
+                                       // without any pause - confirm whether waiting should be aborted here instead.
                                }
                        }
-               } else {
-                       jobManagerAddress = 
RemoteExecutor.getInetFromHostport(jmAddrString);
                }
-               return new Client(jobManagerAddress, getGlobalConfiguration(), 
classLoader);
+               else {
+                       jobManagerAddress = getJobManagerAddress(options);
+               }
+               return new Client(jobManagerAddress, config, classLoader);
        }
 
+       // 
--------------------------------------------------------------------------------------------
+       //  Logging and Exception Handling
+       // 
--------------------------------------------------------------------------------------------
+
        /**
-        * Prints the help for the client.
+        * Displays an exception message for incorrect command line arguments.
+        *
+        * @param e The exception to display.
+        * @return The return code for the process.
         */
-       private void printHelp() {
-               System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
-               System.out.println();
-               System.out.println("The following actions are available:");
-
-               /* The only general option is -h and the help messages are 
always printed on errors.
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setWidth(80);
-               formatter.setLeftPadding(5);
-               formatter.setSyntaxPrefix("  general options:");
-               formatter.printHelp(" ", GENERAL_OPTIONS);
-               */
-               
-               printHelpForRun();
-               printHelpForInfo();
-               printHelpForList();
-               printHelpForCancel();
-
-               System.out.println();
-       }
-       
-       private void printHelpForRun() {
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setLeftPadding(5);
-               formatter.setWidth(80);
-
-               System.out.println("\nAction \"run\" compiles and runs a 
program.");
-               System.out.println("\n  Syntax: run [OPTIONS] <jar-file> 
<arguments>");
-               formatter.setSyntaxPrefix("  \"run\" action options:");
-               formatter.printHelp(" ", 
getRunOptionsWithoutDeprecatedOptions(new Options()));
-               formatter.setSyntaxPrefix("  Additional arguments if -m 
"+YARN_DEPLOY_JOBMANAGER+" is set:");
-               Options yarnOpts = new Options();
-               yarnSessionCLi.getYARNSessionCLIOptions(yarnOpts);
-               formatter.printHelp(" ", yarnOpts);
-               System.out.println();
-       }
-       
-       private void printHelpForInfo() {
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setLeftPadding(5);
-               formatter.setWidth(80);
-               
-               System.out.println("\nAction \"info\" shows the optimized 
execution plan of the program (JSON).");
-               System.out.println("\n  Syntax: info [OPTIONS] <jar-file> 
<arguments>");
-               formatter.setSyntaxPrefix("  \"info\" action options:");
-               formatter.printHelp(" ", 
getInfoOptionsWithoutDeprecatedOptions(new Options()));
-               System.out.println();
-       }
-       
-       private void printHelpForList() {
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setLeftPadding(5);
-               formatter.setWidth(80);
-               
-               System.out.println("\nAction \"list\" lists running and 
scheduled programs.");
-               System.out.println("\n  Syntax: list [OPTIONS]");
-               formatter.setSyntaxPrefix("  \"list\" action options:");
-               formatter.printHelp(" ", getListOptions(new Options()));
-               System.out.println();
-       }
-       
-       private void printHelpForCancel() {
-               HelpFormatter formatter = new HelpFormatter();
-               formatter.setLeftPadding(5);
-               formatter.setWidth(80);
-               
-               System.out.println("\nAction \"cancel\" cancels a running 
program.");
-               System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
-               formatter.setSyntaxPrefix("  \"cancel\" action options:");
-               formatter.printHelp(" ", getCancelOptions(new Options()));
-               System.out.println();
-       }
-       
        private int handleArgException(Exception e) {
+               // A missing message is logged as an empty string rather than "null". The trailing
+               // space in the prefix keeps the message from running into the fixed text.
+               LOG.error("Invalid command line arguments. " + (e.getMessage() == null ? "" : e.getMessage()));
+
                System.out.println(e.getMessage());
                System.out.println();
-               System.out.println("Specify the help option (-h or --help) to 
get help on the command.");
+               System.out.println("Use the help option (-h or --help) to get help on the command.");
                return 1;
        }
        /**
-        * Displays exceptions.
+        * Logs the given error and prints its stack trace, followed by a short
+        * explanation, to the standard error stream.
         * 
         * @param t The exception to display.
+        * @return The return code for the process (always 1).
         */
        private int handleError(Throwable t) {
+               LOG.error("Error while running the command.", t);
+
+               // the log may go to a file only, so the stack trace is also shown on the console
                t.printStackTrace();
                System.err.println();
                System.err.println("The exception above occurred while trying 
to run your command.");
                return 1;
        }
 
+       /**
+        * Writes the message to the log (INFO level) and prints it to standard out.
+        *
+        * @param message The message to log and print.
+        */
+       private void logAndSysout(String message) {
+               LOG.info(message);
+               System.out.println(message);
+       }
 
-       
+       // 
--------------------------------------------------------------------------------------------
+       //  Entry point for executable
+       // 
--------------------------------------------------------------------------------------------
 
-       private void evaluateGeneralOptions(CommandLine line) {
-               // check help flag
-               this.printHelp = line.hasOption(HELP_OPTION.getOpt());
-       }
-       
        /**
         * Parses the command line arguments and starts the requested action.
         * 
@@ -1057,7 +793,7 @@ public class CliFrontend {
                
                // check for action
                if (args.length < 1) {
-                       printHelp();
+                       CliFrontendParser.printHelp();
                        System.out.println("Please specify an action.");
                        return 1;
                }
@@ -1071,8 +807,11 @@ public class CliFrontend {
                // do action
                if (action.equals(ACTION_RUN)) {
                        // run() needs to run in a secured environment for the 
optimizer.
-                       if(SecurityUtils.isSecurityEnabled()) {
-                               System.out.println("Secure Hadoop setup 
detected.");
+                       if (SecurityUtils.isSecurityEnabled()) {
+                               String message = "Secure Hadoop environment 
setup detected. Running in secure context.";
+                               LOG.info(message);
+                               System.out.println(message);
+
                                try {
                                        return SecurityUtils.runSecured(new 
SecurityUtils.FlinkSecuredRunner<Integer>() {
                                                @Override
@@ -1085,16 +824,21 @@ public class CliFrontend {
                                }
                        }
                        return run(params);
-               } else if (action.equals(ACTION_LIST)) {
+               }
+               else if (action.equals(ACTION_LIST)) {
                        return list(params);
-               } else if (action.equals(ACTION_INFO)) {
+               }
+               else if (action.equals(ACTION_INFO)) {
                        return info(params);
-               } else if (action.equals(ACTION_CANCEL)) {
+               }
+               else if (action.equals(ACTION_CANCEL)) {
                        return cancel(params);
-               } else if (action.equals("-h") || action.equals("--help")) {
-                       printHelp();
+               }
+               else if (action.equals("-h") || action.equals("--help")) {
+                       CliFrontendParser.printHelp();
                        return 0;
-               } else {
+               }
+               else {
                        System.out.printf("\"%s\" is not a valid action.\n", 
action);
                        System.out.println();
                        System.out.println("Valid actions are \"run\", 
\"list\", \"info\", or \"cancel\".");
@@ -1104,15 +848,92 @@ public class CliFrontend {
                }
        }
 
+       /**
+        * Shuts down the actor system held by this frontend, if there is one.
+        * The field is cleared before the shutdown call, so a repeated call does nothing.
+        */
+       public void shutdown() {
+               final ActorSystem system = this.actorSystem;
+               if (system == null) {
+                       // nothing was started, or it has been shut down already
+                       return;
+               }
+
+               this.actorSystem = null;
+               system.shutdown();
+       }
 
        /**
         * Submits the job based on the arguments
         */
-       public static void main(String[] args) throws ParseException {
+       public static void main(String[] args) {
+               EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line 
Client", args);
+               EnvironmentInformation.checkJavaVersion();
+
+               try {
+                       CliFrontend cli = new CliFrontend();
+                       // the value returned by parseParameters becomes the exit code of the process
+                       int retCode = cli.parseParameters(args);
+                       System.exit(retCode);
+               }
+               catch (Throwable t) {
+                       // anything that escapes parseParameters is logged, printed, and reported
+                       // with the exit code 31
+                       LOG.error("Fatal error while running command line 
interface.", t);
+                       t.printStackTrace();
+                       System.exit(31);
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Miscellaneous Utilities
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Parses a "host:port" string into a socket address.
+        *
+        * @param hostAndPort The address in the form host:port.
+        * @return The socket address for the given host and port.
+        * @throws RuntimeException If the string is malformed, or lacks the host or the port.
+        */
+       private static InetSocketAddress parseJobManagerAddress(String hostAndPort) {
+               // a dummy scheme is prepended so that java.net.URI does the host/port splitting
+               final URI parsed;
+               try {
+                       parsed = new URI("my://" + hostAndPort);
+               }
+               catch (URISyntaxException e) {
+                       throw new RuntimeException("Malformed address " + hostAndPort, e);
+               }
+
+               final String hostName = parsed.getHost();
+               final int portNumber = parsed.getPort();
+               if (hostName != null && portNumber != -1) {
+                       return new InetSocketAddress(hostName, portNumber);
+               }
+               throw new RuntimeException("Address is missing hostname or port " + hostAndPort);
+       }
 
-               CliFrontend cli = new CliFrontend();
-               int retCode = cli.parseParameters(args);
-               System.exit(retCode);
+       /**
+        * Determines the configuration directory.
+        *
+        * <p>The directory named by the ENV_CONFIG_DIRECTORY environment variable is used if the
+        * variable is set. Otherwise the first existing fallback directory is chosen.
+        *
+        * @return The path of the configuration directory.
+        * @throws RuntimeException If the directory from the environment variable does not exist,
+        *                          or if the variable is unset and no fallback directory exists.
+        */
+       public static String getConfigurationDirectoryFromEnv() {
+               final String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
+
+               // an explicitly configured directory must exist, the fallbacks are not consulted
+               if (envLocation != null) {
+                       if (!new File(envLocation).exists()) {
+                               throw new RuntimeException("The config directory '" + envLocation + "', specified in the '" +
+                                               ENV_CONFIG_DIRECTORY + "' environment variable, does not exist.");
+                       }
+                       return envLocation;
+               }
+
+               if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
+                       return CONFIG_DIRECTORY_FALLBACK_1;
+               }
+               if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
+                       return CONFIG_DIRECTORY_FALLBACK_2;
+               }
+
+               throw new RuntimeException("The configuration directory was not specified. " +
+                               "Please specify the directory containing the configuration file through the '" +
+                               ENV_CONFIG_DIRECTORY + "' environment variable.");
        }
 
+       /**
+        * Decodes dynamic properties that are given as {@code key=value} pairs, joined by
+        * {@link CliFrontend#YARN_DYNAMIC_PROPERTIES_SEPARATOR}.
+        *
+        * <p>Only the first '=' of a pair separates the key from the value, so a value may
+        * itself contain '=' characters ({@code "opts=-Da=b"} yields the value {@code "-Da=b"}),
+        * and {@code "key="} yields an empty value. Pairs without any '=' or with an empty key
+        * are skipped.
+        *
+        * @param dynamicPropertiesEncoded The encoded properties, may be null or empty.
+        * @return The decoded key/value pairs in the order of their occurrence, possibly empty.
+        */
+       public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) {
+               List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>();
+               if (dynamicPropertiesEncoded == null || dynamicPropertiesEncoded.length() == 0) {
+                       return ret;
+               }
+
+               String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+               for (String propLine : propertyLines) {
+                       // limit 2: split at the first '=' only, everything after it belongs to the value
+                       String[] kv = propLine.split("=", 2);
+                       if (kv.length == 2 && kv[0].length() > 0) {
+                               ret.add(new Tuple2<String, String>(kv[0], kv[1]));
+                       }
+               }
+               return ret;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
new file mode 100644
index 0000000..22e9ece
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the CANCEL command. In addition to what the base class
+ * keeps, this class holds the arguments that remain after option parsing.
+ */
+public class CancelOptions extends CommandLineOptions {
+
+       /** The leftover arguments as reported by {@link CommandLine#getArgs()}. */
+       private final String[] args;
+
+       /**
+        * @param line The parsed command line of the cancel action.
+        */
+       public CancelOptions(CommandLine line) {
+               super(line);
+               this.args = line.getArgs();
+       }
+
+       /**
+        * @return The leftover arguments, or an empty array if there are none. Never null.
+        */
+       public String[] getArgs() {
+               if (args != null) {
+                       return args;
+               }
+               return new String[0];
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
new file mode 100644
index 0000000..932c66d
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliArgsException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+/**
+ * Special exception that is thrown when the command line parsing fails.
+ * It is a checked exception and carries only a message.
+ */
+public class CliArgsException extends Exception {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Creates the exception with the given message.
+        *
+        * @param message The description of the problem with the command line arguments.
+        */
+       public CliArgsException(String message) {
+               super(message);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
new file mode 100644
index 0000000..0f6ad24
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.FlinkYarnSessionCli;
+
+/**
+ * A simple command line parser (based on Apache Commons CLI) that extracts 
command
+ * line options.
+ */
+public class CliFrontendParser {
+
+       /** command line interface of the YARN session, with a special 
initialization here
+        *  to prefix all options with y/yarn. */
+       private static final FlinkYarnSessionCli yarnSessionCLi = new 
FlinkYarnSessionCli("y", "yarn");
+
+
+       static final Option HELP_OPTION = new Option("h", "help", false,
+                                                                               
                "Show the help message for the CLI Frontend or the action.");
+
+       static final Option JAR_OPTION = new Option("j", "jarfile", true, 
"Flink program JAR file.");
+
+       /** Option naming the entry point class; the help text closes its parenthesis after "method". */
+       static final Option CLASS_OPTION = new Option("c", "class", true,
+                       "Class with the program entry point (\"main\" method or \"getPlan()\" method). Only needed if the " +
+                                       "JAR file does not specify the class in its manifest.");
+
+       static final Option PARALLELISM_OPTION = new Option("p", "parallelism", 
true,
+                       "The parallelism with which to run the program. 
Optional flag to override the default value " +
+                                       "specified in the configuration.");
+
+       static final Option ARGS_OPTION = new Option("a", "arguments", true,
+                       "Program arguments. Arguments can also be added without 
-a, simply as trailing parameters.");
+
+       static final Option ADDRESS_OPTION = new Option("m", "jobmanager", true,
+                       "Address of the JobManager (master) to which to 
connect. Specify '" + CliFrontend.YARN_DEPLOY_JOBMANAGER
+                                       + "' as the JobManager to deploy a YARN 
cluster for the job. Use this flag to connect to a " +
+                                       "different JobManager than the one 
specified in the configuration.");
+
+       // list specific options
+       static final Option RUNNING_OPTION = new Option("r", "running", false,
+                       "Show only running programs and their JobIDs");
+
+       static final Option SCHEDULED_OPTION = new Option("s", "scheduled", 
false,
+                       "Show only scheduled programs and their JobIDs");
+
+       static {
+               HELP_OPTION.setRequired(false);
+
+               JAR_OPTION.setRequired(false);
+               JAR_OPTION.setArgName("jarfile");
+
+               CLASS_OPTION.setRequired(false);
+               CLASS_OPTION.setArgName("classname");
+
+               ADDRESS_OPTION.setRequired(false);
+               ADDRESS_OPTION.setArgName("host:port");
+
+               PARALLELISM_OPTION.setRequired(false);
+               PARALLELISM_OPTION.setArgName("parallelism");
+
+               ARGS_OPTION.setRequired(false);
+               ARGS_OPTION.setArgName("programArgs");
+               ARGS_OPTION.setArgs(Option.UNLIMITED_VALUES);
+
+               RUNNING_OPTION.setRequired(false);
+               SCHEDULED_OPTION.setRequired(false);
+       }
+
+       private static final Options RUN_OPTIONS = 
getRunOptions(buildGeneralOptions(new Options()));
+       private static final Options INFO_OPTIONS = 
getInfoOptions(buildGeneralOptions(new Options()));
+       private static final Options LIST_OPTIONS = 
getListOptions(buildGeneralOptions(new Options()));
+       private static final Options CANCEL_OPTIONS = 
getCancelOptions(buildGeneralOptions(new Options()));
+
+
+       /**
+        * Adds the options that every action understands to the given options.
+        *
+        * @param options The options to add to.
+        * @return The given options, for chaining.
+        */
+       private static Options buildGeneralOptions(Options options) {
+               // backwards compatibility: the verbose flag (-v) is still accepted, but ignored
+               final Option deprecatedVerbose = new Option("v", "verbose", false, "This option is deprecated.");
+
+               return options
+                               .addOption(HELP_OPTION)
+                               .addOption(deprecatedVerbose);
+       }
+
+       /**
+        * Adds the options that describe a program (JAR file, entry class, parallelism,
+        * program arguments) as well as the YARN session options to the given options.
+        *
+        * @param options The options to add to.
+        * @return The given options, for chaining.
+        */
+       public static Options getProgramSpecificOptions(Options options) {
+               options
+                               .addOption(JAR_OPTION)
+                               .addOption(CLASS_OPTION)
+                               .addOption(PARALLELISM_OPTION)
+                               .addOption(ARGS_OPTION);
+
+               // the YARN options are registered as well, so that the parser accepts them
+               yarnSessionCLi.getYARNSessionCLIOptions(options);
+               return options;
+       }
+
+       /**
+        * Adds only the entry class and parallelism options to the given options.
+        *
+        * @param options The options to add to.
+        * @return The given options, for chaining.
+        */
+       private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options options) {
+               return options
+                               .addOption(CLASS_OPTION)
+                               .addOption(PARALLELISM_OPTION);
+       }
+
+       /**
+        * Adds all options of the "run" action: the program specific options and the
+        * JobManager address option.
+        *
+        * @param options The options to add to.
+        * @return The options with the "run" options added.
+        */
+       private static Options getRunOptions(Options options) {
+               return getJobManagerAddressOption(getProgramSpecificOptions(options));
+       }
+
+       /**
+        * Adds the "run" options without the deprecated ones: entry class, parallelism,
+        * and the JobManager address option.
+        *
+        * @param options The options to add to.
+        * @return The options with the non-deprecated "run" options added.
+        */
+       private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
+               return getJobManagerAddressOption(getProgramSpecificOptionsWithoutDeprecatedOptions(options));
+       }
+
+       /**
+        * Adds the JobManager address option (-m) to the given options.
+        *
+        * @param options The options to add to.
+        * @return The given options, for chaining.
+        */
+       private static Options getJobManagerAddressOption(Options options) {
+               return options.addOption(ADDRESS_OPTION);
+       }
+
+       /**
+        * Adds all options of the "info" action: the program specific options and the
+        * JobManager address option.
+        *
+        * @param options The options to add to.
+        * @return The options with the "info" options added.
+        */
+       private static Options getInfoOptions(Options options) {
+               final Options withProgramOptions = getProgramSpecificOptions(options);
+               return getJobManagerAddressOption(withProgramOptions);
+       }
+
+       /**
+        * Adds the "info" options without the deprecated ones: entry class, parallelism,
+        * and the JobManager address option.
+        *
+        * @param options The options to add to.
+        * @return The options with the non-deprecated "info" options added.
+        */
+       private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
+               final Options withProgramOptions = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+               return getJobManagerAddressOption(withProgramOptions);
+       }
+
+       /**
+        * Adds all options of the "list" action: the running and scheduled filters and the
+        * JobManager address option.
+        *
+        * @param options The options to add to.
+        * @return The options with the "list" options added.
+        */
+       private static Options getListOptions(Options options) {
+               final Options withFilters = options
+                               .addOption(RUNNING_OPTION)
+                               .addOption(SCHEDULED_OPTION);
+               return getJobManagerAddressOption(withFilters);
+       }
+
+       /**
+        * Adds the options of the "cancel" action, which is only the JobManager address option.
+        *
+        * @param options The options to add to.
+        * @return The options with the "cancel" options added.
+        */
+       private static Options getCancelOptions(Options options) {
+               return getJobManagerAddressOption(options);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Help
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Prints the help for the client: the general syntax line, followed by the
+        * help of each action in the order run, info, list, cancel.
+        */
+       public static void printHelp() {
+               System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
+               System.out.println();
+               System.out.println("The following actions are available:");
+
+               // one help section per action
+               printHelpForRun();
+               printHelpForInfo();
+               printHelpForList();
+               printHelpForCancel();
+
+               System.out.println();
+       }
+
+       /**
+        * Prints the help for the "run" action: its syntax, its (non-deprecated) options,
+        * and the additional options of the YARN session.
+        */
+       public static void printHelpForRun() {
+               final HelpFormatter helpFormatter = new HelpFormatter();
+               helpFormatter.setWidth(80);
+               helpFormatter.setLeftPadding(5);
+
+               System.out.println("\nAction \"run\" compiles and runs a program.");
+               System.out.println("\n  Syntax: run [OPTIONS] <jar-file> <arguments>");
+
+               // options of the action itself
+               final Options runOptions = getRunOptionsWithoutDeprecatedOptions(new Options());
+               helpFormatter.setSyntaxPrefix("  \"run\" action options:");
+               helpFormatter.printHelp(" ", runOptions);
+
+               // options listed under the "Additional arguments if -m ... is set" heading
+               final Options yarnOptions = new Options();
+               yarnSessionCLi.getYARNSessionCLIOptions(yarnOptions);
+               helpFormatter.setSyntaxPrefix("  Additional arguments if -m " + CliFrontend.YARN_DEPLOY_JOBMANAGER + " is set:");
+               helpFormatter.printHelp(" ", yarnOptions);
+
+               System.out.println();
+       }
+
+       /**
+        * Prints the help of the "info" action to standard out: a short description,
+        * the syntax line and the action's non-deprecated options.
+        */
+       public static void printHelpForInfo() {
+               System.out.println("\nAction \"info\" shows the optimized execution plan of the program (JSON).");
+               System.out.println("\n  Syntax: info [OPTIONS] <jar-file> <arguments>");
+
+               final HelpFormatter help = new HelpFormatter();
+               help.setLeftPadding(5);
+               help.setWidth(80);
+               help.setSyntaxPrefix("  \"info\" action options:");
+
+               final Options infoOptions = getInfoOptionsWithoutDeprecatedOptions(new Options());
+               help.printHelp(" ", infoOptions);
+
+               System.out.println();
+       }
+
+       /**
+        * Prints the help of the "list" action to standard out: a short description,
+        * the syntax line and the action's options.
+        */
+       public static void printHelpForList() {
+               System.out.println("\nAction \"list\" lists running and scheduled programs.");
+               System.out.println("\n  Syntax: list [OPTIONS]");
+
+               final HelpFormatter help = new HelpFormatter();
+               help.setLeftPadding(5);
+               help.setWidth(80);
+               help.setSyntaxPrefix("  \"list\" action options:");
+
+               final Options listOptions = getListOptions(new Options());
+               help.printHelp(" ", listOptions);
+
+               System.out.println();
+       }
+
+       /**
+        * Prints the help of the "cancel" action to standard out: a short description,
+        * the syntax line and the action's options.
+        */
+       public static void printHelpForCancel() {
+               System.out.println("\nAction \"cancel\" cancels a running program.");
+               System.out.println("\n  Syntax: cancel [OPTIONS] <Job ID>");
+
+               final HelpFormatter help = new HelpFormatter();
+               help.setLeftPadding(5);
+               help.setWidth(80);
+               help.setSyntaxPrefix("  \"cancel\" action options:");
+
+               final Options cancelOptions = getCancelOptions(new Options());
+               help.printHelp(" ", cancelOptions);
+
+               System.out.println();
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //  Line Parsing
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Parses the arguments of the "run" action.
+        *
+        * <p>The parser is told to stop at the first argument it does not recognize,
+        * so the jar file and the program arguments are left untouched.
+        *
+        * @param args the command line arguments to parse
+        * @return the options extracted from the arguments
+        * @throws CliArgsException if the parser rejects the arguments (carrying the
+        *         parser's message), or if {@link RunOptions} rejects a parsed value
+        */
+       public static RunOptions parseRunCommand(String[] args) throws CliArgsException {
+               final CommandLine commandLine;
+               try {
+                       commandLine = new PosixParser().parse(RUN_OPTIONS, args, true);
+               }
+               catch (ParseException e) {
+                       throw new CliArgsException(e.getMessage());
+               }
+               return new RunOptions(commandLine);
+       }
+
+       /**
+        * Parses the arguments of the "list" action.
+        *
+        * @param args the command line arguments to parse
+        * @return the options extracted from the arguments
+        * @throws CliArgsException if the parser rejects the arguments, carrying the
+        *         parser's message
+        */
+       public static ListOptions parseListCommand(String[] args) throws CliArgsException {
+               final CommandLine commandLine;
+               try {
+                       commandLine = new PosixParser().parse(LIST_OPTIONS, args, false);
+               }
+               catch (ParseException e) {
+                       throw new CliArgsException(e.getMessage());
+               }
+               return new ListOptions(commandLine);
+       }
+
+       /**
+        * Parses the arguments of the "cancel" action.
+        *
+        * @param args the command line arguments to parse
+        * @return the options extracted from the arguments
+        * @throws CliArgsException if the parser rejects the arguments, carrying the
+        *         parser's message
+        */
+       public static CancelOptions parseCancelCommand(String[] args) throws CliArgsException {
+               try {
+                       final CommandLine commandLine = new PosixParser().parse(CANCEL_OPTIONS, args, false);
+                       return new CancelOptions(commandLine);
+               }
+               catch (ParseException e) {
+                       throw new CliArgsException(e.getMessage());
+               }
+       }
+
+       /**
+        * Parses the arguments of the "info" action.
+        *
+        * <p>The "info" action has the same syntax as "run"
+        * ({@code [OPTIONS] <jar-file> <arguments>}), so it is parsed the same way as in
+        * {@link #parseRunCommand(String[])}: parsing stops at the first argument that is
+        * not a known option and everything from there on is kept as plain arguments.
+        *
+        * @param args the command line arguments to parse
+        * @return the options extracted from the arguments
+        * @throws CliArgsException if the parser rejects the arguments (carrying the
+        *         parser's message), or if {@link InfoOptions} rejects a parsed value
+        */
+       public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException {
+               try {
+                       PosixParser parser = new PosixParser();
+                       // stopAtNonOption must be true here: with false, a program argument that
+                       // starts with '-' is reported as an unrecognized option instead of being
+                       // passed through to the program.
+                       CommandLine line = parser.parse(INFO_OPTIONS, args, true);
+                       return new InfoOptions(line);
+               }
+               catch (ParseException e) {
+                       throw new CliArgsException(e.getMessage());
+               }
+       }
+
+       /**
+        * Returns the YARN session command line helper held by this class, the same
+        * instance that {@link #printHelpForRun()} queries for the YARN options.
+        *
+        * @return the value of the {@code yarnSessionCLi} field
+        */
+       public static FlinkYarnSessionCli getFlinkYarnSessionCli() {
+               return yarnSessionCLi;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
new file mode 100644
index 0000000..f6f6319
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CommandLineOptions.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.HELP_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.ADDRESS_OPTION;
+
+/**
+ * Base class for all options parsed from the command line.
+ * Keeps the parsed {@link CommandLine} and extracts the two values every
+ * subclass inherits: the help flag and the JobManager address.
+ */
+public abstract class CommandLineOptions {
+
+       /** The parsed command line this object was created from. */
+       private final CommandLine commandLine;
+
+       /** Value of the address option, or null if the option was not given. */
+       private final String jobManagerAddress;
+
+       /** True if the help option was present on the command line. */
+       private final boolean printHelp;
+
+
+       /**
+        * Extracts the help flag and the JobManager address from the parsed command line.
+        *
+        * @param line the parsed command line
+        */
+       protected CommandLineOptions(CommandLine line) {
+               final String helpKey = HELP_OPTION.getOpt();
+               final String addressKey = ADDRESS_OPTION.getOpt();
+
+               this.commandLine = line;
+               this.printHelp = line.hasOption(helpKey);
+
+               if (line.hasOption(addressKey)) {
+                       this.jobManagerAddress = line.getOptionValue(addressKey);
+               }
+               else {
+                       this.jobManagerAddress = null;
+               }
+       }
+
+       /**
+        * @return the parsed command line this object was created from
+        */
+       public CommandLine getCommandLine() {
+               return commandLine;
+       }
+
+       /**
+        * @return true if the help option was present on the command line
+        */
+       public boolean isPrintHelp() {
+               return printHelp;
+       }
+
+       /**
+        * @return the value of the address option, or null if the option was not given
+        */
+       public String getJobManagerAddress() {
+               return jobManagerAddress;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
new file mode 100644
index 0000000..83f5c38
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/InfoOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the INFO command.
+ * Adds no state of its own; every value is extracted by the
+ * {@link ProgramOptions} constructor.
+ */
+public class InfoOptions extends ProgramOptions {
+
+       /**
+        * Creates the options from a parsed command line.
+        *
+        * @param line the parsed command line, handed unchanged to the superclass
+        * @throws CliArgsException if the superclass constructor rejects a parsed value
+        */
+       public InfoOptions(CommandLine line) throws CliArgsException {
+               super(line);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
new file mode 100644
index 0000000..45f39a4
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ListOptions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.RUNNING_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SCHEDULED_OPTION;
+
+/**
+ * Command line options for the LIST command: the inherited help flag and
+ * JobManager address plus the running and scheduled flags.
+ */
+public class ListOptions extends CommandLineOptions {
+
+       /** True if the running option was present on the command line. */
+       private final boolean running;
+
+       /** True if the scheduled option was present on the command line. */
+       private final boolean scheduled;
+
+       /**
+        * Extracts the running and scheduled flags from the parsed command line.
+        *
+        * @param line the parsed command line
+        */
+       public ListOptions(CommandLine line) {
+               super(line);
+
+               final String runningKey = RUNNING_OPTION.getOpt();
+               final String scheduledKey = SCHEDULED_OPTION.getOpt();
+
+               this.running = line.hasOption(runningKey);
+               this.scheduled = line.hasOption(scheduledKey);
+       }
+
+       /**
+        * @return true if the running option was present on the command line
+        */
+       public boolean getRunning() {
+               return running;
+       }
+
+       /**
+        * @return true if the scheduled option was present on the command line
+        */
+       public boolean getScheduled() {
+               return scheduled;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
new file mode 100644
index 0000000..5b24a41
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import java.util.Arrays;
+
+import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+
+/**
+ * Base class for command line options that refer to a JAR file program.
+ * Extracts the jar file path, the entry point class, the program arguments
+ * and the parallelism from a parsed command line.
+ */
+public abstract class ProgramOptions extends CommandLineOptions {
+
+       /** Path of the program's jar file, or null if none was given. */
+       private final String jarFilePath;
+
+       /** Value of the class option, or null if the option was not given. */
+       private final String entryPointClass;
+
+       /** Arguments that remain for the program after the jar file was taken out. */
+       private final String[] programArgs;
+
+       /** Parallelism from the command line, or -1 if the option was not given. */
+       private final int parallelism;
+
+       /**
+        * Extracts the program related values from the parsed command line.
+        *
+        * @param line the parsed command line
+        * @throws CliArgsException if the parallelism option is not a positive integer
+        */
+       protected ProgramOptions(CommandLine line) throws CliArgsException {
+               super(line);
+
+               // the values of the arguments option take precedence over the free arguments
+               String[] remaining;
+               if (line.hasOption(ARGS_OPTION.getOpt())) {
+                       remaining = line.getOptionValues(ARGS_OPTION.getOpt());
+               }
+               else {
+                       remaining = line.getArgs();
+               }
+
+               // without a jar option, the first argument is taken as the jar file
+               final String jar;
+               if (line.hasOption(JAR_OPTION.getOpt())) {
+                       jar = line.getOptionValue(JAR_OPTION.getOpt());
+               }
+               else if (remaining.length > 0) {
+                       jar = remaining[0];
+                       remaining = Arrays.copyOfRange(remaining, 1, remaining.length);
+               }
+               else {
+                       jar = null;
+               }
+
+               this.jarFilePath = jar;
+               this.programArgs = remaining;
+
+               if (line.hasOption(CLASS_OPTION.getOpt())) {
+                       this.entryPointClass = line.getOptionValue(CLASS_OPTION.getOpt());
+               }
+               else {
+                       this.entryPointClass = null;
+               }
+
+               this.parallelism = parseParallelism(line);
+       }
+
+       /**
+        * Reads the parallelism option from the parsed command line.
+        *
+        * @param line the parsed command line
+        * @return the parallelism, or -1 if the option was not given
+        * @throws CliArgsException if the option value is not a positive integer
+        */
+       private static int parseParallelism(CommandLine line) throws CliArgsException {
+               if (!line.hasOption(PARALLELISM_OPTION.getOpt())) {
+                       return -1;
+               }
+
+               final String parString = line.getOptionValue(PARALLELISM_OPTION.getOpt());
+               final int parsed;
+               try {
+                       parsed = Integer.parseInt(parString);
+               }
+               catch (NumberFormatException e) {
+                       throw new CliArgsException("The parallelism must be a positive number: " + parString);
+               }
+
+               if (parsed <= 0) {
+                       throw new CliArgsException("The parallelism must be a positive number: " + parString);
+               }
+               return parsed;
+       }
+
+       /**
+        * @return the path of the program's jar file, or null if none was given
+        */
+       public String getJarFilePath() {
+               return jarFilePath;
+       }
+
+       /**
+        * @return the value of the class option, or null if the option was not given
+        */
+       public String getEntryPointClassName() {
+               return entryPointClass;
+       }
+
+       /**
+        * @return the arguments that remain for the program
+        */
+       public String[] getProgramArgs() {
+               return programArgs;
+       }
+
+       /**
+        * @return the parallelism, or -1 if the option was not given
+        */
+       public int getParallelism() {
+               return parallelism;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
new file mode 100644
index 0000000..2e4eb31
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/RunOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * Command line options for the RUN command.
+ * Adds no state of its own; every value is extracted by the
+ * {@link ProgramOptions} constructor.
+ */
+public class RunOptions extends ProgramOptions {
+
+       /**
+        * Creates the options from a parsed command line.
+        *
+        * @param line the parsed command line, handed unchanged to the superclass
+        * @throws CliArgsException if the superclass constructor rejects a parsed value
+        */
+       public RunOptions(CommandLine line) throws CliArgsException {
+               super(line);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 5a032a0..f4a2dc9 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -25,7 +25,6 @@ import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.util.List;
 
-import akka.remote.AssociationErrorEvent;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
@@ -219,8 +218,7 @@ public class Client {
        }
        
        private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
-               JobGraph job = null;
-
+               JobGraph job;
                if (optPlan instanceof StreamingPlan) {
                        job = ((StreamingPlan) optPlan).getJobGraph();
                } else {
@@ -356,21 +354,6 @@ public class Client {
                return new JobExecutionResult(-1, null);
        }
 
-       private Throwable getAssociationError(List<AssociationErrorEvent> 
eventLog) {
-               int len = eventLog.size();
-               if (len > 0) {
-                       AssociationErrorEvent e = eventLog.get(len - 1);
-                       Throwable cause = e.getCause();
-                       if (cause instanceof akka.remote.InvalidAssociation) {
-                               return cause.getCause();
-                       } else {
-                               return cause;
-                       }
-               } else {
-                       return null;
-               }
-       }
-
        // 
--------------------------------------------------------------------------------------------
        
        public static final class OptimizerPlanEnvironment extends 
ExecutionEnvironment {

Reply via email to