[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2085 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r67018443 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); + + while(true) { --- End diff -- I agree that the functionality is the same.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r67003363 --- Diff: docs/apis/cli.md --- @@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs. configuration. -r,--running Show only running programs and their JobIDs -s,--scheduledShow only scheduled programs and their JobIDs + Additional arguments if -m yarn-cluster is set: + -yid YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. --- End diff -- The code no longer waits for TaskManagers when the cluster is resumed.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66806299 --- Diff: docs/apis/cli.md --- @@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs. configuration. -r,--running Show only running programs and their JobIDs -s,--scheduledShow only scheduled programs and their JobIDs + Additional arguments if -m yarn-cluster is set: + -yid YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. --- End diff -- Same as for `list` and `info`. I agree this verbosity is not very nice. I'm looking into how I can make this look nicer.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805960 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() { } /** -* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster -* connected to this client. +* Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). +* @return The address (host and port) of the leading JobManager */ - public int getMaxSlots() { - return this.maxSlots; + public InetSocketAddress getJobManagerAddressFromConfig() { + try { + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return new InetSocketAddress(hostName, port); --- End diff -- Fixed.
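The diff above reads the JobManager host and port with `null`/`-1` fallbacks, which would silently build a nonsensical `InetSocketAddress` when the keys are unset. A minimal fail-fast sketch of the idea under review — the helper name is hypothetical, and plain `java.util.Properties` stands in for Flink's `Configuration`:

```java
import java.net.InetSocketAddress;
import java.util.Properties;

// Hypothetical sketch: resolve the JobManager address from configuration,
// failing fast instead of constructing an address from null/-1 defaults.
public class JobManagerAddressLookup {

    public static InetSocketAddress fromConfig(Properties config) {
        // The real code reads ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY / _PORT_KEY.
        String host = config.getProperty("jobmanager.rpc.address");
        String portString = config.getProperty("jobmanager.rpc.port");
        if (host == null || portString == null) {
            throw new IllegalStateException(
                "The JobManager address has not been set in the configuration");
        }
        // createUnresolved avoids a DNS lookup at construction time.
        return InetSocketAddress.createUnresolved(host, Integer.parseInt(portString));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("jobmanager.rpc.address", "10.0.2.15");
        config.setProperty("jobmanager.rpc.port", "51747");
        InetSocketAddress address = fromConfig(config);
        if (!address.getHostString().equals("10.0.2.15") || address.getPort() != 51747) {
            throw new AssertionError("unexpected address: " + address);
        }
        System.out.println("ok");
    }
}
```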
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805953 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; + + +/** + * Custom command-line interface to load hooks for the command-line interface. + */ +public interface CustomCommandLine { + + /** +* Returns a unique identifier for this custom command-line. +* @return An unique identifier string +*/ + String getIdentifier(); + + /** +* Adds custom options to the existing run options. +* @param baseOptions The existing options. +*/ + void addRunOptions(Options baseOptions); + + /** +* Adds custom options to the existing general options. +* @param baseOptions The existing options. 
+*/ + void addGeneralOptions(Options baseOptions); + + /** +* Retrieves a client for a running cluster +* @param commandLine The command-line parameters from the CliFrontend +* @param config The Flink config +* @return Client if a cluster could be retrieve, null otherwise --- End diff -- Thanks.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805706 --- Diff: docs/setup/yarn_setup.md --- @@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink. Use the YARN utilities (`yarn application -kill `) to stop the YARN session. + Attach to an existing Session + +Use the following command to start a session + +~~~bash +./bin/yarn-session.sh +~~~ + +This command will show you the following overview: + +~~~bash +Usage: + Required + -id,--applicationId YARN application Id --- End diff -- Yes, same as above. Fixed.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805492 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + --- End diff -- Good idea. 
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805499 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + + } + return null; --- End diff -- Same as above. Fixed. 
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805479 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); + + while(true) { --- End diff -- Yes, I wanted to bring up the Yarn session only once the cluster is ready. It is a semantic change but IMHO transparent to the user. There is no disadvantage to waiting until the cluster is ready. Ultimately, it would be nice to get rid of all waiting, but we're not quite there yet.
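The semantics being discussed — block until every TaskManager has registered, then hand control back — amount to a bounded poll. A self-contained sketch of that pattern (a hypothetical helper, not the actual `YarnClusterClient` loop, which queries cluster status via the ApplicationClient actor):

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of waiting until the expected number of TaskManagers
// has connected, with an upper bound on the total wait.
public class TaskManagerWait {

    /** Returns true if the connected count reached {@code expected} before the timeout. */
    public static boolean waitForTaskManagers(
            IntSupplier connectedCount, int expected, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (connectedCount.getAsInt() < expected) {
            if (System.nanoTime() >= deadline) {
                return false; // timed out before the cluster was ready
            }
            Thread.sleep(pollMillis);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated cluster: one more TaskManager "connects" on every poll.
        int[] connected = {0};
        IntSupplier count = () -> ++connected[0];
        if (!waitForTaskManagers(count, 3, 5_000, 1)) {
            throw new AssertionError("TaskManagers did not connect in time");
        }
        System.out.println("ok");
    }
}
```

A timeout (which the sketch makes explicit) is what keeps such a loop from blocking the session CLI indefinitely, which is the concern raised in the review.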
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805455 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -415,62 +513,83 @@ public int run(String[] args) { } System.out.println(description); return 0; + } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM --- End diff -- Sorry, I added this TODO while fixing some of your first comments.
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805382 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); --- End diff -- Thank you. I fixed that by using a non-static variable for the logger and dynamically retrieving the class name, i.e.

```java
private final Logger LOG = LoggerFactory.getLogger(getClass());
```
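The effect of that change can be shown in isolation: a logger initialized with `getClass()` in a base-class field reports the runtime (sub)class name, rather than the class where the field is declared. A small demo — class names are invented, and `java.util.logging` is used instead of SLF4J purely to stay dependency-free:

```java
import java.util.logging.Logger;

// Demo: an instance-level logger initialized with getClass() picks up
// the runtime class name, unlike a static logger pinned to the base class.
class BaseClient {
    protected final Logger log = Logger.getLogger(getClass().getName());
}

class YarnSubClient extends BaseClient {}

public class LoggerNameDemo {
    public static void main(String[] args) {
        String name = new YarnSubClient().log.getName();
        if (!name.equals("YarnSubClient")) {
            throw new AssertionError("unexpected logger name: " + name);
        }
        System.out.println(name); // prints "YarnSubClient"
    }
}
```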
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66805417 --- Diff: docs/apis/cli.md --- @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + +./bin/flink list -m yarn-cluster -yid -r --- End diff -- Thank you, I'll look into this.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66801768 --- Diff: docs/setup/yarn_setup.md --- @@ -143,6 +143,34 @@ Note that in this case its not possible to stop the YARN session using Flink. Use the YARN utilities (`yarn application -kill `) to stop the YARN session. + Attach to an existing Session + +Use the following command to start a session + +~~~bash +./bin/yarn-session.sh +~~~ + +This command will show you the following overview: + +~~~bash +Usage: + Required + -id,--applicationId YARN application Id --- End diff -- I think this option is also not listed when running ./bin/yarn-session.sh.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66800124 --- Diff: docs/apis/cli.md --- @@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs. configuration. -r,--running Show only running programs and their JobIDs -s,--scheduledShow only scheduled programs and their JobIDs + Additional arguments if -m yarn-cluster is set: + -yid YARN application ID of Flink YARN session to + connect to. Must not be set if JobManager HA + is used. In this case, JobManager RPC + location is automatically retrieved from + Zookeeper. --- End diff -- I tried this, but I wonder why it's not logging that the job has been cancelled. It logs "All TaskManagers are connected"; I don't think this message is relevant when cancelling a job.

```
[cloudera@quickstart build-target]$ ./bin/flink cancel -m yarn-cluster -yid application_1447844011707_0038 b9b8f76616073d09c596545a3cda978f
2016-06-13 07:22:21,355 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:22:22,277 INFO org.apache.flink.yarn.YarnClusterClient - Start application client.
2016-06-13 07:22:22,288 INFO org.apache.flink.yarn.ApplicationClient - Notification about new leader address akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO org.apache.flink.yarn.ApplicationClient - Received address of new leader akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager null.
Waiting until all TaskManagers have connected
2016-06-13 07:22:22,297 INFO org.apache.flink.yarn.ApplicationClient - Trying to register at JobManager akka.tcp://flink@10.0.2.15:51747/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...
2016-06-13 07:22:22,542 INFO org.apache.flink.yarn.ApplicationClient - Successfully registered at the ResourceManager using JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764]
All TaskManagers are connected
2016-06-13 07:22:22,945 INFO org.apache.flink.yarn.YarnClusterClient - Shutting down YarnClusterClient from the client shutdown hook
2016-06-13 07:22:22,945 INFO org.apache.flink.yarn.YarnClusterClient - Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:22:22,947 INFO org.apache.flink.yarn.ApplicationClient - Stopped Application client.
2016-06-13 07:22:22,947 INFO org.apache.flink.yarn.ApplicationClient - Disconnect from JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:22:23,056 INFO org.apache.flink.yarn.YarnClusterClient - Application application_1447844011707_0038 finished with state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]$
```
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66799100 --- Diff: docs/apis/cli.md --- @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + +./bin/flink list -m yarn-cluster -yid -r --- End diff -- Are there any tests for this functionality?
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66797799 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + + } + return null; --- End diff -- Does it make sense to log that the config hasn't been 
set?
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66797580 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); + + while(true) { --- End diff -- It seems that this loop is now executed also for the YARN session. In the past, this was used only for "per job" yarn clusters, because there we needed to have all slots available to submit the job. Now, this loop is "blocking" the YARN session CLI if not all TaskManagers are connecting. It's not a big deal, but a change in the semantics.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66797696 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java --- @@ -300,18 +309,82 @@ public static boolean allocateResource(int[] nodeManagers, int toAllocate) { return false; } - @Override public void setDetachedMode(boolean detachedMode) { this.detached = detachedMode; } - @Override - public boolean isDetached() { + public boolean isDetachedMode() { return detached; } + + /** +* Gets a Hadoop Yarn client +* @return Returns a YarnClient which has to be shutdown manually +*/ + private static YarnClient getYarnClient(Configuration conf) { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + + /** +* Retrieves the Yarn application and cluster from the config +* @param config The config with entries to retrieve the cluster +* @return YarnClusterClient +* @deprecated This should be removed in the future +*/ + public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws Exception { + String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + + if (jobManagerHost != null && jobManagerPort != -1) { + + YarnClient yarnClient = getYarnClient(conf); + List applicationReports = yarnClient.getApplications(); + for (ApplicationReport report : applicationReports) { + if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { + LOG.info("Found application '{}' " + + "with JobManager host name '{}' and port '{}' from Yarn properties file.", + report.getApplicationId(), jobManagerHost, jobManagerPort); + return retrieve(report.getApplicationId().toString()); + } + } + --- End diff -- I wonder if it makes sense to log here that we were unable to find the 
application id?
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66795439 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix, boolean accept NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN"); } + /** -* Creates a new Yarn Client. -* @param cmd the command line to parse options from -* @return an instance of the client or null if there was an error +* Resumes from a Flink Yarn properties file +* @param flinkConfiguration The flink configuration +* @return True if the properties were loaded, false otherwise */ - public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { + private boolean resumeFromYarnProperties(Configuration flinkConfiguration) { + // load the YARN properties + File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration)); + if (!propertiesFile.exists()) { + return false; + } + + logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); + + Properties yarnProperties = new Properties(); + try { + try (InputStream is = new FileInputStream(propertiesFile)) { + yarnProperties.load(is); + } + } + catch (IOException e) { + throw new RuntimeException("Cannot read the YARN properties file", e); + } + + // configure the default parallelism from YARN + String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); + if (propParallelism != null) { // maybe the property is not set + try { + int parallelism = Integer.parseInt(propParallelism); + flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism); + + logAndSysout("YARN properties set default parallelism to " + parallelism); + } + catch (NumberFormatException e) { + throw new RuntimeException("Error while parsing the YARN properties: " + + "Property " + 
YARN_PROPERTIES_PARALLELISM + " is not an integer."); + } + } + + // get the JobManager address from the YARN properties + String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); + InetSocketAddress jobManagerAddress; + if (address != null) { + try { + jobManagerAddress = ClientUtils.parseHostPortAddress(address); + // store address in config from where it is retrieved by the retrieval service + CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress); + } + catch (Exception e) { + throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e); + } + + logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); + } - AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - if (flinkYarnClient == null) { - return null; + // handle the YARN client's dynamic properties + String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); + Map dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry dynamicProperty : dynamicProperties.entrySet()) { + flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); } + return true; + } + + public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { + + + YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(); --- End diff -- A bit too many blank lines ;)
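The properties-file path in the diff above parses the stored JobManager entry with `ClientUtils.parseHostPortAddress`. A hedged sketch of what such a "host:port" parser has to get right — the class and method names here are invented for illustration, not the actual Flink implementation:

```java
import java.net.InetSocketAddress;

// Hypothetical sketch of parsing a "host:port" string such as the
// JobManager entry in the Flink YARN properties file.
public class HostPortParser {

    public static InetSocketAddress parseHostPort(String address) {
        // lastIndexOf tolerates extra colons in the host part.
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException(
                "Expected an address of the form host:port, got: " + address);
        }
        String host = address.substring(0, colon);
        int port = Integer.parseInt(address.substring(colon + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress address = parseHostPort("10.0.2.15:51747");
        if (!address.getHostString().equals("10.0.2.15") || address.getPort() != 51747) {
            throw new AssertionError("unexpected parse result: " + address);
        }
        System.out.println("ok");
    }
}
```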
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66795170 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java --- @@ -415,62 +513,83 @@ public int run(String[] args) { } System.out.println(description); return 0; + } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM --- End diff -- What is this TODO about?
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66794640 --- Diff: docs/apis/cli.md --- @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + +./bin/flink list -m yarn-cluster -yid <application_id> -r --- End diff -- This is a similar thing: ``` [cloudera@quickstart build-target]$ ./bin/flink list -m yarn-cluster -yid obiouslyWrong -r The program finished with the following exception: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127) at org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:844) at org.apache.flink.client.CliFrontend.list(CliFrontend.java:378) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:986) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1034) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at scala.concurrent.Await.result(package.scala) at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125) ... 4 more [cloudera@quickstart build-target]$ ``` ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66794535 --- Diff: docs/apis/cli.md --- @@ -105,6 +105,10 @@ The command line can be used to ./bin/flink list -r +- List running Flink jobs inside Flink YARN session: + +./bin/flink list -m yarn-cluster -yid <application_id> -r --- End diff -- Why is the client trying to connect even though the application has been finished already? ``` [cloudera@quickstart build-target]$ ./bin/flink list -m yarn-cluster -yid application_1447844011707_0036 -r 2016-06-13 06:51:34,581 INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032 2016-06-13 06:51:35,017 ERROR org.apache.flink.yarn.YarnClusterDescriptor - The application application_1447844011707_0036 doesn't run anymore. It has previously completed with final status: SUCCEEDED The program finished with the following exception: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127) at org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:844) at org.apache.flink.client.CliFrontend.list(CliFrontend.java:378) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:986) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1034) Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at scala.concurrent.Await.result(package.scala) at
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125) ... 4 more ``` ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66791210 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java --- @@ -211,18 +185,45 @@ public void run() { Runtime.getRuntime().addShutdownHook(clientShutdownHook); isConnected = true; + + logAndSysout("Waiting until all TaskManagers have connected"); --- End diff -- Since the `logAndSysout` method is defined in the `ClusterClient`, all log messages appear as being from that class, even though they are coming from the `YarnClusterClient`. Maybe it makes sense to override the method or add an argument for the logger. ---
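The reviewer's suggestion above — pass a logger into `logAndSysout` so messages are attributed to the subclass — could look roughly like the following sketch. It uses `java.util.logging` only to stay self-contained (Flink itself uses slf4j), and the class bodies are reduced to just the logging concern:

```java
import java.util.logging.Logger;

// Reduced sketch of the ClusterClient/YarnClusterClient hierarchy; only the
// logging concern from the review comment is modeled here.
abstract class ClusterClient {

    private static final Logger LOG = Logger.getLogger(ClusterClient.class.getName());

    // Original behavior: every message is attributed to ClusterClient
    protected void logAndSysout(String message) {
        logAndSysout(LOG, message);
    }

    // Suggested overload: subclasses pass their own logger so the log
    // record names the real source class (e.g. YarnClusterClient)
    protected void logAndSysout(Logger logger, String message) {
        logger.info(message);
        System.out.println(message);
    }
}

class YarnClusterClient extends ClusterClient {

    private static final Logger LOG = Logger.getLogger(YarnClusterClient.class.getName());

    void waitForTaskManagers() {
        logAndSysout(LOG, "Waiting until all TaskManagers have connected");
    }
}

public class LoggingSketch {
    public static void main(String[] args) {
        new YarnClusterClient().waitForTaskManagers();
    }
}
```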
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66789723 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() { } /** -* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster -* connected to this client. +* Gets the current JobManager address from the Flink configuration (may change in case of a HA setup). +* @return The address (host and port) of the leading JobManager */ - public int getMaxSlots() { - return this.maxSlots; + public InetSocketAddress getJobManagerAddressFromConfig() { + try { + String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); + int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); + return new InetSocketAddress(hostName, port); --- End diff -- Missing indentation ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66765910 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** -* Retrieves a {@link Client} object from the given command line options and other parameters. +* Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name -* @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. 
- if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); --- End diff -- Okay, moving the logic is addressing my concerns. I checked the classes again and my renaming suggestions don't make much sense ;) ---
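The slot arithmetic in the removed `getClient` block quoted above is worth spelling out: an unknown slot count reported by YARN (`-1`) defaults to 1 slot per TaskManager, and the per-TaskManager slot count is an integer division of the requested parallelism. A minimal sketch of just that arithmetic, with the YARN client calls replaced by plain parameters:

```java
public class YarnSlotMath {

    /** Total slots in the YARN cluster; -1 from YARN means "unknown" and defaults to 1 per TM. */
    static int maxSlots(int yarnTmSlots, int taskManagerCount) {
        if (yarnTmSlots == -1) {
            yarnTmSlots = 1;
        }
        return yarnTmSlots * taskManagerCount;
    }

    /** Integer division, as in the removed CliFrontend code: any remainder is dropped. */
    static int slotsPerTaskManager(int userParallelism, int taskManagerCount) {
        return userParallelism / taskManagerCount;
    }

    public static void main(String[] args) {
        int maxSlots = maxSlots(-1, 4);
        int slotsPerTM = slotsPerTaskManager(8, 4);
        System.out.println("The YARN cluster has " + maxSlots
            + " slots available; each of the 4 TaskManagers will get " + slotsPerTM + " slots.");
    }
}
```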
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66628410 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** -* Retrieves a {@link Client} object from the given command line options and other parameters. +* Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name -* @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. 
- if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); --- End diff -- Well spotted. I think I'll move this logic to the implementation of the CustomCommandLine. I don't quite understand your renaming suggestion, are you suggesting to break up the CustomCommandLine into CustomParser and CustomCLI? ---
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66626302 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** -* Retrieves a {@link Client} object from the given command line options and other parameters. +* Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name -* @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. 
- if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); - jobManagerAddress = yarnCluster.getJobManagerAddress(); - writeJobManagerAddressToConfig(jobManagerAddress); - - // overwrite the yarn client config (because the client parses the dynamic properties) - this.config.addAll(flinkYarnClient.getFlinkConfiguration()); - - logAndSysout("YARN cluster started"); -
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66626078 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -1275,33 +1071,16 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { return location; } - public static Map getDynamicProperties(String dynamicPropertiesEncoded) { - if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { - Map properties = new HashMap<>(); - - String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); - for (String propLine : propertyLines) { - if (propLine == null) { - continue; - } - - String[] kv = propLine.split("="); - if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { - properties.put(kv[0], kv[1]); - } - } - return properties; - } - else { - return Collections.emptyMap(); - } - } - public static String getYarnPropertiesLocation(Configuration conf) { - String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); - String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); - - return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser; + /** +* Writes the given job manager address to the associated configuration object +* +* @param address Address to write to the configuration +* @param config The config to write to +*/ + public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) { --- End diff -- +1 ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66625805 --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.client.cli; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.configuration.Configuration; + + +/** + * Custom command-line interface to load hooks for the command-line interface. + */ +public interface CustomCommandLine { + + /** +* Returns a unique identifier for this custom command-line. +* @return An unique identifier string +*/ + String getIdentifier(); + + /** +* Adds custom options to the existing run options. +* @param baseOptions The existing options. +*/ + void addRunOptions(Options baseOptions); + + /** +* Adds custom options to the existing general options. +* @param baseOptions The existing options. 
+*/ + void addGeneralOptions(Options baseOptions); + + /** +* Retrieves a client for a running cluster +* @param commandLine The command-line parameters from the CliFrontend +* @param config The Flink config +* @return Client if a cluster could be retrieve, null otherwise --- End diff -- typo: retrieved ---
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66625502 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -1275,33 +1071,16 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) { return location; } - public static Map getDynamicProperties(String dynamicPropertiesEncoded) { - if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { - Map properties = new HashMap<>(); - - String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); - for (String propLine : propertyLines) { - if (propLine == null) { - continue; - } - - String[] kv = propLine.split("="); - if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { - properties.put(kv[0], kv[1]); - } - } - return properties; - } - else { - return Collections.emptyMap(); - } - } - public static String getYarnPropertiesLocation(Configuration conf) { - String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); - String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); - - return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser; + /** +* Writes the given job manager address to the associated configuration object +* +* @param address Address to write to the configuration +* @param config The config to write to +*/ + public static void writeJobManagerAddressToConfig(Configuration config, InetSocketAddress address) { --- End diff -- How about renaming this to "setJobManagerAddressInConfig()" .. write implies that something is written to a file or something. ---
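The rename discussed in the comment above concerns a helper that stores the JobManager host and port in the Flink configuration. The following round-trip sketch uses a minimal stand-in for Flink's `Configuration` class, assumes the standard key names (`jobmanager.rpc.address` / `jobmanager.rpc.port`, i.e. `ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY` / `JOB_MANAGER_IPC_PORT_KEY`), and carries the reviewer's suggested method name:

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for Flink's Configuration, just for illustration
class Configuration {
    private final Map<String, String> values = new HashMap<>();
    void setString(String key, String value) { values.put(key, value); }
    void setInteger(String key, int value) { values.put(key, Integer.toString(value)); }
    String getString(String key, String def) { return values.getOrDefault(key, def); }
    int getInteger(String key, int def) {
        String v = values.get(key);
        return v == null ? def : Integer.parseInt(v);
    }
}

public class JobManagerAddressConfig {

    // Assumed to match ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY / JOB_MANAGER_IPC_PORT_KEY
    static final String ADDRESS_KEY = "jobmanager.rpc.address";
    static final String PORT_KEY = "jobmanager.rpc.port";

    /** Stores host and port in the config; named as suggested in the review. */
    static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
        config.setString(ADDRESS_KEY, address.getHostString());
        config.setInteger(PORT_KEY, address.getPort());
    }

    /** Reads the address back, mirroring ClusterClient#getJobManagerAddressFromConfig. */
    static InetSocketAddress getJobManagerAddressFromConfig(Configuration config) {
        String host = config.getString(ADDRESS_KEY, null);
        int port = config.getInteger(PORT_KEY, -1);
        // createUnresolved avoids a DNS lookup for an illustrative hostname
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        Configuration config = new Configuration();
        setJobManagerAddressInConfig(config, InetSocketAddress.createUnresolved("jm-host", 6123));
        InetSocketAddress roundTrip = getJobManagerAddressFromConfig(config);
        System.out.println(roundTrip.getHostString() + ":" + roundTrip.getPort());
    }
}
```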
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66625318 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** -* Retrieves a {@link Client} object from the given command line options and other parameters. +* Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name -* @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. 
- if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); - jobManagerAddress = yarnCluster.getJobManagerAddress(); - writeJobManagerAddressToConfig(jobManagerAddress); - - // overwrite the yarn client config (because the client parses the dynamic properties) - this.config.addAll(flinkYarnClient.getFlinkConfiguration()); - - logAndSysout("YARN cluster started"); -
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2085#discussion_r66624415 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E } /** -* Retrieves a {@link Client} object from the given command line options and other parameters. +* Retrieves a {@link ClusterClient} object from the given command line options and other parameters. * * @param options Command line options which contain JobManager address * @param programName Program name -* @param userParallelism Given user parallelism * @throws Exception */ - protected Client getClient( + protected ClusterClient getClient( CommandLineOptions options, - String programName, - int userParallelism, - boolean detachedMode) - throws Exception { - InetSocketAddress jobManagerAddress; - int maxSlots = -1; + String programName) throws Exception { - if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) { - logAndSysout("YARN cluster mode detected. Switching Log4j output to console"); - - // Default yarn application name to use, if nothing is specified on the command line - String applicationName = "Flink Application: " + programName; - - // user wants to run Flink in YARN cluster. - CommandLine commandLine = options.getCommandLine(); - AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser - .getFlinkYarnSessionCli() - .withDefaultApplicationName(applicationName) - .createFlinkYarnClient(commandLine); - - if (flinkYarnClient == null) { - throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages"); - } - - // in case the main detached mode wasn't set, we don't wanna overwrite the one loaded - // from yarn options. 
- if (detachedMode) { - flinkYarnClient.setDetachedMode(true); - } - - // the number of slots available from YARN: - int yarnTmSlots = flinkYarnClient.getTaskManagerSlots(); - if (yarnTmSlots == -1) { - yarnTmSlots = 1; - } - maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount(); - if (userParallelism != -1) { - int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount(); - logAndSysout("The YARN cluster has " + maxSlots + " slots available, " + - "but the user requested a parallelism of " + userParallelism + " on YARN. " + - "Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " + - "will get "+slotsPerTM+" slots."); - flinkYarnClient.setTaskManagerSlots(slotsPerTM); - } - - try { - yarnCluster = flinkYarnClient.deploy(); - yarnCluster.connectToCluster(); - } - catch (Exception e) { - throw new RuntimeException("Error deploying the YARN cluster", e); - } + // Get the custom command-line (e.g. Standalone/Yarn/Mesos) + CustomCommandLine activeCommandLine = + CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress()); --- End diff -- Why is this method located in the Parser? I thought the parser is only responsible for parsing the command line arguments? Maybe it makes more sense to rename this ``` loadCustomCommandLine("org.apache.flink.client.cli.DefaultCLI"); loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", "yarn"); ``` to parser (loadCustomParser(), and DefaultCLIParser, YarnCliPa
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2085 [FLINK-3937] programmatic resuming of clusters These changes are based on #1978 and #2034. More specifically, they port resuming of running Yarn clusters from #2034 to the refactoring of #1978. The abstraction in place enables us to plug in other cluster frameworks in the future. - integrates with and extends the refactoring of FLINK-3667 - enables to resume from Yarn properties or Yarn application id - introduces additional StandaloneClusterDescriptor - introduces DefaultCLI to get rid of standalone mode switches in CliFrontend - various fixes and improvements You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3937 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2085.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2085 commit b144e64758a95bcac33bd0ac91ab7eefaf4040e9 Author: Maximilian Michels Date: 2016-04-22T17:52:54Z [FLINK-3667] refactor client communication classes - ClusterDescriptor: base interface for cluster deployment descriptors - ClusterDescriptor: YarnClusterDescriptor - ClusterClient: base class for ClusterClients, handles lifecycle of cluster - ClusterClient: shares configuration with the implementations - ClusterClient: StandaloneClusterClient, YarnClusterClient - ClusterClient: remove run methods and enable detached mode via flag - CliFrontend: remove all Yarn specific logic - CliFrontend: remove all cluster setup logic - CustomCommandLine: interface for other cluster implementations - Customcommandline: enables creation of new cluster or resuming from existing - Yarn: move Yarn classes and functionality to the yarn module (yarn properties, yarn interfaces) - Yarn: improve reliability of cluster startup - Yarn Tests: only disable parallel execution of ITCases 
commit 73524c89854f04ac41f0c288d9ebf8ef5efe628b Author: Sebastian Klemke Date: 2016-05-25T12:28:59Z [FLINK-3937] implement -yid option to Flink CLI - enables to use list, savepoint, cancel and stop subcommands - adapt FlinkYarnSessionCli to also accept YARN application Id to attach to - update documentation commit 1db8c97c39c2bf071db018c1ca505409c847a30b Author: Maximilian Michels Date: 2016-06-01T10:45:52Z [FLINK-3863] Yarn Cluster shutdown may fail if leader changed recently commit 1a154fb12474a8630cce7e764d72398513055887 Author: Maximilian Michels Date: 2016-06-02T14:28:51Z [FLINK-3937] programmatic resuming of clusters - integrates with and extends the refactoring of FLINK-3667 - enables to resume from Yarn properties or Yarn application id - introduces additional StandaloneClusterDescriptor - introduces DefaultCLI to get rid of standalone mode switches in CliFrontend - various fixes and improvements ---