[GitHub] flink issue #2347: [FLINK-4236] fix error handling for jar files with no mai...

2016-08-13 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2347
  
Merging this after a Checkstyle fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-17 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75122721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , 
jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using 
system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   BlobCache blobClient = null;
+   try {
+   final Object jmAnswer;
+   try {
+   jmAnswer = Await.result(
+ 

[GitHub] flink pull request #2340: [FLINK-3155] Update docker flink container to the ...

2016-08-17 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2340#discussion_r75123833
  
--- Diff: flink-contrib/docker-flink/Dockerfile ---
@@ -22,25 +22,31 @@ FROM java:8-jre-alpine
 RUN apk add --no-cache bash snappy
 
 # Configure Flink version
-ARG FLINK_VERSION=1.0.3
+ARG FLINK_VERSION=1.1.1
 ARG HADOOP_VERSION=27
 ARG SCALA_VERSION=2.11
 
+# Flink environment variables
+ARG FLINK_INSTALL_PATH=/opt
+ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
+ENV PATH $PATH:$FLINK_HOME/bin
+
 # Install build dependencies and flink
 RUN set -x && \
+  mkdir -p $FLINK_INSTALL_PATH && \
   apk --update add --virtual build-dependencies curl && \
-  curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?as_json\=1 | \
-  awk '/preferred/ {gsub(/"/,""); print 
$2}')flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz
 | \
-  tar xvz -C /usr/local/ && \
-  ln -s /usr/local/flink-$FLINK_VERSION /usr/local/flink && \
-  sed -i -e "s/echo \$mypid >> \$pid/echo \$mypid >> \$pid \&\& wait/g" 
/usr/local/flink/bin/flink-daemon.sh && \
+  curl -s $(curl -s 
https://www.apache.org/dyn/closer.cgi\?preferred\=true)flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz
 | \
--- End diff --

Much better! Thanks :)




[GitHub] flink issue #2340: [FLINK-3155] Update docker flink container to the latest ...

2016-08-17 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2340
  
Thanks for addressing my comments! Looks very good. I'm sorry I asked 
whether you had tested the changes. It may seem natural to you, but trust me, 
that is not always the case :)

Merging...




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-17 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75123576
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
--- End diff --

That's not possible because the `JobClientActor` will complete this future 
with the result of the job execution, which may be infinitely delayed. In all 
other cases (i.e. timeout to register at the JobManager, failure to attach to 
the job, failure to submit the job), the `JobClientActor` will complete the 
future with a failure message.
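
The reasoning in the comment above can be illustrated with plain Java futures. This is only a minimal sketch, not the code from the pull request: `CompletableFuture` stands in for the Akka future returned by `Patterns.ask`, and `InfiniteTimeoutSketch`, `ClientActorStub`, and `await` are hypothetical names. The point it shows is that the caller can wait without a timeout because the actor side always completes the future, either with the job result or with a failure.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class InfiniteTimeoutSketch {

    /** Hypothetical stand-in for the JobClientActor: it alone completes the future. */
    static final class ClientActorStub {
        private final CompletableFuture<String> result = new CompletableFuture<>();

        CompletableFuture<String> future() {
            return result;
        }

        /** Called when the job finishes, however long that takes. */
        void jobFinished(String jobResult) {
            result.complete(jobResult);
        }

        /** Called on registration timeout, failed attach, failed submission, and so on. */
        void failed(String reason) {
            result.completeExceptionally(new IllegalStateException(reason));
        }
    }

    /** Waits without a timeout and returns either the job result or the failure message. */
    static String await(CompletableFuture<String> future) {
        try {
            return future.join(); // no timeout: the actor is responsible for completing the future
        } catch (CompletionException e) {
            return "FAILED: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        ClientActorStub ok = new ClientActorStub();
        ok.jobFinished("job result");
        System.out.println(await(ok.future()));

        ClientActorStub broken = new ClientActorStub();
        broken.failed("timeout registering at JobManager");
        System.out.println(await(broken.future()));
    }
}
```

Under this assumption, a finite timeout on the ask itself would only cut off long-running jobs; the error cases are already reported through the future.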




[GitHub] flink issue #2317: [FLINK-4287] Ensure the yarn-session.sh classpath contain...

2016-08-17 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2317
  
Woops. Didn't end up merging #2320 so this got overlooked.




[GitHub] flink pull request #2317: [FLINK-4287] Ensure the yarn-session.sh classpath ...

2016-08-17 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2317#discussion_r75126672
  
--- Diff: flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath 
$CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR  $log_setting 
org.apache.flink.yarn.cli.FlinkYarnSessionCli -j $FLINK_LIB_DIR/flink-dist*.jar 
"$@"
--- End diff --

The name was changed recently (some months ago). Niels was just on an old 
base.




[GitHub] flink issue #2317: [FLINK-4287] Ensure the yarn-session.sh classpath contain...

2016-08-17 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2317
  
Rebased and ready to go. Merging.




[GitHub] flink issue #2114: FLINK-3839 Added the extraction of jar files from a URL s...

2016-08-17 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2114
  
Hi @thormanrd! Thanks for your PR. As Robert and Stephan already pointed 
out, it contains a lot of reformats, which are generally discouraged. They make 
the pull request's essentials hard to review and can even introduce subtle 
bugs. 

The preferred way of syncing up with the master is to rebase against the 
master before you open a pull request:

```
git pull --rebase upstream master
``` 
`upstream` is the main Flink repository. Maybe it's called `origin` in your 
case. Check using `git remote -v`.

After the pull request has been opened you should only push additional 
commits (no rebasing) to keep the comments and discussion. In the end, either 
you or the committer rebases again.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75269835
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

We always build yarn. We use the `include-yarn-tests` profile to 
include/exclude yarn tests. The `include-yarn` profile, on the other hand, is 
there to exclude yarn for the Hadoop 1 version of Flink.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75269975
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   4.0.0
+
+   
+   org.apache.flink
+   flink-parent
+   1.1-SNAPSHOT
+   ..
+   
+   
+   flink-mesos_2.10
+   flink-mesos
+   jar
+
+
+0.27.1
+
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   
+   
+   hadoop-core
--- End diff --

Why do you exclude just `hadoop-core` here?




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75270753
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
--- End diff --

This looks just like a dummy/stub class? Not a CLI yet :)




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75271531
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
--- End diff --

This is always 0.0, which means "give me whatever is free"?




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75271636
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos;
+
+import org.apache.mesos.Protos;
+
+import java.net.URL;
+import java.util.Arrays;
+
+public class Utils {
+   /**
+* Construct a Mesos environment variable.
+ */
--- End diff --

Indentation is off here.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75272075
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
+   }
+
+   @Override
+   public int getPorts() {
+   return TM_PORT_KEYS.length;
+   }
+
+   @Override
+   public Map 
getCustomNamedResources() {
+   return Collections.emptyMap();
+   }
+
+   @Override
+   public List getHardConstraints() 
{
+   return null;
+   }
+
+   @

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75272248
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a 
port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at 
launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, 
Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference 
assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return 
params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
--- End diff --

I would rather throw an exception here if the value is not in use.
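
One way to read this suggestion is sketched below. It is an assumption-laden illustration, not the code under review: `ResourceRequest` and `WorkerRequest` are hypothetical, trimmed-down stand-ins for the request type in the diff, and the sketch only makes sense if the scheduler never queries the unused dimension, which this thread does not establish.

```java
final class UnusedResourceSketch {

    /** Hypothetical, trimmed-down stand-in for a task request interface. */
    interface ResourceRequest {
        double getCPUs();

        double getDisk();
    }

    /** A request that asks for CPUs only and treats disk as "not in use". */
    static final class WorkerRequest implements ResourceRequest {
        private final double cpus;

        WorkerRequest(double cpus) {
            this.cpus = cpus;
        }

        @Override
        public double getCPUs() {
            return cpus;
        }

        @Override
        public double getDisk() {
            // Fail loudly instead of returning 0.0, so an unexpected caller is noticed.
            throw new UnsupportedOperationException("Disk is not requested by this worker.");
        }
    }

    public static void main(String[] args) {
        ResourceRequest request = new WorkerRequest(2.0);
        System.out.println("cpus: " + request.getCPUs());
        try {
            request.getDisk();
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The trade-off is that returning 0.0 silently hides whether the value is ever read, while the exception surfaces that immediately but breaks any caller that does read it.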




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75274022
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application Master.
+ * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map<String, String> ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical actor died */
+   private static final int ACTOR

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75275043
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,294 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+   <groupId>org.apache.flink</groupId>
+   <artifactId>flink-parent</artifactId>
+   <version>1.1-SNAPSHOT</version>
--- End diff --

This needs to be bumped to `1.2-SNAPSHOT`.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75275735
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+/**
+ * The Mesos environment variables used for settings of the containers.
+ */
+public class MesosConfigKeys {
--- End diff --

I wonder, would it make sense to create `ContainerEnvConfigKeys` holding the 
environment variables shared by `YarnConfigKeys` and `MesosConfigKeys`? The 
overlap is quite significant.
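
To illustrate the suggestion, here is a minimal sketch of how the shared keys could be factored out. Only the name `ContainerEnvConfigKeys` comes from the comment above; the individual constants, their string values, and the `*Sketch` class names are placeholders invented for illustration and are not the actual contents of `YarnConfigKeys` or `MesosConfigKeys`.

```java
/** Illustrative sketch only: all key names and values below are assumed placeholders. */
class ContainerEnvConfigKeysDemo {

    /** Hypothetical shared base class for environment variables common to all containers. */
    static class ContainerEnvConfigKeys {
        public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
        public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
        public static final String ENV_SLOTS = "_SLOTS";

        protected ContainerEnvConfigKeys() {}
    }

    /** Mesos-specific keys (placeholder); the shared keys are inherited. */
    static final class MesosConfigKeysSketch extends ContainerEnvConfigKeys {
        public static final String ENV_FRAMEWORK_NAME = "_MESOS_FRAMEWORK_NAME";

        private MesosConfigKeysSketch() {}
    }

    /** YARN-specific keys (placeholder); the shared keys are inherited. */
    static final class YarnConfigKeysSketch extends ContainerEnvConfigKeys {
        public static final String ENV_APP_ID = "_YARN_APP_ID";

        private YarnConfigKeysSketch() {}
    }

    public static void main(String[] args) {
        // Both subclasses resolve the shared constant to the single definition in the base class.
        System.out.println(
            MesosConfigKeysSketch.ENV_TM_MEMORY.equals(YarnConfigKeysSketch.ENV_TM_MEMORY));
    }
}
```

With a layout like this, code that reads the shared variables would not need to know which cluster framework started the container, and each framework-specific class would list only the keys unique to it.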




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75276016
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75276925
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75279529
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75280315
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75281657
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework
+
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore
+import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
+
+/**
+  * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+  */
+case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable {
--- End diff --

Why is this class written in Scala? It seems like this class is only used 
from Java code.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75282003
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0");
--- End diff --

By definition, fields declared in an interface are implicitly `public static final`, so the explicit `static final` modifiers are redundant :)
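
A small, self-contained check of that language rule, as a sketch: the nested interface `WorkerStoreSketch` and the helper `isImplicitConstant` are hypothetical names used only for this illustration. The field is declared without any modifiers, and reflection reports it as public, static, and final.

```java
import java.lang.reflect.Modifier;
import java.text.DecimalFormat;

/** Illustrative sketch only: shows that interface fields carry implicit modifiers. */
class InterfaceFieldModifiersDemo {

    /** Hypothetical stand-in for the interface in the diff; the field has no explicit modifiers. */
    interface WorkerStoreSketch {
        DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0");
    }

    /** Returns true if the named field of the given type is public, static, and final. */
    static boolean isImplicitConstant(Class<?> type, String fieldName) {
        try {
            int mods = type.getDeclaredField(fieldName).getModifiers();
            return Modifier.isPublic(mods) && Modifier.isStatic(mods) && Modifier.isFinal(mods);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        // prints "true"
        System.out.println(isImplicitConstant(WorkerStoreSketch.class, "TASKID_FORMAT"));
    }
}
```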




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75282731
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A standalone Mesos worker store.
+ */
+public class StandaloneMesosWorkerStore implements MesosWorkerStore {
+
+   private Option frameworkID = Option.empty();
+
+   private int taskCount = 0;
+
+   private Map storedWorkers = new LinkedHashMap<>();
+
+   public StandaloneMesosWorkerStore() {
+   }
+
+   @Override
+   public void start() throws Exception {
+
+   }
+
+   @Override
+   public void stop() throws Exception {
+
+   }
+
+   @Override
+   public Option getFrameworkID() throws Exception {
+   return frameworkID;
+   }
+
+   @Override
+   public void setFrameworkID(Option frameworkID) throws Exception {
+   this.frameworkID = frameworkID;
+   }
+
+   @Override
+   public List recoverWorkers() throws Exception {
+   return ImmutableList.copyOf(storedWorkers.values());
+   }
+
+   @Override
+   public Protos.TaskID newTaskID() throws Exception {
+   Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(++taskCount)).build();
+   return taskID;
--- End diff --

Could be simplified: `return Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(++taskCount)).build();`.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75283539
  
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.mesos.Protos;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A ZooKeeper-backed Mesos worker store.
+ */
+public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class);
+
+   private final Object cacheLock = new Object();
--- End diff --

It seems the store should only be accessed by the ResourceManager. In that
case, we could remove the lock.
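
A rough sketch of what a lock-free store could look like if all access is confined to one owner (for example a single actor). The class `SingleOwnerWorkerStore` and its string-based signatures are hypothetical and only mirror the shape of `MesosWorkerStore`; the thread-confinement contract is an assumption that would have to hold in the real code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory store with no internal lock.
 * Not thread-safe: the single owner must confine all calls to one thread.
 */
class SingleOwnerWorkerStore {

    // Task ID -> state name; insertion order is preserved for recovery.
    private final Map<String, String> workers = new LinkedHashMap<>();

    void putWorker(String taskId, String state) {
        workers.put(taskId, state);
    }

    void removeWorker(String taskId) {
        workers.remove(taskId);
    }

    /** Returns a snapshot of the task IDs so callers cannot mutate the internal map. */
    List<String> recoverWorkers() {
        return new ArrayList<>(workers.keySet());
    }
}
```

If the store is ever shared between threads, the lock (or another form of synchronization) would be needed again.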




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75283689
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
+
+   public Worker(Protos.TaskID taskID, Option 
slaveID, Option hostname, TaskState state) {
+   requireNonNull(taskID, "taskID");
+   requireNonNull(slaveID, "slaveID");
+   requireNonNull(hostname, "hostname");
+   requireNonNull(state, "state");
+
+   this.taskID = taskID;
+   this.slaveID = slaveID;
+   this.hostname = hostname;
+   this.state = state;
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   public Option slaveID() {
+   return slaveID;
+   }
+
+   public Option hostname() {
+   return hostname;
+   }
+
+   public TaskState state() {
+   return state;
+   }
+
+   // valid transition methods
--- End diff --

Could you frame the transition methods with comments?
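
One possible way to frame the transition methods, sketched on a simplified value class. `WorkerSketch` is hypothetical: it uses `String` and `java.util.Optional` in place of the Mesos `Protos` types and `scala.Option`, and the divider comment style is only a suggestion.

```java
import java.util.Optional;

/** Hypothetical, simplified version of the Worker value class. */
final class WorkerSketch {

    enum TaskState { New, Launched, Released }

    private final String taskId;
    private final Optional<String> hostname;
    private final TaskState state;

    private WorkerSketch(String taskId, Optional<String> hostname, TaskState state) {
        this.taskId = taskId;
        this.hostname = hostname;
        this.state = state;
    }

    String taskId() { return taskId; }

    Optional<String> hostname() { return hostname; }

    TaskState state() { return state; }

    // ------------------------------------------------------------------------
    //  Valid state transitions (each returns a new instance)
    // ------------------------------------------------------------------------

    /** Creates a worker in the New state, before any host is assigned. */
    static WorkerSketch newTask(String taskId) {
        return new WorkerSketch(taskId, Optional.empty(), TaskState.New);
    }

    /** New -> Launched: records the host the task was launched on. */
    WorkerSketch launchTask(String hostname) {
        return new WorkerSketch(taskId, Optional.of(hostname), TaskState.Launched);
    }

    /** Launched -> Released: keeps the assigned host. */
    WorkerSketch releaseTask() {
        return new WorkerSketch(taskId, hostname, TaskState.Released);
    }

    // ------------------------------------------------------------------------
    //  End of state transitions
    // ------------------------------------------------------------------------
}
```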




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75283918
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+   static final DecimalFormat TASKID_FORMAT = new 
DecimalFormat("taskmanager-0");
+
+   void start() throws Exception;
+
+   void stop() throws Exception;
+
+   Option getFrameworkID() throws Exception;
+
+   void setFrameworkID(Option frameworkID) throws 
Exception;
+
+   List recoverWorkers() throws Exception;
+
+   Protos.TaskID newTaskID() throws Exception;
+
+   void putWorker(Worker worker) throws Exception;
+
+   void removeWorker(Protos.TaskID taskID) throws Exception;
+
+   void cleanup() throws Exception;
+
+   /**
+* A stored task.
+*
+* The assigned slaveid/hostname is valid in Launched and Released 
states.  The hostname is needed
+* by Fenzo for optimization purposes.
+*/
+   class Worker implements Serializable {
+   private Protos.TaskID taskID;
+
+   private Option slaveID;
+
+   private Option hostname;
+
+   private TaskState state;
+
+   public Worker(Protos.TaskID taskID, Option 
slaveID, Option hostname, TaskState state) {
+   requireNonNull(taskID, "taskID");
+   requireNonNull(slaveID, "slaveID");
+   requireNonNull(hostname, "hostname");
+   requireNonNull(state, "state");
+
+   this.taskID = taskID;
+   this.slaveID = slaveID;
+   this.hostname = hostname;
+   this.state = state;
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   public Option slaveID() {
+   return slaveID;
+   }
+
+   public Option hostname() {
+   return hostname;
+   }
+
+   public TaskState state() {
+   return state;
+   }
+
+   // valid transition methods
+
+   public static Worker newTask(Protos.TaskID taskID) {
+   return new Worker(
+   taskID,
+   Option.empty(), 
Option.empty(),
+   TaskState.New);
+   }
+
+   public Worker launchTask(Protos.SlaveID slaveID, String 
hostname) {
+   return new Worker(taskID, Option.apply(slaveID), 
Option.apply(hostname), TaskState.Launched);
+   }
+
+   public Worker releaseTask() {
+   return new Worker(taskID, slaveID, hostname, 
TaskState.Released);
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+   Worker worker = (Worker) o;
+   return Objects.equals(taskID, worker.taskI

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75284993
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.List;
+
+/**
+ * This class reacts to callbacks from the Mesos scheduler driver.
+ *
+ * In order to preserve actor concurrency safety, this class simply sends
+ * corresponding messages to the Mesos resource master actor.
+ *
+ * See 
https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ */
+public class SchedulerProxy implements Scheduler {
+
+   /** The actor to which we report the callbacks */
+   private ActorRef mesosActor;
--- End diff --

The `mesosActor` is actually the `MesosFlinkResourceManager`, right?




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75285550
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.List;
+
+/**
+ * This class reacts to callbacks from the Mesos scheduler driver.
+ *
+ * In order to preserve actor concurrency safety, this class simply sends
+ * corresponding messages to the Mesos resource master actor.
+ *
+ * See 
https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ */
+public class SchedulerProxy implements Scheduler {
+
+   /** The actor to which we report the callbacks */
+   private ActorRef mesosActor;
+
+   public SchedulerProxy(ActorRef mesosActor) {
+   this.mesosActor = mesosActor;
+   }
+
+   @Override
+   public void registered(SchedulerDriver driver, Protos.FrameworkID 
frameworkId, Protos.MasterInfo masterInfo) {
+   mesosActor.tell(new Registered(frameworkId, masterInfo), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void reregistered(SchedulerDriver driver, Protos.MasterInfo 
masterInfo) {
+   mesosActor.tell(new ReRegistered(masterInfo), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void disconnected(SchedulerDriver driver) {
+   mesosActor.tell(new Disconnected(), ActorRef.noSender());
+   }
+
+
+   @Override
+   public void resourceOffers(SchedulerDriver driver, List 
offers) {
+   mesosActor.tell(new ResourceOffers(offers), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void offerRescinded(SchedulerDriver driver, Protos.OfferID 
offerId) {
+   mesosActor.tell(new OfferRescinded(offerId), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus 
status) {
+   mesosActor.tell(new StatusUpdate(status), ActorRef.noSender());
+   }
+
+   @Override
+   public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID 
executorId, Protos.SlaveID slaveId, byte[] data) {
+   throw new UnsupportedOperationException("frameworkMessage is 
unexpected");
+   }
+
+   @Override
+   public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) {
+   mesosActor.tell(new SlaveLost(slaveId), ActorRef.noSender());
+   }
+
+   @Override
+   public void executorLost(SchedulerDriver driver, Protos.ExecutorID 
executorId, Protos.SlaveID slaveId, int status) {
+   throw new UnsupportedOperationException("executorLost is 
unexpected");
--- End diff --

Why don't we forward this message and crash the actor instead?
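
A sketch of the alternative, without Akka so that it stays self-contained: the callback only enqueues a message, and the receiver decides how to fail. All names here (`ExecutorLostMessage`, `ForwardingProxySketch`, `ReceivingActorSketch`) are hypothetical, and the `BlockingQueue` merely stands in for the actor mailbox that `ActorRef.tell` would write to.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical message type carrying the callback arguments. */
final class ExecutorLostMessage {
    final String executorId;
    final String slaveId;
    final int status;

    ExecutorLostMessage(String executorId, String slaveId, int status) {
        this.executorId = executorId;
        this.slaveId = slaveId;
        this.status = status;
    }
}

/** Stand-in for the proxy: the callback forwards and never throws on the driver thread. */
final class ForwardingProxySketch {

    // Plays the role of the actor's mailbox.
    private final BlockingQueue<Object> mailbox;

    ForwardingProxySketch(BlockingQueue<Object> mailbox) {
        this.mailbox = mailbox;
    }

    void executorLost(String executorId, String slaveId, int status) {
        mailbox.add(new ExecutorLostMessage(executorId, slaveId, status));
    }
}

/** Stand-in for the actor: it reacts to the unexpected message by failing itself. */
final class ReceivingActorSketch {

    void onReceive(Object message) {
        if (message instanceof ExecutorLostMessage) {
            throw new IllegalStateException("executorLost is unexpected");
        }
    }
}

/** Small demo wiring the two pieces together. */
final class ForwardingDemo {
    public static void main(String[] args) {
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
        new ForwardingProxySketch(mailbox).executorLost("executor-1", "slave-1", 1);
        try {
            new ReceivingActorSketch().onReceive(mailbox.poll());
        } catch (IllegalStateException e) {
            System.out.println("actor failed: " + e.getMessage());
        }
    }
}
```

With this shape the failure surfaces in the actor, where supervision can handle it, rather than on the scheduler driver's callback thread.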




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75285623
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java 
---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import akka.actor.ActorRef;
+
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.SlaveLost;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Scheduler;
+import org.apache.mesos.SchedulerDriver;
+
+import java.util.List;
+
+/**
+ * This class reacts to callbacks from the Mesos scheduler driver.
+ *
+ * In order to preserve actor concurrency safety, this class simply sends
+ * corresponding messages to the Mesos resource master actor.
+ *
+ * See 
https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html
+ */
+public class SchedulerProxy implements Scheduler {
+
+   /** The actor to which we report the callbacks */
+   private ActorRef mesosActor;
+
+   public SchedulerProxy(ActorRef mesosActor) {
+   this.mesosActor = mesosActor;
+   }
+
+   @Override
+   public void registered(SchedulerDriver driver, Protos.FrameworkID 
frameworkId, Protos.MasterInfo masterInfo) {
+   mesosActor.tell(new Registered(frameworkId, masterInfo), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void reregistered(SchedulerDriver driver, Protos.MasterInfo 
masterInfo) {
+   mesosActor.tell(new ReRegistered(masterInfo), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void disconnected(SchedulerDriver driver) {
+   mesosActor.tell(new Disconnected(), ActorRef.noSender());
+   }
+
+
+   @Override
+   public void resourceOffers(SchedulerDriver driver, List 
offers) {
+   mesosActor.tell(new ResourceOffers(offers), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void offerRescinded(SchedulerDriver driver, Protos.OfferID 
offerId) {
+   mesosActor.tell(new OfferRescinded(offerId), 
ActorRef.noSender());
+   }
+
+   @Override
+   public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus 
status) {
+   mesosActor.tell(new StatusUpdate(status), ActorRef.noSender());
+   }
+
+   @Override
+   public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID 
executorId, Protos.SlaveID slaveId, byte[] data) {
+   throw new UnsupportedOperationException("frameworkMessage is 
unexpected");
--- End diff --

What other messages could the framework send? Is it worth crashing the 
actor?




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75285727
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler;
+
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+
+/**
+ * A builder for the Fenzo task scheduler.
+ *
+ * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be 
mocked, which motivates this interface.
+ */
+public interface TaskSchedulerBuilder {
+   TaskSchedulerBuilder withLeaseRejectAction(Action1 
action);
--- End diff --

A new line here would be nice :)




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75295468
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java 
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the 
Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosArtifactServer.class);
+
+   private final Router router;
+
+   private ServerBootstrap bootstrap;
+
+   private Channel serverChannel;
+
+   private URL baseURL;
+
+   public MesosArtifactServer(String sessionID, String serverHostname, int 
configuredPort) throws Exception {
+   if (configuredPort < 0 || configuredPort > 0xFFFF) {
+   throw new IllegalArgumentException("File server port is 
invalid: " + configuredPort);
+   }
+
+   router = new Router();
+
+   ChannelInitializer initializer = new 
ChannelInitializer() {
+
+   @Override
+   protected void initChannel(SocketChannel ch) {
+   

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75295536
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java 
---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.util;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Handler;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.handler.codec.http.router.Router;
+import io.netty.util.CharsetUtil;
+import org.jets3t.service.utils.Mimetypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpMethod.HEAD;
+import static io.netty.handler.codec.http.HttpResponseStatus.GONE;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static 
io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A generic Mesos artifact server, designed specifically for use by the 
Mesos Fetcher.
+ *
+ * More information:
+ * http://mesos.apache.org/documentation/latest/fetcher/
+ * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/
+ */
+public class MesosArtifactServer {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MesosArtifactServer.class);
+
+   private final Router router;
+
+   private ServerBootstrap bootstrap;
+
+   private Channel serverChannel;
+
+   private URL baseURL;
+
+   public MesosArtifactServer(String sessionID, String serverHostname, int 
configuredPort) throws Exception {
+   if (configuredPort < 0 || configuredPort > 0xFFFF) {
+   throw new IllegalArgumentException("File server port is 
invalid: " + configuredPort);
+   }
+
+   router = new Router();
+
+   ChannelInitializer initializer = new 
ChannelInitializer() {
+
+   @Override
+   protected void initChannel(SocketChannel ch) {
+   

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75296348
  
--- Diff: flink-mesos/src/main/resources/log4j.properties ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+
+log4j.logger.org.apache.flink.mesos=DEBUG

+log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO
--- End diff --

Do we want to uncomment these two rules and keep the INFO default?




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75296757
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
 ---
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework
+
+import java.util.concurrent.{TimeUnit, ExecutorService}
+
+import akka.actor.ActorRef
+
+import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants}
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
+import org.apache.flink.runtime.clusterframework.ApplicationStatus
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
+import org.apache.flink.runtime.clusterframework.messages._
+import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager}
+import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound}
+import org.apache.flink.runtime.messages.Messages.Acknowledge
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+
+/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages
+  * to start/administer/stop the session.
--- End diff --

Good idea, but this has yet to be integrated into the flink-yarn module.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75297945
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, FSM, Props}
+import grizzled.slf4j.Logger
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ConnectionMonitor._
+import org.apache.flink.mesos.scheduler.messages._
+
+import scala.concurrent.duration._
+
+/**
+  * Actively monitors the Mesos connection.
+  */
+class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] {
+
+  val LOG = Logger(getClass)
+
+  startWith(StoppedState, None)
+
+  when(StoppedState) {
+case Event(msg: Start, _) =>
+  LOG.info(s"Connecting to Mesos...")
+  goto(ConnectingState)
+  }
+
+  when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) {
+case Event(msg: Stop, _) =>
+  goto(StoppedState)
+
+case Event(msg: Registered, _) =>
+  LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.")
+  LOG.debug(s"   Master Info: ${msg.masterInfo}")
+  goto(ConnectedState)
+
+case Event(msg: ReRegistered, _) =>
+  LOG.info("Reconnected to a new Mesos master.")
+  LOG.debug(s"   Master Info: ${msg.masterInfo}")
+  goto(ConnectedState)
+
+case Event(StateTimeout, _) =>
+  LOG.warn("Unable to connect to Mesos; still trying...")
+  stay()
+  }
+
+  when(ConnectedState) {
+case Event(msg: Stop, _) =>
+  goto(StoppedState)
+
+case Event(msg: Disconnected, _) =>
+  LOG.warn("Disconnected from the Mesos master.  Reconnecting...")
+  goto(ConnectingState)
+  }
+
--- End diff --

Would it make sense to add a `whenUnhandled {...}` handler here?
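
To make the suggestion concrete, here is a minimal sketch of the idea in plain Java. It is not Akka code and not part of the PR: `MonitorState`, `MonitorSketch`, and the string events are hypothetical stand-ins. It only shows a catch-all branch that records events no state handles, which is roughly the role a `whenUnhandled` block would play in the FSM above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical states, loosely mirroring the ones in the diff. */
enum MonitorState { STOPPED, CONNECTING, CONNECTED }

/** Plain-Java stand-in for the state machine; this is not the Akka FSM API. */
final class MonitorSketch {
    private MonitorState state = MonitorState.STOPPED;
    private final List<String> unhandled = new ArrayList<>();

    MonitorState state() { return state; }
    List<String> unhandled() { return unhandled; }

    void handle(String event) {
        switch (state) {
            case STOPPED:
                if (event.equals("Start")) { state = MonitorState.CONNECTING; return; }
                break;
            case CONNECTING:
                if (event.equals("Stop")) { state = MonitorState.STOPPED; return; }
                if (event.equals("Registered") || event.equals("ReRegistered")) {
                    state = MonitorState.CONNECTED;
                    return;
                }
                break;
            case CONNECTED:
                if (event.equals("Stop")) { state = MonitorState.STOPPED; return; }
                if (event.equals("Disconnected")) { state = MonitorState.CONNECTING; return; }
                break;
        }
        // Catch-all branch: no state handled the event, so record it and stay put.
        unhandled.add(state + ":" + event);
    }

    public static void main(String[] args) {
        MonitorSketch m = new MonitorSketch();
        m.handle("Disconnected"); // not handled while stopped
        m.handle("Start");
        System.out.println(m.state() + " " + m.unhandled());
    }
}
```

With a fallback like this, a stray message in the wrong state leaves a trace instead of depending on whatever the default handling happens to be.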




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75298678
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, FSM, Props}
+import com.netflix.fenzo._
+import com.netflix.fenzo.functions.Action1
+import com.netflix.fenzo.plugins.VMLeaseObject
+import grizzled.slf4j.Logger
+import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.LaunchCoordinator._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.Protos.TaskInfo
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{Map => MutableMap}
+import scala.concurrent.duration._
+
+/**
+  * The launch coordinator handles offer processing, including
+  * matching offers to tasks and making reservations.
+  *
+  * The coordinator uses Netflix Fenzo to optimize task placement.   During the GatheringOffers phase,
--- End diff --

Fenzo also has my endorsement. It makes sense to delegate scheduling logic 
to a dedicated library.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75299022
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala
 ---
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, FSM, Props}
+import com.netflix.fenzo._
+import com.netflix.fenzo.functions.Action1
+import com.netflix.fenzo.plugins.VMLeaseObject
+import grizzled.slf4j.Logger
+import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.LaunchCoordinator._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.Protos.TaskInfo
--- End diff --

This import (`org.apache.mesos.Protos.TaskInfo`) is unused.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75300491
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala 
---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import grizzled.slf4j.Logger
+
+import akka.actor.{Actor, FSM, Props}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor._
+import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate}
+import org.apache.mesos.Protos.TaskState._
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.PartialFunction.empty
+import scala.concurrent.duration._
+
+/**
+  * Monitors a Mesos task throughout its lifecycle.
+  *
+  * Models a task with a state machine reflecting the perceived state of the task in Mesos.   The state
+  * is primarily updated when task status information arrives from Mesos.
+  *
+  * The associated state data primarily tracks the task's goal (intended) state, as persisted by the scheduler.
+  * Keep in mind that goal state is persisted before actions are taken.  The goal state strictly transitions
+  * thru New->Launched->Released.
+  *
+  * Unlike most exchanges with Mesos, task status is delivered at-least-once, so status handling should be idempotent.
+  */
+class TaskMonitor(
+flinkConfig: Configuration,
+schedulerDriver: SchedulerDriver,
+goalState: TaskGoalState) extends Actor with FSM[TaskMonitorState,StateData] {
+
+  val LOG = Logger(getClass)
+
+  startWith(Suspended, StateData(goalState))
+
+  // 

+  //  Suspended State
+  // 

+
+  when(Suspended) {
+case Event(update: TaskGoalStateUpdated, _) =>
+  stay() using StateData(update.state)
+case Event(msg: StatusUpdate, _) =>
+  stay()
+case Event(msg: Connected, StateData(goal: New)) =>
+  goto(New)
+case Event(msg: Connected, StateData(goal: Launched)) =>
+  goto(Reconciling)
+case Event(msg: Connected, StateData(goal: Released)) =>
+  goto(Killing)
+  }
+
+  // 

+  //  New State
+  // 

+
+  when(New) {
+case Event(TaskGoalStateUpdated(goal: Launched), _) =>
+  goto(Staging) using StateData(goal)
+  }
+
+  // 

+  //  Reconciliation State
+  // 

+
+  onTransition {
+case _ -> Reconciling =>
+  nextStateData.goal match {
+case goal: Launched =>
+  val taskStatus = Protos.TaskStatus.newBuilder()
+.setTaskId(goal.taskID).setSlaveId(goal.slaveID).setState(TASK_STAGING).build()
+  context.parent ! Reconcile(Seq(taskStatus))
--- End diff --

Would it be cleaner to pass the `ActorRef` directly to the TaskMonitor?
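
As a rough illustration of what I mean, here is a plain-Java sketch. It is not Akka or Flink code: `TaskMonitorSketch` and the `Consumer<String>` standing in for an `ActorRef` are hypothetical. The collaborator that receives reconciliation requests is handed in through the constructor rather than looked up implicitly via the parent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a task monitor; not the actual TaskMonitor. */
final class TaskMonitorSketch {
    // Explicit target for reconciliation requests (plays the role of an ActorRef).
    private final Consumer<String> reconciler;

    TaskMonitorSketch(Consumer<String> reconciler) {
        this.reconciler = reconciler;
    }

    /** Called when the monitor enters the (hypothetical) reconciling state. */
    void onEnterReconciling(String taskId) {
        reconciler.accept("Reconcile(" + taskId + ")");
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        new TaskMonitorSketch(received::add).onEnterReconciling("task-1");
        System.out.println(received);
    }
}
```

The dependency then shows up in the constructor signature, and a test can pass in a stub without having to rebuild the parent/child hierarchy.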




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75300879
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, Props}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated}
+import org.apache.flink.mesos.scheduler.Tasks._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+  * Aggregate of monitored tasks.
+  *
+  * Routes messages between the scheduler and individual task monitor actors.
+  */
+class Tasks[M <: TaskMonitor](
+ flinkConfig: Configuration,
+ schedulerDriver: SchedulerDriver,
+ taskMonitorClass: Class[M]) extends Actor {
+
+  /**
+* A map of task monitors by task ID.
+*/
+  private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap()
--- End diff --

Missing space after the comma in `MutableMap[Protos.TaskID,ActorRef]`.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75301383
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, ActorRef, Props}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile
+import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated}
+import org.apache.flink.mesos.scheduler.Tasks._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.mesos.{SchedulerDriver, Protos}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+/**
+  * Aggregate of monitored tasks.
+  *
+  * Routes messages between the scheduler and individual task monitor actors.
+  */
+class Tasks[M <: TaskMonitor](
+ flinkConfig: Configuration,
+ schedulerDriver: SchedulerDriver,
+ taskMonitorClass: Class[M]) extends Actor {
+
+  /**
+* A map of task monitors by task ID.
+*/
+  private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap()
+
+  /**
+* Cache of current connection state.
+*/
+  private var registered: Option[Any] = None
+
+  override def preStart(): Unit = {
+// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks
+  }
+
+  override def receive: Receive = {
+
+case msg: Disconnected =>
+  registered = None
+  context.actorSelection("*").tell(msg, self)
+
+case msg : Connected =>
+  registered = Some(msg)
+  context.actorSelection("*").tell(msg, self)
+
+case msg: TaskGoalStateUpdated =>
+  val taskID = msg.state.taskID
+
+  // ensure task monitor exists
+  if(!taskMap.contains(taskID)) {
+val actorRef = createTask(msg.state)
+registered.foreach(actorRef ! _)
+  }
+
+  taskMap(taskID) ! msg
+
+case msg: StatusUpdate =>
+  taskMap(msg.status().getTaskId) ! msg
+
+case msg: Reconcile =>
+  context.parent.forward(msg)
--- End diff --

Same as above. The parent is the resource manager. Do we want to make this 
explicit? 




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75303392
  
--- Diff: 
flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala
 ---
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{Collections, UUID}
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.actor.FSM.StateTimeout
+import akka.testkit._
+import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest}
+import com.netflix.fenzo._
+import com.netflix.fenzo.functions.{Action1, Action2}
+import com.netflix.fenzo.plugins.VMLeaseObject
+import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.LaunchCoordinator._
+import org.apache.flink.mesos.scheduler.messages._
+import org.apache.flink.runtime.akka.AkkaUtils
+import org.apache.mesos.Protos.{SlaveID, TaskInfo}
+import org.apache.mesos.{SchedulerDriver, Protos}
+import org.junit.runner.RunWith
+import org.mockito.Mockito.{verify, _}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers => MM, Mockito}
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
+
+import scala.collection.JavaConverters._
+
+import org.apache.flink.mesos.Utils.range
+import org.apache.flink.mesos.Utils.ranges
+import org.apache.flink.mesos.Utils.scalar
+
+@RunWith(classOf[JUnitRunner])
+class LaunchCoordinatorTest
+  extends TestKitBase
+with ImplicitSender
+with WordSpecLike
+with Matchers
+with BeforeAndAfterAll {
+
+  lazy val config = new Configuration()
+  implicit lazy val system = AkkaUtils.createLocalActorSystem(config)
+
+  override def afterAll(): Unit = {
+TestKit.shutdownActorSystem(system)
+  }
+
+  def randomFramework = {
+Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID.toString).build
+  }
+
+  def randomTask = {
+val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build
+
+def generateTaskRequest = {
+  new TaskRequest() {
+private[mesos] val assignedResources = new AtomicReference[TaskRequest.AssignedResources]
+override def getId: String = taskID.getValue
+override def taskGroupName: String = ""
+override def getCPUs: Double = 1.0
+override def getMemory: Double = 1024.0
+override def getNetworkMbps: Double = 0.0
+override def getDisk: Double = 0.0
+override def getPorts: Int = 1
+override def getCustomNamedResources: java.util.Map[String, NamedResourceSetRequest] =
+  Collections.emptyMap[String, NamedResourceSetRequest]
+override def getSoftConstraints: java.util.List[_ <: VMTaskFitnessCalculator] = null
+override def getHardConstraints: java.util.List[_ <: ConstraintEvaluator] = null
+override def getAssignedResources: AssignedResources = assignedResources.get()
+override def setAssignedResources(assignedResources: AssignedResources): Unit = {
+  this.assignedResources.set(assignedResources)
+}
+  }
+}
+
+val task: LaunchableTask = new LaunchableTask() {
+  override def taskRequest: TaskRequest = generateTaskRequest
+  override def launch(slaveId: SlaveID, taskAssignment: TaskAssignmentResult): Protos.TaskInfo = {
+Protos.TaskInfo.newBuilder
+  .setTaskId(taskID).setName(taskID.getValue)
+  .setCommand(Protos.CommandInfo.newBuilder.setValue("whoami"))
+  .setSlaveId(slaveId)
+  .build()
+  }

[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)

2016-08-18 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2315
  
Thank you for your work @EronWright. I finally managed to go through the code. 
All in all, this is very impressive as the first step of the Mesos integration! I think 
this PR is in a mergeable state once some minor comments are addressed.

I'm not 100% sure about all the additional actors yet. It seems like 
`ReconciliationCoordinator`, `ConnectionMonitor`, and `TaskMonitor` could also 
easily be handled inside `MesosFlinkResourceManager`. In terms of modularity, I 
can see that having these run independently can give us a more flexible setup. 
Which of the actors do you plan to re-use in the next set of changes? Clearly, 
the separation comes in really handy in terms of testability. 




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-18 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75307414
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala 
---
@@ -58,10 +62,63 @@ class JobInfo(
 }
   }
 
-  override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)"
+
+  /**
+* Notifies all clients by sending a message
+* @param message the message to send
+*/
+  def notifyClients(message: Any) = {
+clients foreach {
+  case (clientActor, _) =>
+clientActor ! message
+}
+  }
+
+  /**
+* Notifies all clients which are not of type detached
+* @param message the message to sent to non-detached clients
+*/
+  def notifyNonDetachedClients(message: Any) = {
+clients foreach {
+  case (clientActor, ListeningBehaviour.DETACHED) =>
+// do nothing
+  case (clientActor, _) =>
+clientActor ! message
+}
+  }
+
+  /**
+* Sends a message to job clients that match the listening behavior
+* @param message the message to send to all clients
+* @param listeningBehaviour the desired listening behaviour
+*/
+  def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = {
+clients foreach {
+  case (clientActor, `listeningBehaviour`) =>
+clientActor ! message
+  case _ =>
+}
+  }
 
   def setLastActive() =
 lastActive = System.currentTimeMillis()
+
+
+  override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)"
+
+  override def equals(other: Any): Boolean = other match {
+case that: JobInfo =>
+  this.isInstanceOf[JobInfo] &&
--- End diff --

Yes, that's unnecessary because of the case type check.
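
For comparison, the same point in a small Java sketch (not Flink code; `JobInfoSketch` and its two fields are made up for illustration): once `other` has passed the type check, no further check on `this` is needed, because `this` is an instance of the class by construction.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for JobInfo. */
final class JobInfoSketch {
    private final String client;
    private final long start;

    JobInfoSketch(String client, long start) {
        this.client = client;
        this.start = start;
    }

    @Override
    public boolean equals(Object other) {
        // The only type check needed; it also rejects null.
        if (!(other instanceof JobInfoSketch)) {
            return false;
        }
        JobInfoSketch that = (JobInfoSketch) other;
        return start == that.start && Objects.equals(client, that.client);
    }

    @Override
    public int hashCode() {
        return Objects.hash(client, start);
    }

    public static void main(String[] args) {
        JobInfoSketch a = new JobInfoSketch("client-1", 42L);
        JobInfoSketch b = new JobInfoSketch("client-1", 42L);
        System.out.println(a.equals(b));
    }
}
```

In the Scala version, the `case that: JobInfo` pattern plays the role of the `instanceof` check here.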




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75442774
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the JobManager.
+* @throws JobRetrievalException if anything goes wrong while retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
+   checkNotNull(configuration, "The configuration must not be null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   final Object jmAnswer;
+   try {
+   jmAnswer = Await.result(
+   jobManager.ask(
+   new 
JobManagerMessages.Requ

[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75442857
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java
 ---
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.clients.examples;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.collection.Seq;
+
+import java.util.concurrent.Semaphore;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Tests retrieval of a job from a running Flink cluster
+ */
+public class JobRetrievalITCase {
--- End diff --

Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75442830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java 
---
@@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) {
decorateMessage(new Status.Failure(new 
Exception(msg))), ActorRef.noSender());
}
}
+   else if (message instanceof AttachToJobAndWait) {
--- End diff --

I agree that this design could be improved. I'll consider factoring out 
common code into a base class.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75443084
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , 
jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using 
system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   final Object jmAnswer;
+   try {
+   jmAnswer = Await.result(
+   jobManager.ask(
+   new 
JobManagerMessages.Requ

[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

2016-08-19 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2313
  
Thanks for the review @tillrohrmann. Yes, the plan is to have a `JobClient` 
API class (the existing JobClient class will be renamed) which uses the 
SubmissionContext to supervise submitted jobs or attach to existing jobs. All 
the job-related methods from `ClusterClient` will be moved to this new class.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75459375
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
--- End diff --

True, this code assumes that the JobManager doesn't change between 
retrieving the leading JobManager and retrieving the class loader. There is 
always a possible gap in which the JobManager could change. We could mitigate 
this by retrying in case it has changed.
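
A minimal sketch of what such a retry could look like. It is not taken from the PR: `LeaderRetrySketch`, `LeaderChangedException`, and `retryOnLeaderChange` are hypothetical names, and the sketch assumes the wrapped operation re-resolves the leader on every attempt and signals a leader change with a dedicated unchecked exception.

```java
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Flink. */
final class LeaderRetrySketch {

    /** Hypothetical signal that the leader changed between lookup and request. */
    static final class LeaderChangedException extends RuntimeException {
        LeaderChangedException(String message) {
            super(message);
        }
    }

    /**
     * Runs the operation and retries it only when it reports a leader change.
     * Any other exception propagates on the first occurrence.
     */
    static <T> T retryOnLeaderChange(Supplier<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        LeaderChangedException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // Assumption: the operation looks up the current leader itself,
                // so each attempt talks to the most recent JobManager.
                return operation.get();
            } catch (LeaderChangedException e) {
                last = e; // remember the failure and try again
            }
        }
        // All attempts hit a leader change; surface the last one to the caller.
        throw last;
    }
}
```

The attempt limit keeps the client from looping forever if leadership keeps flipping; whether a bounded retry is preferable to failing fast is exactly the open question here.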




[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

2016-08-19 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2313
  
@tillrohrmann I've refactored the JobClientActor to include the common code 
in `JobClientActorBase` and have implementations for submitting/attaching in 
`JobSubmissionClientActor` and `JobAttachmentClientActor`.
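
For readers skimming the thread, the shape of that split can be sketched roughly as follows. This is a simplified illustration, not the actual Flink classes: the `*Sketch` names, the string messages, and the `handleCustomMessage` hook are assumptions made for the example, and no Akka types are involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for a shared base class; names are hypothetical. */
abstract class ClientActorBaseSketch {

    /** Records what happened so the behaviour is easy to inspect. */
    final List<String> log = new ArrayList<>();

    /** Common message handling; anything else is offered to the subclass. */
    final void onMessage(Object message) {
        if ("JobManagerLost".equals(message)) {
            log.add("common: connection lost");
        } else if (!handleCustomMessage(message)) {
            log.add("common: unhandled " + message);
        }
    }

    /** Returns true if the subclass recognised and handled the message. */
    abstract boolean handleCustomMessage(Object message);
}

/** Stand-in for the submitting variant. */
final class SubmissionClientSketch extends ClientActorBaseSketch {
    @Override
    boolean handleCustomMessage(Object message) {
        if ("SubmitJobAndWait".equals(message)) {
            log.add("submit job");
            return true;
        }
        return false;
    }
}

/** Stand-in for the attaching variant. */
final class AttachmentClientSketch extends ClientActorBaseSketch {
    @Override
    boolean handleCustomMessage(Object message) {
        if ("AttachToJobAndWait".equals(message)) {
            log.add("attach to job");
            return true;
        }
        return false;
    }
}
```

The point of the split is that the `instanceof` chain for submit and attach no longer lives in one class: each subclass only knows its own message, while connection handling stays in the base.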




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75468671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
+   LOG.info("Reconstructed class loader for Job {}" , 
jobID);
+   } catch (Exception e) {
+   LOG.warn("Couldn't retrieve classloader for {}. Using 
system class loader", jobID, e);
+   classloader = JobClient.class.getClassLoader();
+   }
+
+   // we create a proxy JobClientActor that deals with all 
communication with
+   // the JobManager. It forwards the job submission, checks the 
success/failure responses, logs
+   // update messages, watches for disconnect between client and 
JobManager, ...
+   Props jobClientActorProps = 
JobClientActor.createJobClientActorProps(
+   leaderRetrievalService,
+   timeout,
+   sysoutLogUpdates);
+
+   ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
+
+   Future attachmentFuture = Patterns.ask(
+   jobClientActor,
+   new JobClientMessages.AttachToJobAndWait(jobID),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobID,
+   attachmentFuture,
+   jobClientActor,
+   classloader);
+   }
+
+   /**
+* Reconstructs the class loader by first requesting information about 
it at the JobManager
+* and then downloading missing jar files.
+* @param jobID id of job
+* @param jobManager gateway to the JobManager
+* @param config the flink configuration
+* @param timeout timeout for querying the jobmanager
+* @return A classloader that should behave like the original 
classloader
+* @throws JobRetrievalException if anything goes wrong
+*/
+   public static ClassLoader retrieveClassLoader(
+   JobID jobID,
+   ActorGateway jobManager,
+   Configuration config,
+   FiniteDuration timeout)
+   throws JobRetrievalException {
+
+   final Object jmAnswer;
+   try {
+   jmAnswer = Await.result(
+   jobManager.ask(
+   new 
JobManagerMessages.Requ

[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

2016-08-19 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2313
  
@tillrohrmann Pinging the actor now to check if it is still alive. Also 
added another test case for that.
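
As a rough illustration of the idea only (the PR's actual check may look different): a liveness ping boils down to waiting a bounded time for a reply and treating silence as "not alive". `LivenessPingSketch` and `isAlive` are hypothetical names, and a plain `CompletableFuture` stands in for the reply to the ping.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper for illustration only; not part of Flink. */
final class LivenessPingSketch {

    /**
     * Waits up to timeoutMillis for the ping reply.
     * Returns true if a reply arrived in time, false otherwise.
     */
    static boolean isAlive(CompletableFuture<?> pingReply, long timeoutMillis) {
        try {
            pingReply.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException | CancellationException e) {
            // No reply in time, or the ping itself failed: treat as not alive.
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```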




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75474811
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
--- End diff --

Actually, I'm not really sure about this corner case. We don't typically 
retry client side operations in case the leader has changed after retrieving 
it. Instead, we just throw an error (see all the methods in `ClusterClient`). 
The `JobClientActor` is exceptional in this regard and it has to be because it 
operates independently of the user function.

So we could fail if we can't reconstruct the class loader. That, of course, 
has the caveat that even if the user doesn't use custom classes for the 
JobExecutionResult or exceptions, the job retrieval may fail (e.g. a firewall 
blocking the BlobManager port). That's why I didn't want to enforce this step, 
but we could enforce it and fix any problems with the BlobManager 
communication if they come up.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-19 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75482760
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait(
sysoutLogUpdates);
 
ActorRef jobClientActor = 
actorSystem.actorOf(jobClientActorProps);
-   
+
+   Future submissionFuture = Patterns.ask(
+   jobClientActor,
+   new 
JobClientMessages.SubmitJobAndWait(jobGraph),
+   new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+   return new JobListeningContext(
+   jobGraph.getJobID(),
+   submissionFuture,
+   jobClientActor,
+   classLoader);
+   }
+
+
+   /**
+* Attaches to a running Job using the JobID.
+* Reconstructs the user class loader by downloading the jars from the 
JobManager.
+* @throws JobRetrievalException if anything goes wrong while 
retrieving the job
+*/
+   public static JobListeningContext attachToRunningJob(
+   JobID jobID,
+   ActorGateway jobManagerGateWay,
+   Configuration configuration,
+   ActorSystem actorSystem,
+   LeaderRetrievalService leaderRetrievalService,
+   FiniteDuration timeout,
+   boolean sysoutLogUpdates) throws JobRetrievalException {
+
+   checkNotNull(jobID, "The jobID must not be null.");
+   checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not 
be null.");
+   checkNotNull(configuration, "The configuration must not be 
null.");
+   checkNotNull(actorSystem, "The actorSystem must not be null.");
+   checkNotNull(leaderRetrievalService, "The jobManagerGateway 
must not be null.");
+   checkNotNull(timeout, "The timeout must not be null.");
+
+   // retrieve classloader first before doing anything
+   ClassLoader classloader;
+   try {
+   classloader = retrieveClassLoader(jobID, 
jobManagerGateWay, configuration, timeout);
--- End diff --

In addition to the result, you'll also need the class loader for getting 
accumulators of a running job. 

I agree that it would be nice to fail when the class loader can't be 
reconstructed, but *only* if it is really the only option. So we could start 
off with the class loader set to `None` in the `JobListeningContext`. When the 
class loader is needed, i.e. accumulator retrieval or job execution result 
retrieval, it is fetched.
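
A small sketch of that lazy approach, under stated assumptions: `LazyClassLoaderSketch` is a hypothetical stand-in for the relevant part of the listening context, `null` plays the role of `None`, and the `Supplier` stands in for whatever call actually reconstructs the class loader from the JobManager.

```java
import java.util.Objects;
import java.util.function.Supplier;

/** Hypothetical holder for illustration only; not the actual JobListeningContext. */
final class LazyClassLoaderSketch {

    /** Assumption: performs the actual (possibly failing) reconstruction. */
    private final Supplier<ClassLoader> retriever;

    /** Stays null until a caller really needs the class loader. */
    private ClassLoader classLoader;

    LazyClassLoaderSketch(Supplier<ClassLoader> retriever) {
        this.retriever = Objects.requireNonNull(retriever, "retriever");
    }

    /**
     * Fetches the class loader on first use and caches it afterwards.
     * A failure in the retriever surfaces here, at the point of use,
     * instead of failing the attach step up front.
     */
    synchronized ClassLoader get() {
        if (classLoader == null) {
            classLoader = Objects.requireNonNull(
                retriever.get(), "retriever returned null");
        }
        return classLoader;
    }
}
```

With this shape, attaching to a job never touches the BlobManager; only accumulator or result retrieval does, so a blocked port affects only the callers that actually need user classes.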


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

2016-08-19 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2313
  
I've made the last changes concerning the lazy reconstruction of the class 
loader we discussed. Rebased to master. Should be good to go now. 




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75641421
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75641515
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+/**
+ * The Mesos environment variables used for settings of the containers.
+ */
+public class MesosConfigKeys {
--- End diff --

Yes, a follow-up is fine.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75641703
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java 
---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {
--- End diff --

Yes, I recognized it :) Sure! No problem.




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75642452
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+   /**
+* The set of configuration keys to be dynamically configured with a port allocated from Mesos.
+*/
+   private static String[] TM_PORT_KEYS = {
+   "taskmanager.rpc.port",
+   "taskmanager.data.port" };
+
+   private final MesosTaskManagerParameters params;
+   private final Protos.TaskInfo.Builder template;
+   private final Protos.TaskID taskID;
+   private final Request taskRequest;
+
+   /**
+* Construct a launchable Mesos worker.
+* @param params the TM parameters such as memory, cpu to acquire.
+* @param template a template for the TaskInfo to be constructed at launch time.
+* @param taskID the taskID for this worker.
+ */
+   public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+   this.params = params;
+   this.template = template;
+   this.taskID = taskID;
+   this.taskRequest = new Request();
+   }
+
+   public Protos.TaskID taskID() {
+   return taskID;
+   }
+
+   @Override
+   public TaskRequest taskRequest() {
+   return taskRequest;
+   }
+
+   class Request implements TaskRequest {
+   private final AtomicReference assignedResources = new AtomicReference<>();
+
+   @Override
+   public String getId() {
+   return taskID.getValue();
+   }
+
+   @Override
+   public String taskGroupName() {
+   return "";
+   }
+
+   @Override
+   public double getCPUs() {
+   return params.cpus();
+   }
+
+   @Override
+   public double getMemory() {
+   return params.containeredParameters().taskManagerTotalMemoryMB();
+   }
+
+   @Override
+   public double getNetworkMbps() {
+   return 0.0;
+   }
+
+   @Override
+   public double getDisk() {
+   return 0.0;
--- End diff --

So 0.0 means give me whatever? How about a minimum value, e.g. 500MB?
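
One way to read the suggestion above is to clamp the disk request to a floor instead of returning 0.0. The following is only a minimal Java sketch: the class `DiskRequest`, the constant `MIN_DISK_MB`, and the parameter `configuredDiskMb` are hypothetical names that do not appear in the pull request, and treating the value as megabytes is an assumption.

```java
/** Hypothetical helper for illustration; not part of the pull request. */
final class DiskRequest {

    /** Assumed floor for the disk request in MB (the 500MB figure from the review comment). */
    static final double MIN_DISK_MB = 500.0;

    private DiskRequest() {}

    /** Returns the configured disk size, but never less than MIN_DISK_MB. */
    static double effectiveDiskMb(double configuredDiskMb) {
        return Math.max(configuredDiskMb, MIN_DISK_MB);
    }

    public static void main(String[] args) {
        // An unset (0.0) value is raised to the floor; larger values pass through.
        System.out.println(effectiveDiskMb(0.0));
        System.out.println(effectiveDiskMb(2048.0));
    }
}
```

With a helper like this, `getDisk()` could return `effectiveDiskMb(...)` of whatever disk setting the worker parameters expose, if such a setting exists.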




[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r75642882
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import akka.actor.{Actor, FSM, Props}
+import grizzled.slf4j.Logger
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.mesos.scheduler.ConnectionMonitor._
+import org.apache.flink.mesos.scheduler.messages._
+
+import scala.concurrent.duration._
+
+/**
+  * Actively monitors the Mesos connection.
+  */
+class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] {
+
+  val LOG = Logger(getClass)
+
+  startWith(StoppedState, None)
+
+  when(StoppedState) {
+case Event(msg: Start, _) =>
+  LOG.info(s"Connecting to Mesos...")
+  goto(ConnectingState)
+  }
+
+  when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) {
+case Event(msg: Stop, _) =>
+  goto(StoppedState)
+
+case Event(msg: Registered, _) =>
+  LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.")
+  LOG.debug(s"   Master Info: ${msg.masterInfo}")
+  goto(ConnectedState)
+
+case Event(msg: ReRegistered, _) =>
+  LOG.info("Reconnected to a new Mesos master.")
+  LOG.debug(s"   Master Info: ${msg.masterInfo}")
+  goto(ConnectedState)
+
+case Event(StateTimeout, _) =>
+  LOG.warn("Unable to connect to Mesos; still trying...")
+  stay()
+  }
+
+  when(ConnectedState) {
+case Event(msg: Stop, _) =>
+  goto(StoppedState)
+
+case Event(msg: Disconnected, _) =>
+  LOG.warn("Disconnected from the Mesos master.  Reconnecting...")
+  goto(ConnectingState)
+  }
+
--- End diff --

The default is to just log a warning and drop the message?
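
To make the question concrete: the quoted state machine registers no handler for events that match none of its `when` blocks, and as far as I know Akka's FSM then logs a warning and stays in the current state. The sketch below is plain Java, not Akka code; `ConnectionFsmSketch` and its enums are hypothetical names that only loosely mirror the states in the diff, and the list of warnings stands in for a real logger.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of a log-and-drop default for unhandled events; not Akka code. */
final class ConnectionFsmSketch {

    enum State { STOPPED, CONNECTING, CONNECTED }

    enum Event { START, STOP, REGISTERED, DISCONNECTED }

    private State state = State.STOPPED;

    /** Collected warnings, standing in for a real logger. */
    final List<String> warnings = new ArrayList<>();

    State state() {
        return state;
    }

    void handle(Event event) {
        switch (state) {
            case STOPPED:
                if (event == Event.START) { state = State.CONNECTING; return; }
                break;
            case CONNECTING:
                if (event == Event.STOP) { state = State.STOPPED; return; }
                if (event == Event.REGISTERED) { state = State.CONNECTED; return; }
                break;
            case CONNECTED:
                if (event == Event.STOP) { state = State.STOPPED; return; }
                if (event == Event.DISCONNECTED) { state = State.CONNECTING; return; }
                break;
        }
        // Default path: no transition matched, so record a warning and keep the current state.
        warnings.add("unhandled event " + event + " in state " + state);
    }
}
```

If silently dropping is not acceptable for some message types, the default branch is the place to escalate instead, for example by throwing or by replying to the sender.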




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75644911
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception {
+
+   Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+   }
+
+   private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75644987
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75645192
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75645973
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception {
+
+   Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
--- End diff --

Do we want to rely on Scala tuples here after the conversion from Scala to 
Java? I would suggest `InetAddress` here.
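
A possible shape for this, sketched in Java under assumptions: a small value class carries the selected `InetAddress` together with the port, so the Java code no longer needs `scala.Tuple2`. The class `AddressAndPort` and its `select` method are hypothetical and do not appear in the pull request; `select` only stands in for the real interface-selection logic.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical value class that could replace a Scala Tuple2 of host and port. */
final class AddressAndPort {

    private final InetAddress address;
    private final int port;

    AddressAndPort(InetAddress address, int port) {
        if (address == null) {
            throw new NullPointerException("address must not be null");
        }
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        this.address = address;
        this.port = port;
    }

    InetAddress address() {
        return address;
    }

    int port() {
        return port;
    }

    /** Stand-in for the selection logic: resolves the given host and pairs it with the port. */
    static AddressAndPort select(String host, int port) {
        try {
            return new AddressAndPort(InetAddress.getByName(host), port);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("cannot resolve host: " + host, e);
        }
    }
}
```

The JDK's `java.net.InetSocketAddress` already bundles an address with a port, so it may be an alternative to a custom class if no further fields are needed.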



[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75646080
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception {
+
+   Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+   }
+
+   private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
--- End diff --

Do we want to rely on Scala tu

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75646889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75646940
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
--- End diff --

The Javadoc is missing a `@param` entry for the `ResourceID` argument here.
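
A minimal sketch of what the completed Javadoc could look like, assuming the new entry reuses the wording of the `resourceID` field comment from the same diff. `JavadocParamSketch`, the nested `ResourceID` stand-in, the `Map` used for the configuration, and the `String` return value are all hypothetical and exist only so the snippet compiles on its own; the method in the diff returns `void` and takes Flink's own types.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

public class JavadocParamSketch {

    /** Hypothetical stand-in for Flink's ResourceID so the sketch compiles on its own. */
    public static final class ResourceID {
        private final String id;

        public ResourceID(String id) {
            this.id = Objects.requireNonNull(id, "id must not be null");
        }

        @Override
        public String toString() {
            return id;
        }
    }

    /**
     * Starts and runs the TaskManager (stubbed: this sketch only reports what it would start).
     *
     * @param configuration    The configuration for the TaskManager.
     * @param resourceID       The unique resource ID of this TaskManager.
     * @param taskManagerClass The actor class to instantiate.
     * @return a summary string; sketch-only, the method in the diff returns void
     */
    public static String selectNetworkInterfaceAndRunTaskManager(
            Map<String, String> configuration,
            ResourceID resourceID,
            Class<?> taskManagerClass) {
        // Every documented parameter is validated, mirroring the three @param entries above.
        Objects.requireNonNull(configuration, "configuration must not be null");
        Objects.requireNonNull(resourceID, "resourceID must not be null");
        Objects.requireNonNull(taskManagerClass, "taskManagerClass must not be null");
        return taskManagerClass.getSimpleName() + " for resource " + resourceID
                + " with " + configuration.size() + " config entries";
    }

    public static void main(String[] args) {
        // "example.key" and "tm-1" are placeholder values, not real Flink settings.
        System.out.println(selectNetworkInterfaceAndRunTaskManager(
                Collections.singletonMap("example.key", "value"),
                new ResourceID("tm-1"),
                Object.class));
    }
}
```

The only part that matters for the review comment is the `@param resourceID` line: one entry per parameter, in signature order.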




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647172
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647510
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647749
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647841
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647928
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647937
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647940
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75648030
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75647950
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75648199
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75648406
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75648837
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
--- End diff --

This is still named "taskmanager"




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75648889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
--- End diff --

Also elsewhere in the code it still says TaskManager.




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75649283
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-22 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2400
  
Thank you for the pull request @wangzhijiang999! Looks good. I just had 
some minor remarks on the Scala-->Java conversion.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75650141
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
 
final JobID jobID = listeningContext.jobID;
+   final ActorRef jobClientActor = listeningContext.jobClientActor;
final Future jobSubmissionFuture = 
listeningContext.jobResultFuture;
final ClassLoader classLoader = listeningContext.classLoader;
 
+   while (!jobSubmissionFuture.isCompleted()) {
+   try {
+   Thread.sleep(250);
+   } catch (InterruptedException e) {
+   throw new JobExecutionException(jobID, 
"Interrupted while waiting for execution result.", e);
+   }
+
+   try {
+   Await.result(
+   Patterns.ask(
+   jobClientActor,
+   JobClientMessages.getPing(),
--- End diff --

Sure, we can do that.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75650460
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
 
final JobID jobID = listeningContext.jobID;
+   final ActorRef jobClientActor = listeningContext.jobClientActor;
final Future jobSubmissionFuture = 
listeningContext.jobResultFuture;
final ClassLoader classLoader = listeningContext.classLoader;
 
+   while (!jobSubmissionFuture.isCompleted()) {
+   try {
+   Thread.sleep(250);
--- End diff --

Sure, we can do that. We will then have longer intervals between the checks,
but that is probably fine.
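
For illustration only, here is a minimal sketch of the trade-off mentioned above: instead of sleeping for a fixed 250 ms and then pinging, the loop blocks on the result with a timeout and treats each timeout purely as a trigger for a liveness check. It uses plain `java.util.concurrent` rather than Akka, so it is not the code in this pull request; `LivenessAwait`, `awaitResult`, `peerIsAlive` and `checkIntervalMillis` are hypothetical names, and the liveness check stands in for the ping to the `JobClientActor`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical helper for illustration; not part of JobClient. */
final class LivenessAwait {

    private LivenessAwait() {}

    /**
     * Waits for the result and runs a liveness check whenever one interval
     * passes without a result. A longer interval means fewer checks, but a
     * finished result is still returned immediately.
     */
    static <T> T awaitResult(
            CompletableFuture<T> resultFuture,
            BooleanSupplier peerIsAlive,
            long checkIntervalMillis) {

        while (true) {
            try {
                // Returns as soon as the result is available; no fixed sleep.
                return resultFuture.get(checkIntervalMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The timed get() does not complete or fail the future itself;
                // it only signals that an interval elapsed without a result.
                if (!peerIsAlive.getAsBoolean()) {
                    throw new IllegalStateException("Peer is no longer reachable.");
                }
            } catch (ExecutionException e) {
                // The computation failed; surface its cause unchecked.
                throw new CompletionException(e.getCause());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before giving up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the result.", e);
            }
        }
    }
}
```

In this sketch the timeout belongs to the waiting call and not to the result future, so a slow job is never reported as failed just because one interval elapsed. Whether the same separation can be had with the Akka ask pattern is a separate question that this sketch does not answer.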




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75650523
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
 
final JobID jobID = listeningContext.jobID;
+   final ActorRef jobClientActor = listeningContext.jobClientActor;
final Future jobSubmissionFuture = 
listeningContext.jobResultFuture;
final ClassLoader classLoader = listeningContext.classLoader;
 
+   while (!jobSubmissionFuture.isCompleted()) {
+   try {
+   Thread.sleep(250);
+   } catch (InterruptedException e) {
+   throw new JobExecutionException(jobID, 
"Interrupted while waiting for execution result.", e);
+   }
+
+   try {
+   Await.result(
+   Patterns.ask(
+   jobClientActor,
+   JobClientMessages.getPing(),
--- End diff --

Good idea.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75669289
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
 
final JobID jobID = listeningContext.jobID;
+   final ActorRef jobClientActor = listeningContext.jobClientActor;
final Future jobSubmissionFuture = 
listeningContext.jobResultFuture;
final ClassLoader classLoader = listeningContext.classLoader;
 
+   while (!jobSubmissionFuture.isCompleted()) {
+   try {
+   Thread.sleep(250);
--- End diff --

Actually, it's not quite as easy. If we simply wait on the ask timeout via 
`Await.ready/result`, then we complete the `jobSubmissionFuture` with a timeout 
(if the job runs longer than the timeout interval). Subsequent checks will 
always return the same result.

Thus, we probably need something like a sleep here.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75669417
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
 
final JobID jobID = listeningContext.jobID;
+   final ActorRef jobClientActor = listeningContext.jobClientActor;
final Future jobSubmissionFuture = 
listeningContext.jobResultFuture;
final ClassLoader classLoader = listeningContext.classLoader;
 
+   while (!jobSubmissionFuture.isCompleted()) {
+   try {
+   Thread.sleep(250);
--- End diff --

Or something like an immutable future.




[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2313#discussion_r75676249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---
@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
public static JobExecutionResult awaitJobResult(JobListeningContext 
listeningContext) throws JobExecutionException {
 
final JobID jobID = listeningContext.jobID;
+   final ActorRef jobClientActor = listeningContext.jobClientActor;
final Future jobSubmissionFuture = 
listeningContext.jobResultFuture;
final ClassLoader classLoader = listeningContext.classLoader;
 
+   while (!jobSubmissionFuture.isCompleted()) {
+   try {
+   Thread.sleep(250);
--- End diff --

My bad, the future was only completed due to some faulty testing code.
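
To illustrate the point of this correction: a timed wait that expires does not by itself complete the future being waited on. The sketch below is only an analogy. It uses `java.util.concurrent.CompletableFuture` as a stand-in for the Scala future and `Await.result` used in `JobClient`, and the names `TimedWaitDemo` and `waitOnce` are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of Flink.
class TimedWaitDemo {

    /** Waits up to timeoutMillis and reports whether the future finished in time. */
    static boolean waitOnce(CompletableFuture<String> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Only the wait expired; the future itself is left untouched.
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Wait failed", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the job result future; nobody has completed it yet.
        CompletableFuture<String> jobResult = new CompletableFuture<>();

        System.out.println("finished after first wait: " + waitOnce(jobResult, 50));
        System.out.println("future completed: " + jobResult.isDone());

        // Simulate the job finishing, then wait again on the same future.
        jobResult.complete("FINISHED");
        System.out.println("finished after second wait: " + waitOnce(jobResult, 50));
    }
}
```

The first wait times out and leaves the future incomplete, so a later wait on the same future can still observe the real result. This is why repeated timed waits can replace an explicit sleep in a polling loop.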




[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs

2016-08-22 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2313
  
Updated according to our comment discussion.




[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-22 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2400
  
Hi @wangzhijiang999! `ActorSystem` and `LeaderRetrievalService` should only 
ever appear in the factory (`startTaskManagerComponentsAndActor`) which brings up 
the TaskExecutor component. The TaskExecutor itself uses the new classes 
`RpcService` and `HighAvailabilityServices`. All other code should not change much, 
except for the Scala->Java changes.
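
A rough sketch of that separation, under loud assumptions: every type below is a hypothetical stand-in written for this example, not Flink's real `RpcService`, `HighAvailabilityServices`, `ActorSystem`, or `TaskExecutor`. It only shows the shape of the idea, where the factory is the single place that touches the actor system and the component sees nothing but the new abstractions.

```java
import java.util.Objects;

// Hypothetical stand-ins; these are NOT Flink's real interfaces.
interface RpcService { String address(); }
interface HighAvailabilityServices { String leaderAddress(); }

/** Stand-in for an actor system; only the factory is allowed to see it. */
final class ActorSystemStub {
    final String hostPort;
    ActorSystemStub(String hostPort) { this.hostPort = hostPort; }
}

/** The component depends only on the new abstractions. */
final class TaskExecutorSketch {
    private final RpcService rpcService;
    private final HighAvailabilityServices haServices;

    TaskExecutorSketch(RpcService rpcService, HighAvailabilityServices haServices) {
        this.rpcService = Objects.requireNonNull(rpcService);
        this.haServices = Objects.requireNonNull(haServices);
    }

    String describe() {
        return "TaskExecutor at " + rpcService.address()
            + ", leader at " + haServices.leaderAddress();
    }
}

final class TaskExecutorFactorySketch {
    /** The only place where actor-system details are unwrapped. */
    static TaskExecutorSketch start(ActorSystemStub actorSystem, String leaderAddress) {
        RpcService rpc = () -> "akka://" + actorSystem.hostPort;
        HighAvailabilityServices ha = () -> leaderAddress;
        return new TaskExecutorSketch(rpc, ha);
    }

    public static void main(String[] args) {
        TaskExecutorSketch te =
            start(new ActorSystemStub("localhost:6122"), "localhost:6123");
        System.out.println(te.describe());
    }
}
```

With this layout the component can be tested by passing in plain implementations of the two interfaces, with no actor system involved.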




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75680482
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** Return code for critical errors during the runtime */
+   private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+   /** The name of the TaskManager actor */
+   private static final String TASK_MANAGER_NAME = "taskmanager";
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskManagerConfiguration taskManagerConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskManagerConfiguration taskManagerConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskManagerConfig = checkNotNull(taskManagerConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for 
the TaskManager
+* communication. The network interface is used both for the actor 
communication
+* (coordination) as well as for the data exchange between task 
managers. Unless
+* the hostname/interface is explicitly configured in the 
configuration, this
+* method will try out various interfaces and methods to connect to the 
JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an 
actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the 
TaskManager itself.
+*
+* @param configurationThe configuration for the TaskManager.
+* @param taskManagerClass The actor class to instantiate.
+* Allows to use TaskManager subclasses for 
example for YARN.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID,
+   Class taskManagerClass) throws Exception 
{
+
+   Tuple2 tuple2 = 
selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(tuple2._1(), resourceID, tuple2._2(), 
configuration, taskManagerClass);
+   }
+
+   private static Tuple2 
selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75680765
  

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-22 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r75680917
  

[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-08-22 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
Hi @vijikarthi! Sorry for the late reply. I intend to merge this as soon as 
possible. When I run the secure test cases from IntelliJ, I get:
>java.lang.NoClassDefFoundError: jdbm/helper/CachePolicy

I am investigating why this happens.




[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-08-22 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
YARNSessionFIFOSecuredITCase gives me the following:

>17:49:58,097 INFO  SecurityLogger.org.apache.hadoop.ipc.Server - Auth successful for appattempt_1471880990715_0001_01 (auth:SIMPLE)

It does not seem to be using Kerberos. We should check that security is really 
enabled and fail the test if it is not.
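
One possible shape for such a check, as a minimal sketch only: it assumes the test can get hold of a log line like the one above and that a Kerberos login would show up as `auth:KERBEROS` in it. The class and method names are hypothetical, and a real test would more likely query Hadoop's security API directly instead of parsing logs.

```java
// Hypothetical helper for illustration; not part of Flink or Hadoop.
final class SecurityCheckSketch {

    private static final String MARKER = "(auth:";

    /** Extracts the value inside "(auth:...)" from a log line, or null if absent. */
    static String authMethodOf(String logLine) {
        int start = logLine.indexOf(MARKER);
        if (start < 0) {
            return null;
        }
        int end = logLine.indexOf(')', start);
        if (end < 0) {
            return null;
        }
        return logLine.substring(start + MARKER.length(), end);
    }

    /** Fails loudly unless the reported authentication method is Kerberos. */
    static void assertKerberos(String authMethod) {
        if (!"KERBEROS".equalsIgnoreCase(authMethod)) {
            throw new AssertionError(
                "Expected Kerberos authentication but found: " + authMethod);
        }
    }

    public static void main(String[] args) {
        String line = "Auth successful for appattempt_1471880990715_0001_01 (auth:SIMPLE)";
        String method = authMethodOf(line);
        System.out.println("auth method: " + method);
        try {
            assertKerberos(method);
        } catch (AssertionError e) {
            // With the log line from this run, the check fails as intended.
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```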





[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-08-22 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2275
  
`RollingFileSink`

```
---
 T E S T S
---
Running org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase
Formatting using clusterid: testClusterID
java.net.BindException: Permission denied
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:382)
at java.net.ServerSocket.bind(ServerSocket.java:375)
at org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.getSecureResources(SecureDataNodeStarter.java:103)
at org.apache.hadoop.hdfs.MiniDFSCluster.startDataNodes(MiniDFSCluster.java:1213)
at org.apache.hadoop.hdfs.MiniDFSCluster.initMiniDFSCluster(MiniDFSCluster.java:684)
at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:351)
at org.apache.hadoop.hdfs.MiniDFSCluster$Builder.build(MiniDFSCluster.java:332)
at org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase.startSecureCluster(RollingSinkSecuredITCase.java:117)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 6.144 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase
org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase  Time elapsed: 6.143 sec  <<< ERROR!
java.lang.RuntimeException: Cannot start secure cluster without privileged resources.
at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:734)
at org.apache.hadoop.hdfs.server.datanode.DataNode.<init>(DataNode.java:316)
at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:1848)
at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:1748)
at org.apache.hadoop.hdfs.MiniDFSCluster.startDataNodes(MiniDFSCluster.java:1218)
at org.apache.hadoop.hdfs.MiniDFSCluster.initMiniDFSCluster(MiniDFSCluster.java:684)
at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:351)
at org.apache.hadoop.hdfs.MiniDFSCluster$Builder.build(MiniDFSCluster.java:332)
at org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase.startSecureCluster(RollingSinkSecuredITCase.java:117)

org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase  Time elapsed: 6.144 sec  <<< ERROR!
java.lang.NullPointerException: null
at org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase.teardownSecureCluster(RollingSinkSecuredITCase.java:135)
```



