[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2085




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r67018443
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
+
+   while(true) {
--- End diff --

I agree that the functionality is the same.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-14 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r67003363
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

The code no longer waits for TaskManagers when the cluster is resumed.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66806299
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

Same as for `list` and `info`. I agree this verbosity is not very nice. I'm looking into how I can make this look nicer.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805960
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() {
}
 
/**
-* @return -1 if unknown. The maximum number of available processing 
slots at the Flink cluster
-* connected to this client.
+* Gets the current JobManager address from the Flink configuration 
(may change in case of a HA setup).
+* @return The address (host and port) of the leading JobManager
 */
-   public int getMaxSlots() {
-   return this.maxSlots;
+   public InetSocketAddress getJobManagerAddressFromConfig() {
+   try {
+   String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+   return new InetSocketAddress(hostName, port);
--- End diff --

Fixed.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805953
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Custom command-line interface to load hooks for the command-line 
interface.
+ */
+public interface CustomCommandLine {
+
+   /**
+* Returns a unique identifier for this custom command-line.
+* @return An unique identifier string
+*/
+   String getIdentifier();
+
+   /**
+* Adds custom options to the existing run options.
+* @param baseOptions The existing options.
+*/
+   void addRunOptions(Options baseOptions);
+
+   /**
+* Adds custom options to the existing general options.
+* @param baseOptions The existing options.
+*/
+   void addGeneralOptions(Options baseOptions);
+
+   /**
+* Retrieves a client for a running cluster
+* @param commandLine The command-line parameters from the CliFrontend
+* @param config The Flink config
+* @return Client if a cluster could be retrieve, null otherwise
--- End diff --

Thanks.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805706
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the 
YARN session using Flink.
 
 Use the YARN utilities (`yarn application -kill `) to stop the YARN 
session.
 
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+ -id,--applicationId  YARN application Id
--- End diff --

Yes, same as above. Fixed.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805492
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
--- End diff --

Good idea.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805499
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
+   }
+   return null;
--- End diff --

Same as above. Fixed.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805479
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
+
+   while(true) {
--- End diff --

Yes, I wanted to bring up the Yarn session only once the cluster is ready. It is a semantic change but IMHO transparent to the user. There is no disadvantage to waiting until the cluster is ready. Ultimately, it would be nice to get rid of all waiting, but we're not quite there yet.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805455
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -415,62 +513,83 @@ public int run(String[] args) {
}
System.out.println(description);
return 0;
+   } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM
--- End diff --

Sorry, I added this TODO while fixing some of your first comments.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805382
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
--- End diff --

Thank you. I fixed that by using a non-static variable for the logger and 
dynamically retrieving the class name, i.e.
```java
private final Logger LOG = LoggerFactory.getLogger(getClass());
```




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66805417
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Thank you, I'll look into this.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66801768
  
--- Diff: docs/setup/yarn_setup.md ---
@@ -143,6 +143,34 @@ Note that in this case its not possible to stop the 
YARN session using Flink.
 
 Use the YARN utilities (`yarn application -kill `) to stop the YARN 
session.
 
+ Attach to an existing Session
+
+Use the following command to start a session
+
+~~~bash
+./bin/yarn-session.sh
+~~~
+
+This command will show you the following overview:
+
+~~~bash
+Usage:
+   Required
+ -id,--applicationId  YARN application Id
--- End diff --

I think this option is also not listed when running ./bin/yarn-session.sh.




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66800124
  
--- Diff: docs/apis/cli.md ---
@@ -252,6 +256,12 @@ Action "list" lists running and scheduled programs.
configuration.
  -r,--running  Show only running programs and their 
JobIDs
  -s,--scheduledShow only scheduled programs and their 
JobIDs
+  Additional arguments if -m yarn-cluster is set:
+ -yid   YARN application ID of Flink YARN 
session to
+   connect to. Must not be set if 
JobManager HA
+   is used. In this case, JobManager RPC
+   location is automatically retrieved from
+   Zookeeper.
--- End diff --

I tried this, but I wonder why it's not logging that the job has been cancelled.
It logs "All TaskManagers are connected", but I don't think this message is relevant when cancelling a job.

```
[cloudera@quickstart build-target]$  ./bin/flink cancel -m yarn-cluster 
-yid application_1447844011707_0038 b9b8f76616073d09c596545a3cda978f
2016-06-13 07:22:21,355 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 07:22:22,277 INFO  org.apache.flink.yarn.YarnClusterClient   
- Start application client.
2016-06-13 07:22:22,288 INFO  org.apache.flink.yarn.ApplicationClient   
- Notification about new leader address 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO  org.apache.flink.yarn.ApplicationClient   
- Received address of new leader 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager with session ID null.
2016-06-13 07:22:22,290 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager null.
Waiting until all TaskManagers have connected
2016-06-13 07:22:22,297 INFO  org.apache.flink.yarn.ApplicationClient   
- Trying to register at JobManager 
akka.tcp://flink@10.0.2.15:51747/user/jobmanager.
No status updates from the YARN cluster received so far. Waiting ...
2016-06-13 07:22:22,542 INFO  org.apache.flink.yarn.ApplicationClient   
- Successfully registered at the ResourceManager using 
JobManager Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764]
All TaskManagers are connected
2016-06-13 07:22:22,945 INFO  org.apache.flink.yarn.YarnClusterClient   
- Shutting down YarnClusterClient from the client shutdown hook
2016-06-13 07:22:22,945 INFO  org.apache.flink.yarn.YarnClusterClient   
- Disconnecting YarnClusterClient from ApplicationMaster
2016-06-13 07:22:22,947 INFO  org.apache.flink.yarn.ApplicationClient   
- Stopped Application client.
2016-06-13 07:22:22,947 INFO  org.apache.flink.yarn.ApplicationClient   
- Disconnect from JobManager 
Actor[akka.tcp://flink@10.0.2.15:51747/user/jobmanager#1733798764].
2016-06-13 07:22:23,056 INFO  org.apache.flink.yarn.YarnClusterClient   
- Application application_1447844011707_0038 finished with 
state RUNNING and final state UNDEFINED at 0
[cloudera@quickstart build-target]$ 
```




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66799100
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Are there any tests for this functionality?




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66797799
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
+   }
+   return null;
--- End diff --

Does it make sense to log that the config hasn't been set?
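Something like the following could make the fallback explicit — a minimal sketch only, reusing the `jobManagerHost`, `jobManagerPort` and `LOG` names from the diff above; the message wording and log level are assumptions:

```java
// Hypothetical guard at the top of retrieveFromConfig(): state explicitly
// that no JobManager address was found in the given configuration.
if (jobManagerHost == null || jobManagerPort == -1) {
    LOG.info("No JobManager address/port found in the Flink configuration; " +
        "cannot look up a running YARN application from it.");
    return null;
}
```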




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66797580
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
+
+   while(true) {
--- End diff --

It seems that this loop is now also executed for the YARN session. In the past, it was used only for "per job" YARN clusters, because there we needed all slots to be available before submitting the job.

Now this loop blocks the YARN session CLI if not all TaskManagers are connecting. It's not a big deal, but it is a change in the semantics.
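If the old behaviour should be kept, the wait could be gated on the per-job case — a rough sketch only; the `perJobCluster` flag and the `waitForTaskManagers()` helper are assumptions, not actual members of `YarnClusterClient`:

```java
// Hypothetical guard: only block until all TaskManagers are up when the
// client has just deployed a cluster for a single job submission.
if (perJobCluster) {
    logAndSysout("Waiting until all TaskManagers have connected");
    waitForTaskManagers(); // assumed helper wrapping the status-polling loop
}
```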





[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66797696
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 ---
@@ -300,18 +309,82 @@ public static boolean allocateResource(int[] 
nodeManagers, int toAllocate) {
return false;
}
 
-   @Override
public void setDetachedMode(boolean detachedMode) {
this.detached = detachedMode;
}
 
-   @Override
-   public boolean isDetached() {
+   public boolean isDetachedMode() {
return detached;
}
 
+
+   /**
+* Gets a Hadoop Yarn client
+* @return Returns a YarnClient which has to be shutdown manually
+*/
+   private static YarnClient getYarnClient(Configuration conf) {
+   YarnClient yarnClient = YarnClient.createYarnClient();
+   yarnClient.init(conf);
+   yarnClient.start();
+   return yarnClient;
+   }
+
+   /**
+* Retrieves the Yarn application and cluster from the config
+* @param config The config with entries to retrieve the cluster
+* @return YarnClusterClient
+* @deprecated This should be removed in the future
+*/
+   public YarnClusterClient 
retrieveFromConfig(org.apache.flink.configuration.Configuration config) throws  
Exception {
+   String jobManagerHost = 
config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int jobManagerPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+
+   if (jobManagerHost != null && jobManagerPort != -1) {
+
+   YarnClient yarnClient = getYarnClient(conf);
+   List applicationReports = 
yarnClient.getApplications();
+   for (ApplicationReport report : applicationReports) {
+   if (report.getHost().equals(jobManagerHost) && 
report.getRpcPort() == jobManagerPort) {
+   LOG.info("Found application '{}' " +
+   "with JobManager host name '{}' 
and port '{}' from Yarn properties file.",
+   report.getApplicationId(), 
jobManagerHost, jobManagerPort);
+   return 
retrieve(report.getApplicationId().toString());
+   }
+   }
+
--- End diff --

I wonder if it makes sense to log here that we were unable to find the 
application id?
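A minimal sketch of such a log statement, placed after the loop over the application reports; the wording and log level are assumptions:

```java
// Hypothetical message after the loop: no running application matched the
// JobManager address from the properties file.
LOG.warn("Could not find a running YARN application with JobManager address {}:{}.",
    jobManagerHost, jobManagerPort);
```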




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66795439
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String 
longPrefix, boolean accept
NAME = new Option(shortPrefix + "nm", longPrefix + "name", 
true, "Set a custom name for the application on YARN");
}
 
+
/**
-* Creates a new Yarn Client.
-* @param cmd the command line to parse options from
-* @return an instance of the client or null if there was an error
+* Resumes from a Flink Yarn properties file
+* @param flinkConfiguration The flink configuration
+* @return True if the properties were loaded, false otherwise
 */
-   public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
+   private boolean resumeFromYarnProperties(Configuration 
flinkConfiguration) {
+   // load the YARN properties
+   File propertiesFile = new 
File(getYarnPropertiesLocation(flinkConfiguration));
+   if (!propertiesFile.exists()) {
+   return false;
+   }
+
+   logAndSysout("Found YARN properties file " + 
propertiesFile.getAbsolutePath());
+
+   Properties yarnProperties = new Properties();
+   try {
+   try (InputStream is = new 
FileInputStream(propertiesFile)) {
+   yarnProperties.load(is);
+   }
+   }
+   catch (IOException e) {
+   throw new RuntimeException("Cannot read the YARN 
properties file", e);
+   }
+
+   // configure the default parallelism from YARN
+   String propParallelism = 
yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
+   if (propParallelism != null) { // maybe the property is not set
+   try {
+   int parallelism = 
Integer.parseInt(propParallelism);
+   
flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 
parallelism);
+
+   logAndSysout("YARN properties set default 
parallelism to " + parallelism);
+   }
+   catch (NumberFormatException e) {
+   throw new RuntimeException("Error while parsing 
the YARN properties: " +
+   "Property " + 
YARN_PROPERTIES_PARALLELISM + " is not an integer.");
+   }
+   }
+
+   // get the JobManager address from the YARN properties
+   String address = 
yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
+   InetSocketAddress jobManagerAddress;
+   if (address != null) {
+   try {
+   jobManagerAddress = 
ClientUtils.parseHostPortAddress(address);
+   // store address in config from where it is 
retrieved by the retrieval service
+   
CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, 
jobManagerAddress);
+   }
+   catch (Exception e) {
+   throw new RuntimeException("YARN properties 
contain an invalid entry for JobManager address.", e);
+   }
+
+   logAndSysout("Using JobManager address from YARN 
properties " + jobManagerAddress);
+   }
 
-   AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-   if (flinkYarnClient == null) {
-   return null;
+   // handle the YARN client's dynamic properties
+   String dynamicPropertiesEncoded = 
yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
+   Map dynamicProperties = 
getDynamicProperties(dynamicPropertiesEncoded);
+   for (Map.Entry dynamicProperty : 
dynamicProperties.entrySet()) {
+   flinkConfiguration.setString(dynamicProperty.getKey(), 
dynamicProperty.getValue());
}
 
+   return true;
+   }
+
+   public YarnClusterDescriptor createDescriptor(String 
defaultApplicationName, CommandLine cmd) {
+
+
+   YarnClusterDescriptor yarnClusterDescriptor = new 
YarnClusterDescriptor();
--- End diff --

A bit too many blank lines ;)



[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66795170
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -415,62 +513,83 @@ public int run(String[] args) {
}
System.out.println(description);
return 0;
+   } else if (cmd.hasOption(APPLICATION_ID.getOpt())) { // TODO RM
--- End diff --

What is this TODO about?




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66794640
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

This is a similar thing:

```
[cloudera@quickstart build-target]$  ./bin/flink list -m yarn-cluster -yid 
obiouslyWrong -r


 The program finished with the following exception:

org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could 
not retrieve the leader gateway
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127)
at 
org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:844)
at org.apache.flink.client.CliFrontend.list(CliFrontend.java:378)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:986)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1034)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125)
... 4 more
[cloudera@quickstart build-target]$ 
```




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66794535
  
--- Diff: docs/apis/cli.md ---
@@ -105,6 +105,10 @@ The command line can be used to
 
 ./bin/flink list -r
 
+-   List running Flink jobs inside Flink YARN session:
+
+./bin/flink list -m yarn-cluster -yid  -r
--- End diff --

Why is the client trying to connect even though the application has already finished?

```
[cloudera@quickstart build-target]$  ./bin/flink list -m yarn-cluster -yid 
application_1447844011707_0036 -r
2016-06-13 06:51:34,581 INFO  org.apache.hadoop.yarn.client.RMProxy 
- Connecting to ResourceManager at 
quickstart.cloudera/10.0.2.15:8032
2016-06-13 06:51:35,017 ERROR org.apache.flink.yarn.YarnClusterDescriptor   
- The application application_1447844011707_0036 doesn't run 
anymore. It has previously completed with final status: SUCCEEDED


 The program finished with the following exception:

org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could 
not retrieve the leader gateway
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:127)
at 
org.apache.flink.client.CliFrontend.getJobManagerGateway(CliFrontend.java:844)
at org.apache.flink.client.CliFrontend.list(CliFrontend.java:378)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:986)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1034)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
[1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at 
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at 
org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:125)
... 4 more
```
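Since the descriptor already logs that the application completed, one option would be to fail fast at that point instead of continuing with leader retrieval — a hedged sketch, assuming a started `YarnClient` and the `ApplicationId` parsed from `-yid`; the exception type and messages are only illustrative:

```java
// Hypothetical pre-check before resuming: refuse to connect to an
// application that is no longer running.
ApplicationReport report = yarnClient.getApplicationReport(appId);
if (report.getYarnApplicationState() != YarnApplicationState.RUNNING) {
    throw new RuntimeException("The YARN application " + appId + " is not running anymore. " +
        "It previously completed with final status: " + report.getFinalApplicationStatus());
}
```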




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66791210
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---
@@ -211,18 +185,45 @@ public void run() {
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
 
isConnected = true;
+
+   logAndSysout("Waiting until all TaskManagers have connected");
--- End diff --

Since the `logAndSysout` method is defined in the `ClusterClient`, all log messages appear to come from that class, even though they originate from the `YarnClusterClient`. Maybe it makes sense to override the method or add an argument for the logger.
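One possible shape for the logger-argument variant — a sketch only, using the `getPrintStatusDuringExecution()` accessor that appears in the `ClusterClient` diff; whether such an overload fits the existing class is not shown here (the fix that was eventually applied, a non-static logger, is quoted further up in this thread):

```java
// Hypothetical overload in ClusterClient that lets subclasses attribute
// console/log output to their own logger.
protected void logAndSysout(Logger logger, String message) {
    logger.info(message);
    if (getPrintStatusDuringExecution()) {
        System.out.println(message);
    }
}
```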




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66789723
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
@@ -181,11 +198,30 @@ public boolean getPrintStatusDuringExecution() {
}
 
/**
-* @return -1 if unknown. The maximum number of available processing 
slots at the Flink cluster
-* connected to this client.
+* Gets the current JobManager address from the Flink configuration 
(may change in case of a HA setup).
+* @return The address (host and port) of the leading JobManager
 */
-   public int getMaxSlots() {
-   return this.maxSlots;
+   public InetSocketAddress getJobManagerAddressFromConfig() {
+   try {
+   String hostName = 
flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
+   int port = 
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
+   return new InetSocketAddress(hostName, port);
--- End diff --

Missing indentation




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66765910
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway 
getJobManagerGateway(CommandLineOptions options) throws E
}
 
/**
-* Retrieves a {@link Client} object from the given command line 
options and other parameters.
+* Retrieves a {@link ClusterClient} object from the given command line 
options and other parameters.
 *
 * @param options Command line options which contain JobManager address
 * @param programName Program name
-* @param userParallelism Given user parallelism
 * @throws Exception
 */
-   protected Client getClient(
+   protected ClusterClient getClient(
CommandLineOptions options,
-   String programName,
-   int userParallelism,
-   boolean detachedMode)
-   throws Exception {
-   InetSocketAddress jobManagerAddress;
-   int maxSlots = -1;
+   String programName) throws Exception {
 
-   if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-   logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
-
-   // Default yarn application name to use, if nothing is 
specified on the command line
-   String applicationName = "Flink Application: " + 
programName;
-
-   // user wants to run Flink in YARN cluster.
-   CommandLine commandLine = options.getCommandLine();
-   AbstractFlinkYarnClient flinkYarnClient = 
CliFrontendParser
-   
.getFlinkYarnSessionCli()
-   
.withDefaultApplicationName(applicationName)
-   
.createFlinkYarnClient(commandLine);
-
-   if (flinkYarnClient == null) {
-   throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
-   }
-
-   // in case the main detached mode wasn't set, we don't 
wanna overwrite the one loaded
-   // from yarn options.
-   if (detachedMode) {
-   flinkYarnClient.setDetachedMode(true);
-   }
-
-   // the number of slots available from YARN:
-   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-   if (yarnTmSlots == -1) {
-   yarnTmSlots = 1;
-   }
-   maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-   if (userParallelism != -1) {
-   int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-   logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
-   "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
-   "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-   "will get "+slotsPerTM+" 
slots.");
-   flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-   }
-
-   try {
-   yarnCluster = flinkYarnClient.deploy();
-   yarnCluster.connectToCluster();
-   }
-   catch (Exception e) {
-   throw new RuntimeException("Error deploying the 
YARN cluster", e);
-   }
+   // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+   CustomCommandLine activeCommandLine =
+   
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
--- End diff --

Okay, moving the logic addresses my concerns.

I checked the classes again and my renaming suggestions don't make much sense ;)



[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66628410
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway 
getJobManagerGateway(CommandLineOptions options) throws E
}
 
/**
-* Retrieves a {@link Client} object from the given command line 
options and other parameters.
+* Retrieves a {@link ClusterClient} object from the given command line 
options and other parameters.
 *
 * @param options Command line options which contain JobManager address
 * @param programName Program name
-* @param userParallelism Given user parallelism
 * @throws Exception
 */
-   protected Client getClient(
+   protected ClusterClient getClient(
CommandLineOptions options,
-   String programName,
-   int userParallelism,
-   boolean detachedMode)
-   throws Exception {
-   InetSocketAddress jobManagerAddress;
-   int maxSlots = -1;
+   String programName) throws Exception {
 
-   if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-   logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
-
-   // Default yarn application name to use, if nothing is 
specified on the command line
-   String applicationName = "Flink Application: " + 
programName;
-
-   // user wants to run Flink in YARN cluster.
-   CommandLine commandLine = options.getCommandLine();
-   AbstractFlinkYarnClient flinkYarnClient = 
CliFrontendParser
-   
.getFlinkYarnSessionCli()
-   
.withDefaultApplicationName(applicationName)
-   
.createFlinkYarnClient(commandLine);
-
-   if (flinkYarnClient == null) {
-   throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
-   }
-
-   // in case the main detached mode wasn't set, we don't 
wanna overwrite the one loaded
-   // from yarn options.
-   if (detachedMode) {
-   flinkYarnClient.setDetachedMode(true);
-   }
-
-   // the number of slots available from YARN:
-   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-   if (yarnTmSlots == -1) {
-   yarnTmSlots = 1;
-   }
-   maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-   if (userParallelism != -1) {
-   int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-   logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
-   "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
-   "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-   "will get "+slotsPerTM+" 
slots.");
-   flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-   }
-
-   try {
-   yarnCluster = flinkYarnClient.deploy();
-   yarnCluster.connectToCluster();
-   }
-   catch (Exception e) {
-   throw new RuntimeException("Error deploying the 
YARN cluster", e);
-   }
+   // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+   CustomCommandLine activeCommandLine =
+   
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
--- End diff --

Well spotted. I think I'll move this logic into the implementation of the `CustomCommandLine`.

I don't quite understand your renaming suggestion. Are you suggesting breaking up the `CustomCommandLine` into a `CustomParser` and a `CustomCLI`?
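A rough sketch of what moving the retrieval into the YARN command line could look like; the `retrieveCluster` signature is inferred from the javadoc in the `CustomCommandLine` diff, and the `APPLICATION_ID` option and `YarnClusterDescriptor` come from the other diffs in this thread, so the exact names and exception clause are assumptions:

```java
// Hypothetical retrieval hook inside the YARN command line: the cluster is
// resolved here from -yid instead of inside CliFrontend.getClient().
@Override
public ClusterClient retrieveCluster(CommandLine commandLine, Configuration config) throws Exception {
    if (commandLine.hasOption(APPLICATION_ID.getOpt())) {
        String applicationId = commandLine.getOptionValue(APPLICATION_ID.getOpt());
        // YarnClusterDescriptor.retrieve(String) is shown in the descriptor diff above
        return new YarnClusterDescriptor().retrieve(applicationId);
    }
    // otherwise fall back to the YARN properties file, as in resumeFromYarnProperties()
    return null;
}
```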



[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66626302
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway 
getJobManagerGateway(CommandLineOptions options) throws E
}
 
/**
-* Retrieves a {@link Client} object from the given command line 
options and other parameters.
+* Retrieves a {@link ClusterClient} object from the given command line 
options and other parameters.
 *
 * @param options Command line options which contain JobManager address
 * @param programName Program name
-* @param userParallelism Given user parallelism
 * @throws Exception
 */
-   protected Client getClient(
+   protected ClusterClient getClient(
CommandLineOptions options,
-   String programName,
-   int userParallelism,
-   boolean detachedMode)
-   throws Exception {
-   InetSocketAddress jobManagerAddress;
-   int maxSlots = -1;
+   String programName) throws Exception {
 
-   if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-   logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
-
-   // Default yarn application name to use, if nothing is 
specified on the command line
-   String applicationName = "Flink Application: " + 
programName;
-
-   // user wants to run Flink in YARN cluster.
-   CommandLine commandLine = options.getCommandLine();
-   AbstractFlinkYarnClient flinkYarnClient = 
CliFrontendParser
-   
.getFlinkYarnSessionCli()
-   
.withDefaultApplicationName(applicationName)
-   
.createFlinkYarnClient(commandLine);
-
-   if (flinkYarnClient == null) {
-   throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
-   }
-
-   // in case the main detached mode wasn't set, we don't 
wanna overwrite the one loaded
-   // from yarn options.
-   if (detachedMode) {
-   flinkYarnClient.setDetachedMode(true);
-   }
-
-   // the number of slots available from YARN:
-   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-   if (yarnTmSlots == -1) {
-   yarnTmSlots = 1;
-   }
-   maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-   if (userParallelism != -1) {
-   int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-   logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
-   "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
-   "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-   "will get "+slotsPerTM+" 
slots.");
-   flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-   }
-
-   try {
-   yarnCluster = flinkYarnClient.deploy();
-   yarnCluster.connectToCluster();
-   }
-   catch (Exception e) {
-   throw new RuntimeException("Error deploying the 
YARN cluster", e);
-   }
+   // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+   CustomCommandLine activeCommandLine =
+   
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
 
-   jobManagerAddress = yarnCluster.getJobManagerAddress();
-   writeJobManagerAddressToConfig(jobManagerAddress);
-   
-   // overwrite the yarn client config (because the client 
parses the dynamic properties)
-   
this.config.addAll(flinkYarnClient.getFlinkConfiguration());
-
-   logAndSysout("YARN cluster started");
-  

[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66626078
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -1275,33 +1071,16 @@ else if (new 
File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
return location;
}
 
-   public static Map getDynamicProperties(String 
dynamicPropertiesEncoded) {
-   if (dynamicPropertiesEncoded != null && 
dynamicPropertiesEncoded.length() > 0) {
-   Map properties = new HashMap<>();
-   
-   String[] propertyLines = 
dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-   for (String propLine : propertyLines) {
-   if (propLine == null) {
-   continue;
-   }
-   
-   String[] kv = propLine.split("=");
-   if (kv.length >= 2 && kv[0] != null && kv[1] != 
null && kv[0].length() > 0) {
-   properties.put(kv[0], kv[1]);
-   }
-   }
-   return properties;
-   }
-   else {
-   return Collections.emptyMap();
-   }
-   }
 
-   public static String getYarnPropertiesLocation(Configuration conf) {
-   String defaultPropertiesFileLocation = 
System.getProperty("java.io.tmpdir");
-   String currentUser = System.getProperty("user.name");
-   String propertiesFileLocation = 
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
-
-   return propertiesFileLocation + File.separator + 
CliFrontend.YARN_PROPERTIES_FILE + currentUser;
+   /**
+* Writes the given job manager address to the associated configuration 
object
+*
+* @param address Address to write to the configuration
+* @param config The config to write to
+*/
+   public static void writeJobManagerAddressToConfig(Configuration config, 
InetSocketAddress address) {
--- End diff --

+1




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66625805
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+
+
+/**
+ * Custom command-line interface to load hooks for the command-line 
interface.
+ */
+public interface CustomCommandLine {
+
+   /**
+* Returns a unique identifier for this custom command-line.
+* @return An unique identifier string
+*/
+   String getIdentifier();
+
+   /**
+* Adds custom options to the existing run options.
+* @param baseOptions The existing options.
+*/
+   void addRunOptions(Options baseOptions);
+
+   /**
+* Adds custom options to the existing general options.
+* @param baseOptions The existing options.
+*/
+   void addGeneralOptions(Options baseOptions);
+
+   /**
+* Retrieves a client for a running cluster
+* @param commandLine The command-line parameters from the CliFrontend
+* @param config The Flink config
+* @return Client if a cluster could be retrieve, null otherwise
--- End diff --

typo: retrieved




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66625502
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -1275,33 +1071,16 @@ else if (new 
File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
return location;
}
 
-   public static Map getDynamicProperties(String 
dynamicPropertiesEncoded) {
-   if (dynamicPropertiesEncoded != null && 
dynamicPropertiesEncoded.length() > 0) {
-   Map properties = new HashMap<>();
-   
-   String[] propertyLines = 
dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
-   for (String propLine : propertyLines) {
-   if (propLine == null) {
-   continue;
-   }
-   
-   String[] kv = propLine.split("=");
-   if (kv.length >= 2 && kv[0] != null && kv[1] != 
null && kv[0].length() > 0) {
-   properties.put(kv[0], kv[1]);
-   }
-   }
-   return properties;
-   }
-   else {
-   return Collections.emptyMap();
-   }
-   }
 
-   public static String getYarnPropertiesLocation(Configuration conf) {
-   String defaultPropertiesFileLocation = 
System.getProperty("java.io.tmpdir");
-   String currentUser = System.getProperty("user.name");
-   String propertiesFileLocation = 
conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, 
defaultPropertiesFileLocation);
-
-   return propertiesFileLocation + File.separator + 
CliFrontend.YARN_PROPERTIES_FILE + currentUser;
+   /**
+* Writes the given job manager address to the associated configuration 
object
+*
+* @param address Address to write to the configuration
+* @param config The config to write to
+*/
+   public static void writeJobManagerAddressToConfig(Configuration config, 
InetSocketAddress address) {
--- End diff --

How about renaming this to "setJobManagerAddressInConfig()"? "Write" implies that something is written to a file.
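
For illustration, the renamed helper could look like the sketch below; the quoted diff only shows the signature, so the body (setting the standard JobManager RPC address and port keys) is an assumption:

```java
import java.net.InetSocketAddress;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

public final class JobManagerAddressSketch {

	// Sketch only: the signature matches the diff above, the body is assumed.
	public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) {
		config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostString());
		config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
	}
}
```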




[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66625318
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway 
getJobManagerGateway(CommandLineOptions options) throws E
}
 
/**
-* Retrieves a {@link Client} object from the given command line 
options and other parameters.
+* Retrieves a {@link ClusterClient} object from the given command line 
options and other parameters.
 *
 * @param options Command line options which contain JobManager address
 * @param programName Program name
-* @param userParallelism Given user parallelism
 * @throws Exception
 */
-   protected Client getClient(
+   protected ClusterClient getClient(
CommandLineOptions options,
-   String programName,
-   int userParallelism,
-   boolean detachedMode)
-   throws Exception {
-   InetSocketAddress jobManagerAddress;
-   int maxSlots = -1;
+   String programName) throws Exception {
 
-   if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-   logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
-
-   // Default yarn application name to use, if nothing is 
specified on the command line
-   String applicationName = "Flink Application: " + 
programName;
-
-   // user wants to run Flink in YARN cluster.
-   CommandLine commandLine = options.getCommandLine();
-   AbstractFlinkYarnClient flinkYarnClient = 
CliFrontendParser
-   
.getFlinkYarnSessionCli()
-   
.withDefaultApplicationName(applicationName)
-   
.createFlinkYarnClient(commandLine);
-
-   if (flinkYarnClient == null) {
-   throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
-   }
-
-   // in case the main detached mode wasn't set, we don't 
wanna overwrite the one loaded
-   // from yarn options.
-   if (detachedMode) {
-   flinkYarnClient.setDetachedMode(true);
-   }
-
-   // the number of slots available from YARN:
-   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-   if (yarnTmSlots == -1) {
-   yarnTmSlots = 1;
-   }
-   maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-   if (userParallelism != -1) {
-   int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-   logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
-   "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
-   "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-   "will get "+slotsPerTM+" 
slots.");
-   flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-   }
-
-   try {
-   yarnCluster = flinkYarnClient.deploy();
-   yarnCluster.connectToCluster();
-   }
-   catch (Exception e) {
-   throw new RuntimeException("Error deploying the 
YARN cluster", e);
-   }
+   // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+   CustomCommandLine activeCommandLine =
+   
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
 
-   jobManagerAddress = yarnCluster.getJobManagerAddress();
-   writeJobManagerAddressToConfig(jobManagerAddress);
-   
-   // overwrite the yarn client config (because the client 
parses the dynamic properties)
-   
this.config.addAll(flinkYarnClient.getFlinkConfiguration());
-
-   logAndSysout("YARN cluster started");
- 

[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-10 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2085#discussion_r66624415
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -980,110 +845,41 @@ protected ActorGateway 
getJobManagerGateway(CommandLineOptions options) throws E
}
 
/**
-* Retrieves a {@link Client} object from the given command line 
options and other parameters.
+* Retrieves a {@link ClusterClient} object from the given command line 
options and other parameters.
 *
 * @param options Command line options which contain JobManager address
 * @param programName Program name
-* @param userParallelism Given user parallelism
 * @throws Exception
 */
-   protected Client getClient(
+   protected ClusterClient getClient(
CommandLineOptions options,
-   String programName,
-   int userParallelism,
-   boolean detachedMode)
-   throws Exception {
-   InetSocketAddress jobManagerAddress;
-   int maxSlots = -1;
+   String programName) throws Exception {
 
-   if 
(YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
-   logAndSysout("YARN cluster mode detected. Switching 
Log4j output to console");
-
-   // Default yarn application name to use, if nothing is 
specified on the command line
-   String applicationName = "Flink Application: " + 
programName;
-
-   // user wants to run Flink in YARN cluster.
-   CommandLine commandLine = options.getCommandLine();
-   AbstractFlinkYarnClient flinkYarnClient = 
CliFrontendParser
-   
.getFlinkYarnSessionCli()
-   
.withDefaultApplicationName(applicationName)
-   
.createFlinkYarnClient(commandLine);
-
-   if (flinkYarnClient == null) {
-   throw new RuntimeException("Unable to create 
Flink YARN Client. Check previous log messages");
-   }
-
-   // in case the main detached mode wasn't set, we don't 
wanna overwrite the one loaded
-   // from yarn options.
-   if (detachedMode) {
-   flinkYarnClient.setDetachedMode(true);
-   }
-
-   // the number of slots available from YARN:
-   int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
-   if (yarnTmSlots == -1) {
-   yarnTmSlots = 1;
-   }
-   maxSlots = yarnTmSlots * 
flinkYarnClient.getTaskManagerCount();
-   if (userParallelism != -1) {
-   int slotsPerTM = userParallelism / 
flinkYarnClient.getTaskManagerCount();
-   logAndSysout("The YARN cluster has " + maxSlots 
+ " slots available, " +
-   "but the user requested a 
parallelism of " + userParallelism + " on YARN. " +
-   "Each of the " + 
flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
-   "will get "+slotsPerTM+" 
slots.");
-   flinkYarnClient.setTaskManagerSlots(slotsPerTM);
-   }
-
-   try {
-   yarnCluster = flinkYarnClient.deploy();
-   yarnCluster.connectToCluster();
-   }
-   catch (Exception e) {
-   throw new RuntimeException("Error deploying the 
YARN cluster", e);
-   }
+   // Get the custom command-line (e.g. Standalone/Yarn/Mesos)
+   CustomCommandLine activeCommandLine =
+   
CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
--- End diff --

Why is this method located in the Parser?
I thought the parser is only responsible for parsing the command line 
arguments?

Maybe it makes more sense to rename this 
```
loadCustomCommandLine("org.apache.flink.client.cli.DefaultCLI");

loadCustomCommandLine("org.apache.flink.yarn.cli.FlinkYarnSessionCli", "y", 
"yarn");
```
to parser (loadCustomParser(), and DefaultCLIParser, YarnCliPa
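
As a rough sketch of the loading approach under discussion (assuming each custom command line is instantiated reflectively from its class name plus optional constructor arguments; the PR's actual helper may differ):

```java
import java.lang.reflect.Constructor;

import org.apache.flink.client.cli.CustomCommandLine;

public final class CustomCommandLineLoader {

	// Hypothetical reflection-based loader; not necessarily what the PR implements.
	public static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws Exception {
		Class<? extends CustomCommandLine> cliClass =
			Class.forName(className).asSubclass(CustomCommandLine.class);

		// match a constructor by the runtime types of the given arguments
		Class<?>[] types = new Class<?>[params.length];
		for (int i = 0; i < params.length; i++) {
			types[i] = params[i].getClass();
		}

		Constructor<? extends CustomCommandLine> constructor = cliClass.getConstructor(types);
		return constructor.newInstance(params);
	}
}
```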

[GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters

2016-06-08 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2085

[FLINK-3937] programmatic resuming of clusters

These changes are based on #1978 and #2034. More specifically, they port 
resuming of running Yarn clusters from #2034 to the refactoring of #1978. The 
abstraction in place enables us to plug in other cluster frameworks in the 
future. 

- integrates with and extends the refactoring of FLINK-3667
- enables resuming from Yarn properties or a Yarn application id
- introduces additional StandaloneClusterDescriptor
- introduces DefaultCLI to get rid of standalone mode switches in 
CliFrontend
- various fixes and improvements
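
To make the descriptor/client split a bit more concrete, here is a purely hypothetical sketch of the shape of these abstractions; the interface and method names below are illustrative assumptions, not the PR's actual signatures:

```java
// Illustrative only: mirrors the idea of "deploy a new cluster or resume a
// running one, and hand back a client"; not the PR's actual API.
interface SketchClusterDescriptor {

	// start a fresh cluster and return a client connected to it
	SketchClusterClient deploy() throws Exception;

	// attach to an already running cluster, e.g. identified by a Yarn application id
	SketchClusterClient retrieve(String clusterId) throws Exception;
}

interface SketchClusterClient {

	// job submission and status queries omitted here

	// release client-side resources and, depending on the mode, the cluster itself
	void shutdown();
}
```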

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3937

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2085.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2085


commit b144e64758a95bcac33bd0ac91ab7eefaf4040e9
Author: Maximilian Michels 
Date:   2016-04-22T17:52:54Z

[FLINK-3667] refactor client communication classes

- ClusterDescriptor: base interface for cluster deployment descriptors
- ClusterDescriptor: YarnClusterDescriptor

- ClusterClient: base class for ClusterClients, handles lifecycle of cluster
- ClusterClient: shares configuration with the implementations
- ClusterClient: StandaloneClusterClient, YarnClusterClient
- ClusterClient: remove run methods and enable detached mode via flag

- CliFrontend: remove all Yarn specific logic
- CliFrontend: remove all cluster setup logic

- CustomCommandLine: interface for other cluster implementations
- CustomCommandLine: enables creation of a new cluster or resuming from an existing one

- Yarn: move Yarn classes and functionality to the yarn module (yarn
  properties, yarn interfaces)
- Yarn: improve reliability of cluster startup
- Yarn Tests: only disable parallel execution of ITCases

commit 73524c89854f04ac41f0c288d9ebf8ef5efe628b
Author: Sebastian Klemke 
Date:   2016-05-25T12:28:59Z

[FLINK-3937] implement -yid option to Flink CLI

- enables use of the list, savepoint, cancel and stop subcommands
- adapt FlinkYarnSessionCli to also accept YARN application Id to attach to
- update documentation

commit 1db8c97c39c2bf071db018c1ca505409c847a30b
Author: Maximilian Michels 
Date:   2016-06-01T10:45:52Z

[FLINK-3863] Yarn Cluster shutdown may fail if leader changed recently

commit 1a154fb12474a8630cce7e764d72398513055887
Author: Maximilian Michels 
Date:   2016-06-02T14:28:51Z

[FLINK-3937] programmatic resuming of clusters

- integrates with and extends the refactoring of FLINK-3667
- enables resuming from Yarn properties or a Yarn application id
- introduces additional StandaloneClusterDescriptor
- introduces DefaultCLI to get rid of standalone mode switches in 
CliFrontend
- various fixes and improvements



