[
https://issues.apache.org/jira/browse/FLINK-3667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15288767#comment-15288767
]
ASF GitHub Bot commented on FLINK-3667:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/1978#discussion_r63681528
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,14 +191,41 @@ public void run() {
         Runtime.getRuntime().addShutdownHook(clientShutdownHook);
         isConnected = true;
+
+        logAndSysout("Waiting until all TaskManagers have connected");
+
+        while(true) {
+            GetClusterStatusResponse status = getClusterStatus();
+            if (status != null) {
+                if (status.numRegisteredTaskManagers() < clusterDescriptor.getTaskManagerCount()) {
+                    logAndSysout("TaskManager status (" + status.numRegisteredTaskManagers() + "/"
+                        + clusterDescriptor.getTaskManagerCount() + ")");
+                } else {
+                    logAndSysout("All TaskManagers are connected");
+                    break;
+                }
+            } else {
+                logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
+            }
+
+            try {
+                Thread.sleep(500);
+            }
+            catch (InterruptedException e) {
+                LOG.error("Interrupted while waiting for TaskManagers");
+                System.err.println("Thread is interrupted");
+                Thread.currentThread().interrupt();
--- End diff --
I guess I wrote this code. If I'm not mistaken, this means that the
while(true) loop is not interruptible, right?
I think we should change that and break out of the loop if the thread has
been interrupted.
What do you think?
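
A minimal sketch of what breaking out on interruption could look like. This is not the code from the pull request: `TaskManagerWaiter`, `waitForTaskManagers`, and the `IntSupplier` parameter are hypothetical stand-ins for the `getClusterStatus()` / `clusterDescriptor.getTaskManagerCount()` calls in the diff, chosen so the example runs on its own.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper, not part of Flink. */
final class TaskManagerWaiter {

    /**
     * Polls until at least {@code expected} TaskManagers are registered.
     *
     * @return true if all TaskManagers registered, false if the thread was interrupted
     */
    static boolean waitForTaskManagers(IntSupplier registeredCount, int expected, long pollMillis) {
        // Check the interrupt flag on every iteration instead of looping on while(true).
        while (!Thread.currentThread().isInterrupted()) {
            int registered = registeredCount.getAsInt();
            if (registered >= expected) {
                System.out.println("All TaskManagers are connected");
                return true;
            }
            System.out.println("TaskManager status (" + registered + "/" + expected + ")");
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the flag for the caller, then leave the loop.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

Returning a boolean lets the caller decide whether an interrupted wait should abort the deployment or merely be logged; the interrupt flag stays set in either case.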
> Generalize client<->cluster communication
> -----------------------------------------
>
> Key: FLINK-3667
> URL: https://issues.apache.org/jira/browse/FLINK-3667
> Project: Flink
> Issue Type: Improvement
> Components: YARN Client
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
>
> Here are some notes I took when inspecting the client<->cluster classes with
> regard to future integration of other resource management frameworks in
> addition to Yarn (e.g. Mesos).
> {noformat}
> 1 Cluster Client Abstraction
> ════════════════════════════
> 1.1 Status Quo
> ──────────────
> 1.1.1 FlinkYarnClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Holds the cluster configuration (Flink-specific and Yarn-specific)
> • Contains the deploy() method to deploy the cluster
> • Creates the Hadoop Yarn client
> • Receives the initial job manager address
> • Bootstraps the FlinkYarnCluster
> 1.1.2 FlinkYarnCluster
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Wrapper around the Hadoop Yarn client
> • Queries cluster for status updates
> • Lifecycle methods to start and shut down the cluster
> • Flink-specific features like shutdown after job completion
> 1.1.3 ApplicationClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Acts as a middle-man for asynchronous cluster communication
> • Designed to communicate with Yarn, not used in Standalone mode
> 1.1.4 CliFrontend
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Deeply integrated with FlinkYarnClient and FlinkYarnCluster
> • Constantly distinguishes between Yarn and Standalone mode
> • Would be nice to have a general abstraction in place
> 1.1.5 Client
> ╌╌╌╌╌╌╌╌╌╌╌╌
> • Job submission and Job related actions, agnostic of resource framework
> 1.2 Proposal
> ────────────
> 1.2.1 ClusterConfig (before: AbstractFlinkYarnClient)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Extensible cluster-agnostic config
> • May be extended by specific cluster, e.g. YarnClusterConfig
> 1.2.2 ClusterClient (before: AbstractFlinkYarnClient)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Deals with cluster (RM) specific communication
> • Exposes framework agnostic information
> • YarnClusterClient, MesosClusterClient, StandaloneClusterClient
> 1.2.3 FlinkCluster (before: AbstractFlinkYarnCluster)
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Basic interface to communicate with a running cluster
> • Receives the ClusterClient for cluster-specific communication
> • Should not have to care about the specific implementations of the
> client
> 1.2.4 ApplicationClient
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • Can be changed to work cluster-agnostic (first steps already in
> FLINK-3543)
> 1.2.5 CliFrontend
> ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌
> • CliFrontend never has to differentiate between different
> cluster types after it has determined which cluster class to load.
> • Base class handles framework agnostic command line arguments
> • Pluggables for Yarn, Mesos handle specific commands
> {noformat}
> I would like to create/refactor the affected classes to set us up for a more
> flexible client side resource management abstraction.
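
The proposal quoted above can be illustrated with a small sketch, assuming a single framework-agnostic interface that the front end programs against. Only the names `ClusterClient`, `YarnClusterClient`, and `StandaloneClusterClient` come from the notes; every method, the `Frontend` class, and the returned values are hypothetical placeholders, not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Framework-agnostic view of a cluster; the method names are assumptions. */
interface ClusterClient {
    String frameworkName();
    int registeredTaskManagers();
}

final class StandaloneClusterClient implements ClusterClient {
    public String frameworkName() { return "standalone"; }
    public int registeredTaskManagers() { return 1; } // placeholder value
}

final class YarnClusterClient implements ClusterClient {
    public String frameworkName() { return "yarn"; }
    public int registeredTaskManagers() { return 0; } // placeholder value
}

/** Hypothetical stand-in for the command-line front end. */
final class Frontend {
    private static final Map<String, Supplier<ClusterClient>> FACTORIES = new HashMap<>();
    static {
        FACTORIES.put("standalone", StandaloneClusterClient::new);
        FACTORIES.put("yarn", YarnClusterClient::new);
    }

    /** The only place where the cluster type is inspected. */
    static ClusterClient load(String clusterType) {
        Supplier<ClusterClient> factory = FACTORIES.get(clusterType);
        if (factory == null) {
            throw new IllegalArgumentException("Unknown cluster type: " + clusterType);
        }
        return factory.get();
    }

    /** Works for any cluster type without distinguishing between them. */
    static String status(ClusterClient client) {
        return client.frameworkName() + ": " + client.registeredTaskManagers() + " TaskManager(s)";
    }
}
```

Once `load` has picked the implementation, `status` and any other front-end code depend only on the interface, which is the property the CliFrontend item in the proposal asks for.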