[GitHub] flink issue #2347: [FLINK-4236] fix error handling for jar files with no mai...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2347 Merging this after a Checkstyle fix. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75122721

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---

@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
 			sysoutLogUpdates);
 
 		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-
+
+		Future submissionFuture = Patterns.ask(
+			jobClientActor,
+			new JobClientMessages.SubmitJobAndWait(jobGraph),
+			new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+		return new JobListeningContext(
+			jobGraph.getJobID(),
+			submissionFuture,
+			jobClientActor,
+			classLoader);
+	}
+
+	/**
+	 * Attaches to a running Job using the JobID.
+	 * Reconstructs the user class loader by downloading the jars from the JobManager.
+	 * @throws JobRetrievalException if anything goes wrong while retrieving the job
+	 */
+	public static JobListeningContext attachToRunningJob(
+			JobID jobID,
+			ActorGateway jobManagerGateWay,
+			Configuration configuration,
+			ActorSystem actorSystem,
+			LeaderRetrievalService leaderRetrievalService,
+			FiniteDuration timeout,
+			boolean sysoutLogUpdates) throws JobRetrievalException {
+
+		checkNotNull(jobID, "The jobID must not be null.");
+		checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null.");
+		checkNotNull(configuration, "The configuration must not be null.");
+		checkNotNull(actorSystem, "The actorSystem must not be null.");
+		checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null.");
+		checkNotNull(timeout, "The timeout must not be null.");
+
+		// retrieve classloader first before doing anything
+		ClassLoader classloader;
+		try {
+			classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout);
+			LOG.info("Reconstructed class loader for Job {}", jobID);
+		} catch (Exception e) {
+			LOG.warn("Couldn't retrieve classloader for {}. Using system class loader", jobID, e);
+			classloader = JobClient.class.getClassLoader();
+		}
+
+		// we create a proxy JobClientActor that deals with all communication with
+		// the JobManager. It forwards the job submission, checks the success/failure responses, logs
+		// update messages, watches for disconnect between client and JobManager, ...
+		Props jobClientActorProps = JobClientActor.createJobClientActorProps(
+			leaderRetrievalService,
+			timeout,
+			sysoutLogUpdates);
+
+		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
+
+		Future attachmentFuture = Patterns.ask(
+			jobClientActor,
+			new JobClientMessages.AttachToJobAndWait(jobID),
+			new Timeout(AkkaUtils.INF_TIMEOUT()));
+
+		return new JobListeningContext(
+			jobID,
+			attachmentFuture,
+			jobClientActor,
+			classloader);
+	}
+
+	/**
+	 * Reconstructs the class loader by first requesting information about it at the JobManager
+	 * and then downloading missing jar files.
+	 * @param jobID id of job
+	 * @param jobManager gateway to the JobManager
+	 * @param config the flink configuration
+	 * @param timeout timeout for querying the jobmanager
+	 * @return A classloader that should behave like the original classloader
+	 * @throws JobRetrievalException if anything goes wrong
+	 */
+	public static ClassLoader retrieveClassLoader(
+			JobID jobID,
+			ActorGateway jobManager,
+			Configuration config,
+			FiniteDuration timeout)
+		throws JobRetrievalException {
+
+		BlobCache blobClient = null;
+		try {
+			final Object jmAnswer;
+			try {
+				jmAnswer = Await.result(
[GitHub] flink pull request #2340: [FLINK-3155] Update docker flink container to the ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2340#discussion_r75123833

--- Diff: flink-contrib/docker-flink/Dockerfile ---

@@ -22,25 +22,31 @@ FROM java:8-jre-alpine
 
 RUN apk add --no-cache bash snappy
 
 # Configure Flink version
-ARG FLINK_VERSION=1.0.3
+ARG FLINK_VERSION=1.1.1
 ARG HADOOP_VERSION=27
 ARG SCALA_VERSION=2.11
 
+# Flink environment variables
+ARG FLINK_INSTALL_PATH=/opt
+ENV FLINK_HOME $FLINK_INSTALL_PATH/flink
+ENV PATH $PATH:$FLINK_HOME/bin
+
 # Install build dependencies and flink
 RUN set -x && \
+  mkdir -p $FLINK_INSTALL_PATH && \
   apk --update add --virtual build-dependencies curl && \
-  curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?as_json\=1 | \
-    awk '/preferred/ {gsub(/"/,""); print $2}')flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz | \
-  tar xvz -C /usr/local/ && \
-  ln -s /usr/local/flink-$FLINK_VERSION /usr/local/flink && \
-  sed -i -e "s/echo \$mypid >> \$pid/echo \$mypid >> \$pid \&\& wait/g" /usr/local/flink/bin/flink-daemon.sh && \
+  curl -s $(curl -s https://www.apache.org/dyn/closer.cgi\?preferred\=true)flink/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-hadoop${HADOOP_VERSION}-scala_${SCALA_VERSION}.tgz | \

--- End diff --

Much better! Thanks :)
[GitHub] flink issue #2340: [FLINK-3155] Update docker flink container to the latest ...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2340

Thanks for addressing my comments! Looks very good. I'm sorry I asked whether you had tested the changes; it may seem natural to you, but trust me, that is not always the case :) Merging...
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75123576

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---

@@ -118,27 +138,168 @@ public static JobExecutionResult submitJobAndWait(
 			sysoutLogUpdates);
 
 		ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
-
+
+		Future submissionFuture = Patterns.ask(
+			jobClientActor,
+			new JobClientMessages.SubmitJobAndWait(jobGraph),
+			new Timeout(AkkaUtils.INF_TIMEOUT()));

--- End diff --

That's not possible because the `JobClientActor` will complete this future with the result of the job execution, which may be infinitely delayed. In all other cases (i.e. timeout to register at the jobmanager, failure to attach to the job, failure to submit the job), the `JobClientActor` will complete the future with a failure message.
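The completion behaviour described above, a single future that is either completed with the job result (possibly after an arbitrarily long delay) or failed with an error, can be illustrated with a plain `CompletableFuture`. This is a hypothetical sketch of the pattern only, not the actual Akka-based code; the class and method names are invented:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch: a client-side future is either completed with the job
// result (which may arrive arbitrarily late) or failed with an error message,
// mirroring how the JobClientActor completes the submission future.
public class SubmissionFutureSketch {

    static CompletableFuture<String> submitJobAndWait(boolean registrationTimesOut) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (registrationTimesOut) {
            // failure case, e.g. a timeout while registering at the JobManager
            result.completeExceptionally(
                new IllegalStateException("registration at JobManager timed out"));
        } else {
            // success case: completed only once the job execution result is known
            result.complete("JobExecutionResult");
        }
        return result;
    }
}
```

The point of the pattern is that the caller blocks on one future regardless of outcome, which is why a finite `ask` timeout would be wrong for the success path.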
[GitHub] flink issue #2317: [FLINK-4287] Ensure the yarn-session.sh classpath contain...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2317

Woops, I didn't end up merging #2320, so this got overlooked.
[GitHub] flink pull request #2317: [FLINK-4287] Ensure the yarn-session.sh classpath ...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2317#discussion_r75126672

--- Diff: flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---

@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
 
 export FLINK_CONF_DIR
 
-$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j $FLINK_LIB_DIR/flink-dist*.jar "$@"

--- End diff --

The name was changed recently (some months ago). Niels was just on an old base.
[GitHub] flink issue #2317: [FLINK-4287] Ensure the yarn-session.sh classpath contain...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2317

Rebased and ready to go. Merging.
[GitHub] flink issue #2114: FLINK-3839 Added the extraction of jar files from a URL s...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2114

Hi @thormanrd! Thanks for your PR. As Robert and Stephan already pointed out, it contains a lot of reformats, which are generally discouraged. They make the pull request's essentials hard to review and can even introduce subtle bugs. The preferred way of syncing up with the master is to rebase against it before you open a pull request:

```
git pull --rebase upstream master
```

`upstream` is the main Flink repository. Maybe it's called `origin` in your case. Check using `git remote -v`. After the pull request has been opened, you should only push additional commits (no rebasing) to keep the comments and discussion intact. In the end, either you or the committer rebases again.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75269835

--- Diff: flink-dist/pom.xml ---

@@ -113,8 +113,13 @@ under the License.
 			<artifactId>flink-metrics-jmx</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-mesos_2.10</artifactId>
+			<version>${project.version}</version>
+		</dependency>

--- End diff --

We always build yarn. We use the `include-yarn-tests` profile to include/exclude yarn tests. The `include-yarn` profile, on the other hand, is to exclude yarn for the Hadoop 1 version of Flink.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75269975

--- Diff: flink-mesos/pom.xml ---

@@ -0,0 +1,294 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.1-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-mesos_2.10</artifactId>
+	<name>flink-mesos</name>
+	<packaging>jar</packaging>
+
+	<properties>
+		<mesos.version>0.27.1</mesos.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_2.10</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<artifactId>hadoop-core</artifactId>

--- End diff --

Why do you exclude just `hadoop-core` here?
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75270753

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java ---

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.cli;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FlinkMesosSessionCli {

--- End diff --

This looks just like a dummy/stub class? Not a CLI yet :)
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75271531

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---

@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+	/**
+	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
+	 */
+	private static String[] TM_PORT_KEYS = {
+		"taskmanager.rpc.port",
+		"taskmanager.data.port" };
+
+	private final MesosTaskManagerParameters params;
+	private final Protos.TaskInfo.Builder template;
+	private final Protos.TaskID taskID;
+	private final Request taskRequest;
+
+	/**
+	 * Construct a launchable Mesos worker.
+	 * @param params the TM parameters such as memory, cpu to acquire.
+	 * @param template a template for the TaskInfo to be constructed at launch time.
+	 * @param taskID the taskID for this worker.
+	 */
+	public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+		this.params = params;
+		this.template = template;
+		this.taskID = taskID;
+		this.taskRequest = new Request();
+	}
+
+	public Protos.TaskID taskID() {
+		return taskID;
+	}
+
+	@Override
+	public TaskRequest taskRequest() {
+		return taskRequest;
+	}
+
+	class Request implements TaskRequest {
+		private final AtomicReference assignedResources = new AtomicReference<>();
+
+		@Override
+		public String getId() {
+			return taskID.getValue();
+		}
+
+		@Override
+		public String taskGroupName() {
+			return "";
+		}
+
+		@Override
+		public double getCPUs() {
+			return params.cpus();
+		}
+
+		@Override
+		public double getMemory() {
+			return params.containeredParameters().taskManagerTotalMemoryMB();
+		}
+
+		@Override
+		public double getNetworkMbps() {
+			return 0.0;
+		}
+
+		@Override
+		public double getDisk() {
+			return 0.0;

--- End diff --

This is always 0.0 which means give me whatever is free?
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75271636

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java ---

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos;
+
+import org.apache.mesos.Protos;
+
+import java.net.URL;
+import java.util.Arrays;
+
+public class Utils {
+	/**
+	 * Construct a Mesos environment variable.
+	 */

--- End diff --

The indentation is off here.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75272075 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. 
+ */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. + */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; + } + + @Override + public int getPorts() { + return TM_PORT_KEYS.length; + } + + @Override + public Map getCustomNamedResources() { + return Collections.emptyMap(); + } + + @Override + public List getHardConstraints() { + return null; + } + + @
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75272248

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---

@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import com.netflix.fenzo.ConstraintEvaluator;
+import com.netflix.fenzo.TaskAssignmentResult;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.VMTaskFitnessCalculator;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.mesos.Protos;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.mesos.Utils.variable;
+import static org.apache.flink.mesos.Utils.range;
+import static org.apache.flink.mesos.Utils.ranges;
+import static org.apache.flink.mesos.Utils.scalar;
+
+/**
+ * Specifies how to launch a Mesos worker.
+ */
+public class LaunchableMesosWorker implements LaunchableTask {
+
+	/**
+	 * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
+	 */
+	private static String[] TM_PORT_KEYS = {
+		"taskmanager.rpc.port",
+		"taskmanager.data.port" };
+
+	private final MesosTaskManagerParameters params;
+	private final Protos.TaskInfo.Builder template;
+	private final Protos.TaskID taskID;
+	private final Request taskRequest;
+
+	/**
+	 * Construct a launchable Mesos worker.
+	 * @param params the TM parameters such as memory, cpu to acquire.
+	 * @param template a template for the TaskInfo to be constructed at launch time.
+	 * @param taskID the taskID for this worker.
+	 */
+	public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
+		this.params = params;
+		this.template = template;
+		this.taskID = taskID;
+		this.taskRequest = new Request();
+	}
+
+	public Protos.TaskID taskID() {
+		return taskID;
+	}
+
+	@Override
+	public TaskRequest taskRequest() {
+		return taskRequest;
+	}
+
+	class Request implements TaskRequest {
+		private final AtomicReference assignedResources = new AtomicReference<>();
+
+		@Override
+		public String getId() {
+			return taskID.getValue();
+		}
+
+		@Override
+		public String taskGroupName() {
+			return "";
+		}
+
+		@Override
+		public double getCPUs() {
+			return params.cpus();
+		}
+
+		@Override
+		public double getMemory() {
+			return params.containeredParameters().taskManagerTotalMemoryMB();
+		}
+
+		@Override
+		public double getNetworkMbps() {
+			return 0.0;
+		}
+
+		@Override
+		public double getDisk() {
+			return 0.0;

--- End diff --

I would rather throw an exception here if the value is not in use.
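The reviewer's suggestion, failing fast instead of silently returning an unused 0.0, could be sketched roughly as follows. This is a hypothetical illustration; the class name is invented and this is not the actual Fenzo `TaskRequest` interface:

```java
// Hypothetical sketch of the review suggestion: when disk resources are not
// in use, throw on access rather than returning a misleading 0.0.
public class DiskRequestSketch {

    private final boolean diskInUse;
    private final double diskMB;

    public DiskRequestSketch(boolean diskInUse, double diskMB) {
        this.diskInUse = diskInUse;
        this.diskMB = diskMB;
    }

    public double getDisk() {
        if (!diskInUse) {
            // fail fast: callers should not rely on a placeholder value
            throw new UnsupportedOperationException(
                "disk resources are not used by this worker");
        }
        return diskMB;
    }
}
```

The trade-off is that a scheduler which legitimately queries all resource dimensions would then need the 0.0 default, which is presumably why the original code returns it.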
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75274022 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75275043 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>1.1-SNAPSHOT</version>
--- End diff --
This needs to be bumped to `1.2-SNAPSHOT`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75275735 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java --- @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+/**
+ * The Mesos environment variables used for settings of the containers.
+ */
+public class MesosConfigKeys {
--- End diff --
I wonder, would it make sense to create `ContainerEnvConfigKeys` with the shared environment variables in `YarnConfigKeys` and `MesosConfigKeys`? The overlap is quite significant.
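The refactoring suggested above could look roughly like the sketch below. The class name `ContainerEnvConfigKeys` and the constants shown are illustrative assumptions only, not the actual keys shared by `YarnConfigKeys` and `MesosConfigKeys`:

```java
// Hypothetical sketch only: a constants holder that YarnConfigKeys and
// MesosConfigKeys could both delegate to for their shared environment variables.
public final class ContainerEnvConfigKeys {

	/** Example shared environment variable names (illustrative, not the real key set). */
	public static final String ENV_SLOTS = "_SLOTS";
	public static final String ENV_DETACHED = "DETACHED";
	public static final String ENV_CLIENT_USERNAME = "HADOOP_USER_NAME";

	/** Utility class, not meant to be instantiated. */
	private ContainerEnvConfigKeys() {
	}
}
```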
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75276016 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
+
+	/** The Mesos configuration (master and framework info) */
+	private final MesosConfiguration mesosConfig;
+
+	/** The TaskManager container parameters (like container memory size) */
+	private final MesosTaskManagerParameters taskManagerParameters;
+
+	/** Context information used to start a TaskManager Java process */
+	private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+	private final int maxFailedTasks;
+
+	/** Callback handler for the asynchronous Mesos scheduler */
+	private SchedulerProxy schedulerCallbackHandler;
+
+	/** Mesos scheduler driver */
+	private SchedulerDriver schedulerDriver;
+
+	private ActorRef connectionMonitor;
+
+	private ActorRef taskRouter;
+
+	private ActorRef launchCoordinator;
+
+	private ActorRef reconciliationCoordinator;
+
+	private MesosWorkerStore workerStore;
+
+	final Map workersInNew;
+	final
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75276925 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ (same diff excerpt as above)
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75279529 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ (same diff excerpt as above)
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75280315 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ (same diff excerpt as above)
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75281657 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala --- @@ -0,0 +1,33 @@
+package org.apache.flink.mesos.runtime.clusterframework
+
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore
+import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable}
+
+/**
+ * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}.
+ */
+case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable {
--- End diff --
Why is this class written in Scala? It seems like this class is only used from Java code.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75282003 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.io.Serializable;
+import java.text.DecimalFormat;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A store of Mesos workers and associated framework information.
+ *
+ * Generates a framework ID as necessary.
+ */
+public interface MesosWorkerStore {
+
+	static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0");
--- End diff --
By definition, variables are static and final in interfaces :)
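The remark about interface fields can be checked directly: every field declared in a Java interface is implicitly `public static final`, so explicit modifiers on a constant like `TASKID_FORMAT` are redundant. A small self-contained demo, using a stand-in interface rather than the real `MesosWorkerStore`:

```java
import java.lang.reflect.Modifier;
import java.text.DecimalFormat;

// Fields in an interface need no modifiers: they are implicitly
// public, static and final.
interface WorkerStoreLike {
	DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0");
}

public class InterfaceFieldDemo {
	public static void main(String[] args) throws Exception {
		int mods = WorkerStoreLike.class.getField("TASKID_FORMAT").getModifiers();
		System.out.println(Modifier.isPublic(mods)); // true
		System.out.println(Modifier.isStatic(mods)); // true
		System.out.println(Modifier.isFinal(mods));  // true
	}
}
```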
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75282731 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java --- @@ -0,0 +1,87 @@
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.mesos.Protos;
+import scala.Option;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A standalone Mesos worker store.
+ */
+public class StandaloneMesosWorkerStore implements MesosWorkerStore {
+
+	private Option<Protos.FrameworkID> frameworkID = Option.empty();
+
+	private int taskCount = 0;
+
+	private Map<Protos.TaskID, MesosWorkerStore.Worker> storedWorkers = new LinkedHashMap<>();
+
+	public StandaloneMesosWorkerStore() {
+	}
+
+	@Override
+	public void start() throws Exception {
+
+	}
+
+	@Override
+	public void stop() throws Exception {
+
+	}
+
+	@Override
+	public Option<Protos.FrameworkID> getFrameworkID() throws Exception {
+		return frameworkID;
+	}
+
+	@Override
+	public void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception {
+		this.frameworkID = frameworkID;
+	}
+
+	@Override
+	public List<MesosWorkerStore.Worker> recoverWorkers() throws Exception {
+		return ImmutableList.copyOf(storedWorkers.values());
+	}
+
+	@Override
+	public Protos.TaskID newTaskID() throws Exception {
+		Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(++taskCount)).build();
+		return taskID;
--- End diff --
Could be simplified: `return Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(++taskCount)).build();`
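As a side note on `TASKID_FORMAT`: in the `DecimalFormat` pattern `"taskmanager-0"`, the leading text acts as a literal prefix and the trailing `0` is the digit placeholder, so an incrementing counter yields sequential task IDs. A minimal sketch (the `Locale.US` symbols are pinned here only for deterministic output; the store itself does not do this):

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

// Demonstrates how newTaskID() derives task IDs from an incrementing counter.
public class TaskIdFormatDemo {
	public static void main(String[] args) {
		// "taskmanager-" is a literal prefix; "0" formats the counter value.
		DecimalFormat format = new DecimalFormat("taskmanager-0", DecimalFormatSymbols.getInstance(Locale.US));
		int taskCount = 0;
		System.out.println(format.format(++taskCount)); // taskmanager-1
		System.out.println(format.format(++taskCount)); // taskmanager-2
	}
}
```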
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75283539 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@
+package org.apache.flink.mesos.runtime.clusterframework.store;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.framework.recipes.shared.SharedValue;
+import org.apache.curator.framework.recipes.shared.VersionedValue;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.mesos.Protos;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A ZooKeeper-backed Mesos worker store.
+ */
+public class ZooKeeperMesosWorkerStore implements MesosWorkerStore {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class);
+
+	private final Object cacheLock = new Object();
--- End diff --
Seems like the store should only be accessed by the ResourceManager. In this case we could remove the lock.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75283689 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } + + // valid transition methods --- End diff -- Could you frame the transition methods with comments? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
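The reviewer's request above — framing the transition methods with comments — could look roughly like the following plain-Java sketch. The `Worker`/`TaskState` names follow the quoted diff, but the `checkState`-style guards are an assumption for illustration, not the actual Flink code (the quoted `releaseTask` has no guard).

```java
// Hypothetical sketch: documenting the valid worker state transitions
// New -> Launched -> Released with explicit guards and comments.
public class WorkerLifecycle {
    enum TaskState { New, Launched, Released }

    static final class Worker {
        final String taskId;
        final String hostname;   // assigned on launch; null while New
        final TaskState state;

        private Worker(String taskId, String hostname, TaskState state) {
            this.taskId = taskId;
            this.hostname = hostname;
            this.state = state;
        }

        /** Entry point: a freshly requested worker, no host assigned yet. */
        static Worker newWorker(String taskId) {
            return new Worker(taskId, null, TaskState.New);
        }

        /** Valid only from New: records the slave/host assignment. */
        Worker launch(String hostname) {
            if (state != TaskState.New) {
                throw new IllegalStateException("launch() only valid in New, was " + state);
            }
            return new Worker(taskId, hostname, TaskState.Launched);
        }

        /** Valid only from Launched: keeps the hostname for Fenzo bookkeeping. */
        Worker release() {
            if (state != TaskState.Launched) {
                throw new IllegalStateException("release() only valid in Launched, was " + state);
            }
            return new Worker(taskId, hostname, TaskState.Released);
        }
    }

    public static void main(String[] args) {
        Worker w = Worker.newWorker("taskmanager-00001").launch("host-1").release();
        System.out.println(w.state + " on " + w.hostname);
    }
}
```

Keeping the instances immutable means each transition returns a new `Worker`, so a half-applied transition can never be observed.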
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75283918 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } + + // valid transition methods + + public static Worker newTask(Protos.TaskID taskID) { + return new Worker( + taskID, + Option.empty(), Option.empty(), + TaskState.New); + } + + public Worker launchTask(Protos.SlaveID slaveID, String hostname) { + return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), TaskState.Launched); + } + + public Worker releaseTask() { + return new Worker(taskID, slaveID, hostname, TaskState.Released); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || 
getClass() != o.getClass()) { + return false; + } + Worker worker = (Worker) o; + return Objects.equals(taskID, worker.taskI
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75284993 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. 
+ * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; --- End diff -- The `MesosActor` is actually the `MesosFlinkResourceManager`, right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
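The proxy pattern discussed here — translating scheduler-driver callbacks into immutable messages delivered to a single actor — can be illustrated with a plain-Java stand-in, using a `BlockingQueue` in place of the Akka mailbox. The names below are illustrative, not the actual Flink/Akka/Mesos types.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch (hypothetical types): callbacks arrive on arbitrary driver threads,
// but all mutable state lives behind the mailbox, drained single-threadedly.
public class ProxySketch {
    /** Immutable message, analogous to Registered/Disconnected/StatusUpdate. */
    record StatusUpdate(String taskId, String state) {}

    /** The proxy holds no state of its own; it only forwards. */
    static final class CallbackProxy {
        private final BlockingQueue<Object> mailbox;

        CallbackProxy(BlockingQueue<Object> mailbox) {
            this.mailbox = mailbox;
        }

        // Safe to call from any thread: enqueueing is the only side effect.
        void statusUpdate(String taskId, String state) {
            mailbox.offer(new StatusUpdate(taskId, state));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
        new CallbackProxy(mailbox).statusUpdate("taskmanager-00001", "TASK_RUNNING");
        Object msg = mailbox.take();  // the "actor" processing its mailbox
        System.out.println(msg);
    }
}
```

This is why the real `SchedulerProxy` can stay lock-free: concurrency is confined to the actor that receives the messages.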
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75285550 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. 
+ * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; + + public SchedulerProxy(ActorRef mesosActor) { + this.mesosActor = mesosActor; + } + + @Override + public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) { + mesosActor.tell(new Registered(frameworkId, masterInfo), ActorRef.noSender()); + } + + @Override + public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { + mesosActor.tell(new ReRegistered(masterInfo), ActorRef.noSender()); + } + + @Override + public void disconnected(SchedulerDriver driver) { + mesosActor.tell(new Disconnected(), ActorRef.noSender()); + } + + + @Override + public void resourceOffers(SchedulerDriver driver, List offers) { + mesosActor.tell(new ResourceOffers(offers), ActorRef.noSender()); + } + + @Override + public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) { + mesosActor.tell(new OfferRescinded(offerId), ActorRef.noSender()); + } + + @Override + public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) { + mesosActor.tell(new StatusUpdate(status), ActorRef.noSender()); + } + + @Override + public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) { + throw new UnsupportedOperationException("frameworkMessage is unexpected"); + } + + @Override + public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) { + mesosActor.tell(new SlaveLost(slaveId), ActorRef.noSender()); + } + + @Override + public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) { + throw new 
UnsupportedOperationException("executorLost is unexpected"); --- End diff -- Why don't we forward this message and crash the actor instead? ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75285623 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. 
+ * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; + + public SchedulerProxy(ActorRef mesosActor) { + this.mesosActor = mesosActor; + } + + @Override + public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) { + mesosActor.tell(new Registered(frameworkId, masterInfo), ActorRef.noSender()); + } + + @Override + public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { + mesosActor.tell(new ReRegistered(masterInfo), ActorRef.noSender()); + } + + @Override + public void disconnected(SchedulerDriver driver) { + mesosActor.tell(new Disconnected(), ActorRef.noSender()); + } + + + @Override + public void resourceOffers(SchedulerDriver driver, List offers) { + mesosActor.tell(new ResourceOffers(offers), ActorRef.noSender()); + } + + @Override + public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) { + mesosActor.tell(new OfferRescinded(offerId), ActorRef.noSender()); + } + + @Override + public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) { + mesosActor.tell(new StatusUpdate(status), ActorRef.noSender()); + } + + @Override + public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) { + throw new UnsupportedOperationException("frameworkMessage is unexpected"); --- End diff -- What other messages could the framework send? Is it worth crashing the actor? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75285727 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; + +/** + * A builder for the Fenzo task scheduler. + * + * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface. + */ +public interface TaskSchedulerBuilder { + TaskSchedulerBuilder withLeaseRejectAction(Action1 action); --- End diff -- new line would be nice :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75295468 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0x) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer initializer = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { +
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75295536 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0x) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer initializer = new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) { +
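The constructor's port check is truncated in this archive (`configuredPort > 0x`); presumably the missing bound is `0xFFFF` (65535), the maximum TCP port. A hedged sketch of that validation, with the bound stated as an assumption:

```java
// Sketch: validating a configured server port. The upper bound 0xFFFF is an
// assumption -- the archived diff is truncated after "0x".
public class PortCheck {
    static int validatePort(int configuredPort) {
        if (configuredPort < 0 || configuredPort > 0xFFFF) {
            throw new IllegalArgumentException("File server port is invalid: " + configuredPort);
        }
        return configuredPort;  // port 0 conventionally means "let the OS pick"
    }

    public static void main(String[] args) {
        System.out.println(validatePort(8081));
    }
}
```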
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75296348 --- Diff: flink-mesos/src/main/resources/log4j.properties --- @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + +# Convenience file for local debugging of the JobManager/TaskManager. +log4j.rootLogger=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +log4j.logger.org.apache.flink.mesos=DEBUG +log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO --- End diff -- Do we want to uncomment these two rules and keep the INFO default? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75296757 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. --- End diff -- Good idea but this is yet to be integrated in the flink-yarn module. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75297945 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, FSM, Props} +import grizzled.slf4j.Logger +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ConnectionMonitor._ +import org.apache.flink.mesos.scheduler.messages._ + +import scala.concurrent.duration._ + +/** + * Actively monitors the Mesos connection. 
+ */ +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] { + + val LOG = Logger(getClass) + + startWith(StoppedState, None) + + when(StoppedState) { +case Event(msg: Start, _) => + LOG.info(s"Connecting to Mesos...") + goto(ConnectingState) + } + + when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) { +case Event(msg: Stop, _) => + goto(StoppedState) + +case Event(msg: Registered, _) => + LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.") + LOG.debug(s" Master Info: ${msg.masterInfo}") + goto(ConnectedState) + +case Event(msg: ReRegistered, _) => + LOG.info("Reconnected to a new Mesos master.") + LOG.debug(s" Master Info: ${msg.masterInfo}") + goto(ConnectedState) + +case Event(StateTimeout, _) => + LOG.warn("Unable to connect to Mesos; still trying...") + stay() + } + + when(ConnectedState) { +case Event(msg: Stop, _) => + goto(StoppedState) + +case Event(msg: Disconnected, _) => + LOG.warn("Disconnected from the Mesos master. Reconnecting...") + goto(ConnectingState) + } + --- End diff -- Would it make sense to add a `whenUnhandled {...}` handler here?
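The quoted FSM handles only the expected messages in each state; a `whenUnhandled` block would catch the rest. As a rough, dependency-free illustration of the same transition table plus such a catch-all, here is a hypothetical model — the plain-function shape and all names are ours, not Flink's or Akka's:

```java
// Hypothetical, dependency-free model of the ConnectionMonitor transitions
// quoted above, with a catch-all default in the spirit of Akka FSM's
// `whenUnhandled { ... }`. These names mirror the diff but are illustrative.
class ConnectionMonitorModel {
    enum State { STOPPED, CONNECTING, CONNECTED }
    enum Msg { START, STOP, REGISTERED, REREGISTERED, DISCONNECTED, STATE_TIMEOUT }

    // Returns the successor state. Any (state, message) pair not matched
    // below keeps the current state, i.e. what `whenUnhandled { stay() }`
    // would do instead of leaving the message unhandled.
    static State transition(State state, Msg msg) {
        switch (state) {
            case STOPPED:
                if (msg == Msg.START) return State.CONNECTING;
                break;
            case CONNECTING:
                if (msg == Msg.STOP) return State.STOPPED;
                if (msg == Msg.REGISTERED || msg == Msg.REREGISTERED) return State.CONNECTED;
                if (msg == Msg.STATE_TIMEOUT) return State.CONNECTING; // retry
                break;
            case CONNECTED:
                if (msg == Msg.STOP) return State.STOPPED;
                if (msg == Msg.DISCONNECTED) return State.CONNECTING;
                break;
        }
        return state; // unhandled: stay
    }
}
```

A spurious message (say, `REGISTERED` while stopped) then simply leaves the machine where it is rather than raising an unhandled-message warning.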
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75298678 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. 
During the GatheringOffers phase, --- End diff -- Fenzo also has my endorsement. It makes sense to delegate scheduling logic to a dedicated library.
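For readers unfamiliar with what "matching offers to tasks" means here: Fenzo's real `TaskScheduler` also evaluates constraints, fitness calculators, and lease expiry, but the core idea can be sketched as a toy first-fit assignment. Every name below is a hypothetical stand-in, not Fenzo's API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy first-fit matcher showing the kind of offer-to-task assignment the
// launch coordinator delegates to Fenzo. This is a sketch of the concept,
// not of Fenzo's TaskScheduler.
class OfferMatcher {
    static class Offer {
        final String slaveId; double cpus; double memMb;
        Offer(String slaveId, double cpus, double memMb) {
            this.slaveId = slaveId; this.cpus = cpus; this.memMb = memMb;
        }
    }
    static class Task {
        final String taskId; final double cpus; final double memMb;
        Task(String taskId, double cpus, double memMb) {
            this.taskId = taskId; this.cpus = cpus; this.memMb = memMb;
        }
    }

    /** Assigns each task to the first offer with enough remaining resources. */
    static Map<String, String> assign(List<Task> tasks, List<Offer> offers) {
        Map<String, String> assignment = new LinkedHashMap<>(); // taskId -> slaveId
        for (Task t : tasks) {
            for (Offer o : offers) {
                if (o.cpus >= t.cpus && o.memMb >= t.memMb) {
                    o.cpus -= t.cpus;   // consume the offer's resources
                    o.memMb -= t.memMb;
                    assignment.put(t.taskId, o.slaveId);
                    break;
                }
            }
        }
        return assignment; // unmatched tasks stay queued for future offers
    }
}
```

Tasks that fit no current offer remain unassigned, mirroring the GatheringOffers phase in which the coordinator holds offers and waits for more.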
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75299022 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo --- End diff -- Unused import
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75300491 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import grizzled.slf4j.Logger + +import akka.actor.{Actor, FSM, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor._ +import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate} +import org.apache.mesos.Protos.TaskState._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.PartialFunction.empty +import scala.concurrent.duration._ + +/** + * Monitors a Mesos task throughout its lifecycle. + * + * Models a task with a state machine reflecting the perceived state of the task in Mesos. The state + * is primarily updated when task status information arrives from Mesos. + * + * The associated state data primarily tracks the task's goal (intended) state, as persisted by the scheduler. 
+ * Keep in mind that goal state is persisted before actions are taken. The goal state strictly transitions + * thru New->Launched->Released. + * + * Unlike most exchanges with Mesos, task status is delivered at-least-once, so status handling should be idempotent. + */ +class TaskMonitor( +flinkConfig: Configuration, +schedulerDriver: SchedulerDriver, +goalState: TaskGoalState) extends Actor with FSM[TaskMonitorState,StateData] { + + val LOG = Logger(getClass) + + startWith(Suspended, StateData(goalState)) + + // + // Suspended State + // + + when(Suspended) { +case Event(update: TaskGoalStateUpdated, _) => + stay() using StateData(update.state) +case Event(msg: StatusUpdate, _) => + stay() +case Event(msg: Connected, StateData(goal: New)) => + goto(New) +case Event(msg: Connected, StateData(goal: Launched)) => + goto(Reconciling) +case Event(msg: Connected, StateData(goal: Released)) => + goto(Killing) + } + + // + // New State + // + + when(New) { +case Event(TaskGoalStateUpdated(goal: Launched), _) => + goto(Staging) using StateData(goal) + } + + // + // Reconciliation State + // + + onTransition { +case _ -> Reconciling => + nextStateData.goal match { +case goal: Launched => + val taskStatus = Protos.TaskStatus.newBuilder() + .setTaskId(goal.taskID).setSlaveId(goal.slaveID).setState(TASK_STAGING).build() + context.parent ! Reconcile(Seq(taskStatus)) --- End diff -- Would it be cleaner to pass the `ActorRef` directly to the TaskMonitor?
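The goal-state discipline described in the scaladoc above — a strict New -> Launched -> Released progression with idempotent handling of at-least-once status delivery — can be modeled minimally as follows. These are illustrative names, not Flink's actual TaskGoalState classes:

```java
// Minimal model of the strictly ordered goal-state progression described
// above (New -> Launched -> Released). `update` ignores duplicate and stale
// goals, which tolerates Mesos's at-least-once message delivery.
class TaskGoal {
    enum State { NEW, LAUNCHED, RELEASED }

    private State state = State.NEW;

    State current() { return state; }

    // Only forward transitions take effect; replays of the current or an
    // earlier goal are no-ops, so applying the same update twice is safe.
    void update(State goal) {
        if (goal.ordinal() > state.ordinal()) {
            state = goal;
        }
    }
}
```

Because a replayed or out-of-order update leaves the state unchanged, a duplicated status message cannot move a task backwards in its lifecycle.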
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75300879 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. + */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. 
+*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() --- End diff -- space after Protos.TaskID,
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75301383 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. + */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. 
+*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks + } + + override def receive: Receive = { + +case msg: Disconnected => + registered = None + context.actorSelection("*").tell(msg, self) + +case msg : Connected => + registered = Some(msg) + context.actorSelection("*").tell(msg, self) + +case msg: TaskGoalStateUpdated => + val taskID = msg.state.taskID + + // ensure task monitor exists + if(!taskMap.contains(taskID)) { +val actorRef = createTask(msg.state) +registered.foreach(actorRef ! _) + } + + taskMap(taskID) ! msg + +case msg: StatusUpdate => + taskMap(msg.status().getTaskId) ! msg + +case msg: Reconcile => + context.parent.forward(msg) --- End diff -- Same as above. The parent is the resource manager. Do we want to make this explicit?
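The routing pattern in the quoted `receive` block — a monitor map keyed by task ID, lazy creation, and replay of the cached connection state to a freshly created monitor — can be sketched without actors like this. Strings stand in for both messages and actor refs, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Actor-free sketch of the routing in `Tasks`: per-task inboxes keyed by
// task ID, lazy monitor creation, and replay of the cached connection
// state (the `registered` field above) to a monitor created after
// registration already happened.
class TaskRouter {
    private final Map<String, List<String>> monitors = new LinkedHashMap<>();
    private String connectionState = null; // cache, like `registered`

    void onConnected(String msg) {
        connectionState = msg;
        // broadcast, like context.actorSelection("*").tell(msg, self)
        for (List<String> inbox : monitors.values()) inbox.add(msg);
    }

    void onDisconnected(String msg) {
        connectionState = null;
        for (List<String> inbox : monitors.values()) inbox.add(msg);
    }

    void onGoalStateUpdated(String taskId, String msg) {
        if (!monitors.containsKey(taskId)) {
            List<String> inbox = new ArrayList<>();
            monitors.put(taskId, inbox);
            // replay cached state, like `registered.foreach(actorRef ! _)`
            if (connectionState != null) inbox.add(connectionState);
        }
        monitors.get(taskId).add(msg);
    }

    List<String> inbox(String taskId) {
        return monitors.getOrDefault(taskId, Collections.emptyList());
    }
}
```

The replay step is the interesting detail: a monitor created after the framework registered still learns that it is connected, so late-arriving tasks behave the same as early ones.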
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75303392 --- Diff: flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala --- @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import java.util.{Collections, UUID} +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.FSM.StateTimeout +import akka.testkit._ +import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.{Action1, Action2} +import com.netflix.fenzo.plugins.VMLeaseObject +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.mesos.Protos.{SlaveID, TaskInfo} +import org.apache.mesos.{SchedulerDriver, Protos} +import org.junit.runner.RunWith +import org.mockito.Mockito.{verify, _} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.mockito.{Matchers => MM, Mockito} +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.collection.JavaConverters._ + +import org.apache.flink.mesos.Utils.range +import org.apache.flink.mesos.Utils.ranges +import org.apache.flink.mesos.Utils.scalar + +@RunWith(classOf[JUnitRunner]) +class LaunchCoordinatorTest + extends TestKitBase +with ImplicitSender +with WordSpecLike +with Matchers +with BeforeAndAfterAll { + + lazy val config = new Configuration() + implicit lazy val system = AkkaUtils.createLocalActorSystem(config) + + override def afterAll(): Unit = { +TestKit.shutdownActorSystem(system) + } + + def randomFramework = { + Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID.toString).build + } + + def randomTask = { +val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build + +def generateTaskRequest = { + new TaskRequest() { +private[mesos] val assignedResources = new AtomicReference[TaskRequest.AssignedResources] 
+override def getId: String = taskID.getValue +override def taskGroupName: String = "" +override def getCPUs: Double = 1.0 +override def getMemory: Double = 1024.0 +override def getNetworkMbps: Double = 0.0 +override def getDisk: Double = 0.0 +override def getPorts: Int = 1 +override def getCustomNamedResources: java.util.Map[String, NamedResourceSetRequest] = + Collections.emptyMap[String, NamedResourceSetRequest] +override def getSoftConstraints: java.util.List[_ <: VMTaskFitnessCalculator] = null +override def getHardConstraints: java.util.List[_ <: ConstraintEvaluator] = null +override def getAssignedResources: AssignedResources = assignedResources.get() +override def setAssignedResources(assignedResources: AssignedResources): Unit = { + this.assignedResources.set(assignedResources) +} + } +} + +val task: LaunchableTask = new LaunchableTask() { + override def taskRequest: TaskRequest = generateTaskRequest + override def launch(slaveId: SlaveID, taskAssignment: TaskAssignmentResult): Protos.TaskInfo = { +Protos.TaskInfo.newBuilder + .setTaskId(taskID).setName(taskID.getValue) + .setCommand(Protos.CommandInfo.newBuilder.setValue("whoami")) + .setSlaveId(slaveId) + .build() + }
[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2315 Thank you for your work @EronWright. I finally managed to go through the code. All in all, very impressive as the first step of the Mesos integration! I think this PR is in a mergeable state once some minor comments are addressed. I'm not 100% sure about all the additional actors yet. It seems like `ReconciliationCoordinator`, `ConnectionMonitor`, and `TaskMonitor` could also easily be handled inside `MesosFlinkResourceManager`. In terms of modularity, I can see that having these run independently gives us a more flexible setup. Which of the actors do you plan to re-use in the next set of changes? Clearly, in terms of testability, it comes in really handy.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75307414 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala --- @@ -58,10 +62,63 @@ class JobInfo( } } - override def toString = s"JobInfo(client: $client ($listeningBehaviour), start: $start)" + + /** +* Notifies all clients by sending a message +* @param message the message to send +*/ + def notifyClients(message: Any) = { +clients foreach { + case (clientActor, _) => +clientActor ! message +} + } + + /** +* Notifies all clients which are not of type detached +* @param message the message to sent to non-detached clients +*/ + def notifyNonDetachedClients(message: Any) = { +clients foreach { + case (clientActor, ListeningBehaviour.DETACHED) => +// do nothing + case (clientActor, _) => +clientActor ! message +} + } + + /** +* Sends a message to job clients that match the listening behavior +* @param message the message to send to all clients +* @param listeningBehaviour the desired listening behaviour +*/ + def notifyClients(message: Any, listeningBehaviour: ListeningBehaviour) = { +clients foreach { + case (clientActor, `listeningBehaviour`) => +clientActor ! message + case _ => +} + } def setLastActive() = lastActive = System.currentTimeMillis() + + + override def toString = s"JobInfo(clients: ${clients.toString()}, start: $start)" + + override def equals(other: Any): Boolean = other match { +case that: JobInfo => + this.isInstanceOf[JobInfo] && --- End diff -- Yes, that's unnecessary because of the case type check.
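The notification helpers in the diff above fan a message out to registered clients while filtering on `ListeningBehaviour`. A simplified, self-contained version of the detached-client filter — placeholder types, with a log list standing in for `clientActor ! message`, and only two of the behaviour values:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified version of JobInfo's notification helpers: clients are
// (name, behaviour) pairs, and detached clients are skipped for result
// messages. The enum values are placeholders, not Flink's full
// ListeningBehaviour.
class ClientNotifier {
    enum ListeningBehaviour { DETACHED, EXECUTION_RESULT }

    private final Map<String, ListeningBehaviour> clients = new LinkedHashMap<>();
    private final List<String> sent = new ArrayList<>();

    void addClient(String client, ListeningBehaviour behaviour) {
        clients.put(client, behaviour);
    }

    /** Notifies every client, regardless of behaviour. */
    void notifyClients(String message) {
        clients.forEach((client, behaviour) -> sent.add(client + " <- " + message));
    }

    /** Skips clients that submitted in detached mode. */
    void notifyNonDetachedClients(String message) {
        clients.forEach((client, behaviour) -> {
            if (behaviour != ListeningBehaviour.DETACHED) {
                sent.add(client + " <- " + message);
            }
        });
    }

    List<String> sent() { return sent; }
}
```

A detached client then never receives an execution result, which is exactly the case the `notifyNonDetachedClients` helper in the diff exists for.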
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75442774 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); + LOG.info("Reconstructed class loader for Job {}" , jobID); + } catch (Exception e) { + LOG.warn("Couldn't retrieve classloader for {}. 
Using system class loader", jobID, e); + classloader = JobClient.class.getClassLoader(); + } + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job submission, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + classloader); + } + + /** +* Reconstructs the class loader by first requesting information about it at the JobManager +* and then downloading missing jar files. +* @param jobID id of job +* @param jobManager gateway to the JobManager +* @param config the flink configuration +* @param timeout timeout for querying the jobmanager +* @return A classloader that should behave like the original classloader +* @throws JobRetrievalException if anything goes wrong +*/ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config, + FiniteDuration timeout) + throws JobRetrievalException { + + final Object jmAnswer; + try { + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.Requ
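The try/catch at the start of `attachToRunningJob` quoted above — attempt to rebuild the user class loader from the JobManager, and fall back to the client's own class loader on failure — boils down to the following shape. The `Supplier` is a placeholder for the actual RPC; this is a sketch, not Flink's API:

```java
import java.util.function.Supplier;

// The fallback pattern from `attachToRunningJob`, reduced to its shape:
// try to rebuild the user class loader via the JobManager, and fall back
// to the client's own class loader if retrieval fails.
class ClassLoaderFallback {
    static ClassLoader resolve(Supplier<ClassLoader> retrieveFromJobManager) {
        try {
            return retrieveFromJobManager.get();
        } catch (Exception e) {
            // "Couldn't retrieve classloader ... Using system class loader"
            return ClassLoaderFallback.class.getClassLoader();
        }
    }
}
```

The fallback keeps attachment usable even when the jars cannot be downloaded, at the cost that user classes may fail to resolve later; hence the warning rather than a hard failure.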
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75442857 --- Diff: flink-tests/src/test/java/org/apache/flink/test/clients/examples/JobRetrievalITCase.java --- @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.test.clients.examples; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import scala.collection.Seq; + +import java.util.concurrent.Semaphore; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + + +/** + * Tests retrieval of a job from a running Flink cluster + */ +public class JobRetrievalITCase { --- End diff -- Thanks
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75442830 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java --- @@ -198,10 +211,47 @@ else if (message instanceof SubmitJobAndWait) { decorateMessage(new Status.Failure(new Exception(msg))), ActorRef.noSender()); } } + else if (message instanceof AttachToJobAndWait) { --- End diff -- I agree that this design could be improved. I'll consider factoring out common code into a base class.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75443084 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); + LOG.info("Reconstructed class loader for Job {}" , jobID); + } catch (Exception e) { + LOG.warn("Couldn't retrieve classloader for {}. 
Using system class loader", jobID, e); + classloader = JobClient.class.getClassLoader(); + } + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job submission, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + classloader); + } + + /** +* Reconstructs the class loader by first requesting information about it at the JobManager +* and then downloading missing jar files. +* @param jobID id of job +* @param jobManager gateway to the JobManager +* @param config the flink configuration +* @param timeout timeout for querying the jobmanager +* @return A classloader that should behave like the original classloader +* @throws JobRetrievalException if anything goes wrong +*/ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config, + FiniteDuration timeout) + throws JobRetrievalException { + + final Object jmAnswer; + try { + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.Requ
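The quoted diff retrieves the user class loader before anything else, and falls back to the system class loader if retrieval fails. A minimal, self-contained sketch of that fallback pattern follows; none of these names are Flink API, they only illustrate the control flow of the try/catch in `attachToRunningJob`.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the fallback pattern in attachToRunningJob: try to
// reconstruct the user class loader, and fall back to the caller's own class
// loader on failure (the real code logs a warning at that point).
public class ClassLoaderFallback {

    // Attempts the given retrieval step; returns this class's own class
    // loader if the step throws, mirroring the try/catch in the diff above.
    static ClassLoader retrieveOrFallback(Callable<ClassLoader> retrieval) {
        try {
            return retrieval.call();
        } catch (Exception e) {
            // In the real code: LOG.warn("Couldn't retrieve classloader ...", e)
            return ClassLoaderFallback.class.getClassLoader();
        }
    }

    public static void main(String[] args) {
        // Simulate an unreachable JobManager: the fallback loader is returned.
        ClassLoader cl = retrieveOrFallback(() -> {
            throw new Exception("JobManager unreachable");
        });
        System.out.println(cl == ClassLoaderFallback.class.getClassLoader()); // true
    }
}
```

The design choice here is deliberate: attachment still succeeds without the user jars, at the cost of possibly failing later when a custom user class appears in the result or an exception.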
[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2313 Thanks for the review @tillrohrmann. Yes, the plan is to have a `JobClient` API class (the existing JobClient class will be renamed) which uses the SubmissionContext to supervise submitted jobs or attach to existing jobs. All the job-related methods from `ClusterClient` will be moved to this new class.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75459375 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); --- End diff -- True, this code assumes that the JobManager doesn't change between retrieving the leading jobmanager and retrieving the class loader. There is always some possible gap where the jobmanager could change. 
We could mitigate this by retrying in case it has changed.
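The mitigation proposed above, retrying when the leader changed between lookup and call, can be sketched as a bounded retry loop. `LeaderChangedException` and the operation below are illustrative stand-ins, not Flink API.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the retry idea from the comment above: re-run an
// operation a bounded number of times if the leading JobManager moved
// underneath it between leader retrieval and the actual call.
public class LeaderRetry {

    static class LeaderChangedException extends Exception {}

    static <T> T withRetries(Callable<T> op, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (LeaderChangedException e) {
                last = e; // leader moved; re-resolve and try again
            }
        }
        throw last; // give up after maxAttempts leader changes
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Fails once (simulated leader change), then succeeds.
        String result = withRetries(() -> {
            if (calls[0]++ == 0) throw new LeaderChangedException();
            return "class loader";
        }, 3);
        System.out.println(result); // class loader
    }
}
```

As the later comments note, there is always some residual window, so a bound on the retries keeps the gap from turning into an infinite loop.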
[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2313 @tillrohrmann I've refactored the JobClientActor to move the common code into `JobClientActorBase`, with implementations for submitting and attaching in `JobSubmissionClientActor` and `JobAttachmentClientActor`.
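The refactoring described above, shared connection logic in a base class with one hook per concrete actor, can be sketched without Akka as follows. Class names follow the comment; the bodies are purely illustrative and not the real actor code.

```java
// Hypothetical sketch of the JobClientActor refactoring: common behaviour
// (leader retrieval, logging, disconnect handling) lives in the base class,
// while submission and attachment each override a single hook.
public class ActorHierarchyDemo {

    static abstract class JobClientActorBase {
        // Shared flow; in the real actors this is the message-handling logic.
        final String handle() {
            return "connected; " + onJobManagerConnected();
        }
        // The one step that differs between submitting and attaching.
        abstract String onJobManagerConnected();
    }

    static class JobSubmissionClientActor extends JobClientActorBase {
        String onJobManagerConnected() { return "submitting job graph"; }
    }

    static class JobAttachmentClientActor extends JobClientActorBase {
        String onJobManagerConnected() { return "attaching to job id"; }
    }

    public static void main(String[] args) {
        System.out.println(new JobSubmissionClientActor().handle());
        System.out.println(new JobAttachmentClientActor().handle());
    }
}
```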
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75468671 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); + LOG.info("Reconstructed class loader for Job {}" , jobID); + } catch (Exception e) { + LOG.warn("Couldn't retrieve classloader for {}. 
Using system class loader", jobID, e); + classloader = JobClient.class.getClassLoader(); + } + + // we create a proxy JobClientActor that deals with all communication with + // the JobManager. It forwards the job submission, checks the success/failure responses, logs + // update messages, watches for disconnect between client and JobManager, ... + Props jobClientActorProps = JobClientActor.createJobClientActorProps( + leaderRetrievalService, + timeout, + sysoutLogUpdates); + + ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); + + Future attachmentFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.AttachToJobAndWait(jobID), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobID, + attachmentFuture, + jobClientActor, + classloader); + } + + /** +* Reconstructs the class loader by first requesting information about it at the JobManager +* and then downloading missing jar files. +* @param jobID id of job +* @param jobManager gateway to the JobManager +* @param config the flink configuration +* @param timeout timeout for querying the jobmanager +* @return A classloader that should behave like the original classloader +* @throws JobRetrievalException if anything goes wrong +*/ + public static ClassLoader retrieveClassLoader( + JobID jobID, + ActorGateway jobManager, + Configuration config, + FiniteDuration timeout) + throws JobRetrievalException { + + final Object jmAnswer; + try { + jmAnswer = Await.result( + jobManager.ask( + new JobManagerMessages.Requ
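The `retrieveClassLoader` javadoc above describes downloading the missing jar files and building "a classloader that should behave like the original classloader". A self-contained sketch of that final step, building a class loader over already-downloaded jars, is below; the download itself (done via the blob service in Flink) is out of scope here.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the reconstruction step: once the job's jar files
// have been downloaded, a URLClassLoader over them behaves like the original
// user-code class loader. This is not Flink's actual implementation.
public class UserClassLoaderSketch {

    static ClassLoader fromJars(List<Path> downloadedJars) throws Exception {
        List<URL> urls = new ArrayList<>();
        for (Path jar : downloadedJars) {
            urls.add(jar.toUri().toURL());
        }
        // Parent-first delegation to the current class loader, as usual.
        return new URLClassLoader(urls.toArray(new URL[0]),
                UserClassLoaderSketch.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        // With no jars, the loader simply delegates to its parent.
        ClassLoader cl = fromJars(new ArrayList<>());
        System.out.println(cl.loadClass("java.lang.String").getName()); // java.lang.String
    }
}
```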
[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2313 @tillrohrmann Pinging the actor now to check if it is still alive. Also added another test case for that.
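The liveness check mentioned above amounts to: send a ping, and treat a missing reply within the timeout as "actor dead". In the real code this is an Akka ask; in this self-contained sketch a `CompletableFuture` stands in for the reply.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of pinging an actor to check it is still alive:
// a reply that does not arrive within the timeout counts as "dead".
public class PingSketch {

    static boolean isAlive(CompletableFuture<String> pongReply, long timeoutMs) {
        try {
            return "pong".equals(pongReply.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            return false; // no answer in time: assume the actor is gone
        } catch (Exception e) {
            return false; // interrupted or failed reply also counts as dead
        }
    }

    public static void main(String[] args) {
        // A completed reply: alive. A never-completing reply: dead.
        System.out.println(isAlive(CompletableFuture.completedFuture("pong"), 100)); // true
        System.out.println(isAlive(new CompletableFuture<>(), 10)); // false
    }
}
```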
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75474811 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); --- End diff -- Actually, I'm not really sure about this corner case. We don't typically retry client side operations in case the leader has changed after retrieving it. Instead, we just throw an error (see all the methods in `ClusterClient`). 
The `JobClientActor` is exceptional in this regard, and it has to be because it operates independently of the user function. So we could fail if we can't reconstruct the class loader. That of course has the caveat that even if the user doesn't use custom classes for the JobExecutionResult or Exceptions, the job retrieval may fail (e.g. a firewall blocking the BlobManager port). That's why I didn't want to enforce this step, but we could enforce it and fix any problems with the BlobManager communication if there are any.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75482760 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java --- @@ -118,27 +138,162 @@ public static JobExecutionResult submitJobAndWait( sysoutLogUpdates); ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps); - + + Future submissionFuture = Patterns.ask( + jobClientActor, + new JobClientMessages.SubmitJobAndWait(jobGraph), + new Timeout(AkkaUtils.INF_TIMEOUT())); + + return new JobListeningContext( + jobGraph.getJobID(), + submissionFuture, + jobClientActor, + classLoader); + } + + + /** +* Attaches to a running Job using the JobID. +* Reconstructs the user class loader by downloading the jars from the JobManager. +* @throws JobRetrievalException if anything goes wrong while retrieving the job +*/ + public static JobListeningContext attachToRunningJob( + JobID jobID, + ActorGateway jobManagerGateWay, + Configuration configuration, + ActorSystem actorSystem, + LeaderRetrievalService leaderRetrievalService, + FiniteDuration timeout, + boolean sysoutLogUpdates) throws JobRetrievalException { + + checkNotNull(jobID, "The jobID must not be null."); + checkNotNull(jobManagerGateWay, "The jobManagerGateWay must not be null."); + checkNotNull(configuration, "The configuration must not be null."); + checkNotNull(actorSystem, "The actorSystem must not be null."); + checkNotNull(leaderRetrievalService, "The jobManagerGateway must not be null."); + checkNotNull(timeout, "The timeout must not be null."); + + // retrieve classloader first before doing anything + ClassLoader classloader; + try { + classloader = retrieveClassLoader(jobID, jobManagerGateWay, configuration, timeout); --- End diff -- In addition to the result, you'll also need the class loader for getting accumulators of a running job. 
I agree that it would be nice to fail when the class loader can't be reconstructed, but *only* if it is really the only option. So we could start off with the class loader set to `None` in the `JobListeningContext`. When the class loader is needed, i.e. accumulator retrieval or job execution result retrieval, it is fetched.
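The lazy scheme agreed on above, start with no class loader and reconstruct it only on first use, is essentially a memoized supplier. The sketch below is illustrative; it is not the `JobListeningContext` API.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the lazy class-loader reconstruction: the holder
// starts empty and runs the (expensive) reconstruction at most once, the
// first time a caller actually needs the loader.
public class LazyClassLoaderHolder {

    private ClassLoader classLoader; // null until first use
    private final Callable<ClassLoader> reconstruct;
    int reconstructions = 0; // exposed for the demo only

    LazyClassLoaderHolder(Callable<ClassLoader> reconstruct) {
        this.reconstruct = reconstruct;
    }

    synchronized ClassLoader getClassLoader() throws Exception {
        if (classLoader == null) {
            classLoader = reconstruct.call(); // fetch jars, build loader
            reconstructions++;
        }
        return classLoader;
    }

    public static void main(String[] args) throws Exception {
        LazyClassLoaderHolder holder = new LazyClassLoaderHolder(
                () -> LazyClassLoaderHolder.class.getClassLoader());
        holder.getClassLoader();
        holder.getClassLoader(); // cached: reconstruction runs only once
        System.out.println(holder.reconstructions); // 1
    }
}
```

With this shape, a client that never touches accumulators or the final result never pays for (or fails on) class-loader reconstruction, which is exactly the trade-off discussed above.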
[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2313 I've made the last changes concerning the lazy reconstruction of the class loader we discussed. Rebased to master. Should be good to go now.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75641421 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75641515 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +/** + * The Mesos environment variables used for settings of the containers. + */ +public class MesosConfigKeys { --- End diff -- Yes, a follow-up is fine.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75641703 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.cli; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.util.Map; + +public class FlinkMesosSessionCli { --- End diff -- Yes, I recognized it :) Sure! No problem.
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75642452 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. 
+ */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. + */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; --- End diff -- So 0.0 means give me whatever? How about a minimum value, e.g. 500MB? --- End diff --
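The review question above, whether `getDisk()` should return a minimum instead of 0.0, amounts to applying a floor to the requested disk. A tiny sketch of that idea follows; the constant and method names are illustrative, not part of the Fenzo `TaskRequest` interface.

```java
// Hypothetical sketch of the reviewer's suggestion: never request less disk
// than a configurable floor, even when nothing (0.0 MB) was configured.
public class DiskRequestSketch {

    static final double MIN_DISK_MB = 500.0; // the 500MB floor suggested above

    static double effectiveDiskRequest(double configuredMb) {
        // Clamp the configured value up to the floor.
        return Math.max(configuredMb, MIN_DISK_MB);
    }

    public static void main(String[] args) {
        System.out.println(effectiveDiskRequest(0.0));    // 500.0
        System.out.println(effectiveDiskRequest(2048.0)); // 2048.0
    }
}
```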
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75642882 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, FSM, Props} +import grizzled.slf4j.Logger +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ConnectionMonitor._ +import org.apache.flink.mesos.scheduler.messages._ + +import scala.concurrent.duration._ + +/** + * Actively monitors the Mesos connection. 
+ */ +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] { + + val LOG = Logger(getClass) + + startWith(StoppedState, None) + + when(StoppedState) { +case Event(msg: Start, _) => + LOG.info(s"Connecting to Mesos...") + goto(ConnectingState) + } + + when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) { +case Event(msg: Stop, _) => + goto(StoppedState) + +case Event(msg: Registered, _) => + LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.") + LOG.debug(s" Master Info: ${msg.masterInfo}") + goto(ConnectedState) + +case Event(msg: ReRegistered, _) => + LOG.info("Reconnected to a new Mesos master.") + LOG.debug(s" Master Info: ${msg.masterInfo}") + goto(ConnectedState) + +case Event(StateTimeout, _) => + LOG.warn("Unable to connect to Mesos; still trying...") + stay() + } + + when(ConnectedState) { +case Event(msg: Stop, _) => + goto(StoppedState) + +case Event(msg: Disconnected, _) => + LOG.warn("Disconnected from the Mesos master. Reconnecting...") + goto(ConnectingState) + } + --- End diff -- The default is to just log a warning and drop the message?
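The Scala FSM in the diff above has three states (Stopped, Connecting, Connected) and a handful of events. A plain-Java rendering of the same transition table makes the structure explicit; unhandled events keep the current state, matching the "log a warning and drop the message" default raised in the comment. This is a sketch, not the Akka FSM itself.

```java
// Hypothetical plain-Java rendering of the ConnectionMonitor state machine:
// Stopped -> Connecting on Start; Connecting -> Connected on (re)registration;
// Connected -> Connecting on disconnect; Stop returns to Stopped from anywhere
// it is handled. All other events leave the state unchanged.
public class ConnectionFsmSketch {

    enum State { STOPPED, CONNECTING, CONNECTED }
    enum Event { START, STOP, REGISTERED, REREGISTERED, DISCONNECTED, TIMEOUT }

    static State transition(State s, Event e) {
        switch (s) {
            case STOPPED:
                return e == Event.START ? State.CONNECTING : s;
            case CONNECTING:
                if (e == Event.STOP) return State.STOPPED;
                if (e == Event.REGISTERED || e == Event.REREGISTERED) return State.CONNECTED;
                return s; // TIMEOUT: warn and keep retrying
            case CONNECTED:
                if (e == Event.STOP) return State.STOPPED;
                if (e == Event.DISCONNECTED) return State.CONNECTING;
                return s;
        }
        return s; // unreachable; keeps the compiler satisfied
    }

    public static void main(String[] args) {
        State s = State.STOPPED;
        s = transition(s, Event.START);        // CONNECTING
        s = transition(s, Event.REGISTERED);   // CONNECTED
        s = transition(s, Event.DISCONNECTED); // back to CONNECTING
        System.out.println(s); // CONNECTING
    }
}
```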
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75644911

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {

-	/** The unique resource ID of this TaskExecutor */
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+	/** Return code for critical errors during the runtime */
+	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+	/** The name of the TaskManager actor */
+	private static final String TASK_MANAGER_NAME = "taskmanager";
+
+	/** The unique resource ID of this TaskManager */
 	private final ResourceID resourceID;

 	/** The access to the leader election and metadata storage services */
 	private final HighAvailabilityServices haServices;

-	// - resource manager
+	/** The task manager configuration */
+	private final TaskManagerConfiguration taskManagerConfig;
+
+	/** The connection information of the task manager */
+	private final InstanceConnectionInfo connectionInfo;
+
+	/** The I/O manager component in the task manager */
+	private final IOManager ioManager;
+
+	/** The memory manager component in the task manager */
+	private final MemoryManager memoryManager;
+
+	/** The network component in the task manager */
+	private final NetworkEnvironment networkEnvironment;
+
+	/** The number of slots in the task manager, should be 1 for YARN */
+	private final int numberOfSlots;
+
+	// - resource manager
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;

 	//

 	public TaskExecutor(
+			TaskManagerConfiguration taskManagerConfig,
+			ResourceID resourceID,
+			InstanceConnectionInfo connectionInfo,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			NetworkEnvironment networkEnvironment,
+			int numberOfSlots,
 			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			ResourceID resourceID) {
+			HighAvailabilityServices haServices) {
 		super(rpcService);
-		this.haServices = checkNotNull(haServices);
+		this.taskManagerConfig = checkNotNull(taskManagerConfig);
 		this.resourceID = checkNotNull(resourceID);
+		this.connectionInfo = checkNotNull(connectionInfo);
+		this.memoryManager = checkNotNull(memoryManager);
+		this.ioManager = checkNotNull(ioManager);
+		this.networkEnvironment = checkNotNull(networkEnvironment);
+		this.numberOfSlots = checkNotNull(numberOfSlots);
+		this.haServices = checkNotNull(haServices);
+	}
+
+	/**
+	 * Starts and runs the TaskManager.
+	 *
+	 * This method first tries to select the network interface to use for the TaskManager
+	 * communication. The network interface is used both for the actor communication
+	 * (coordination) as well as for the data exchange between task managers. Unless
+	 * the hostname/interface is explicitly configured in the configuration, this
+	 * method will try out various interfaces and methods to connect to the JobManager
+	 * and select the one where the connection attempt is successful.
+	 *
+	 * After selecting the network interface, this method brings up an actor system
+	 * for the TaskManager and its actors, starts the TaskManager's services
+	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+	 *
+	 * @param configuration    The configuration for the TaskManager.
+	 * @param taskManagerClass The actor class to instantiate.
+	 *                         Allows to use TaskManager subclasses for example for YARN.
+	 */
+	public static void selectNetworkInterfaceAndRunTaskManager(
+			Configuration configuration,
+			ResourceID resourceID,
+			Class taskManagerClass) throws Exception {
+
+		Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+		runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+	}
+
+	private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+		throws Exception {
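One detail worth noting in the constructor above: the diff applies `checkNotNull` to `numberOfSlots`, which is a primitive `int`. That call only autoboxes a value that can never be null; a range check is the appropriate guard. The following is a minimal, self-contained sketch of the pattern (`TaskExecutorSketch` and its `String` resource ID are hypothetical stand-ins, not Flink's actual classes), using `Objects.requireNonNull` for reference fields and an explicit argument check for the slot count:

```java
import java.util.Objects;

// Hypothetical sketch of the constructor validation pattern from the diff.
public class TaskExecutorSketch {
	private final String resourceID;   // stand-in for Flink's ResourceID
	private final int numberOfSlots;

	public TaskExecutorSketch(String resourceID, int numberOfSlots) {
		// Null check only makes sense for reference types.
		this.resourceID = Objects.requireNonNull(resourceID, "The resourceID must not be null.");
		// For a primitive int, validate the range instead of null-checking.
		if (numberOfSlots <= 0) {
			throw new IllegalArgumentException("The numberOfSlots must be positive.");
		}
		this.numberOfSlots = numberOfSlots;
	}

	public String getResourceID() {
		return resourceID;
	}

	public int getNumberOfSlots() {
		return numberOfSlots;
	}
}
```

The same effect could be had with a `checkArgument(numberOfSlots > 0, ...)`-style precondition; either way, the no-op null check on the primitive disappears.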
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75644987

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above; the review comment itself is truncated in the archive)
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75645192

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above; the review comment itself is truncated in the archive)
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75645973

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above, on the line:)
+		Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);

--- End diff --

Do we want to rely on Scala tuples here after the conversion from Scala to Java? I would suggest `InetAddress` here.
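The reviewer's suggestion above can be sketched in plain Java. The helper below is a hypothetical stand-in for `selectNetworkInterfaceAndPort` (the real method inspects the Flink `Configuration` and probes interfaces); the point is only that `java.net.InetSocketAddress` already pairs a host with a port, so no Scala `Tuple2` is needed after the Scala-to-Java conversion:

```java
import java.net.InetSocketAddress;

public class NetworkSelectionSketch {

	// Hypothetical replacement for the Tuple2-returning helper: a single
	// Java type carries the selected host and port together.
	static InetSocketAddress selectNetworkInterfaceAndPort(String configuredHost, int configuredPort) {
		// createUnresolved avoids a DNS lookup at selection time.
		return InetSocketAddress.createUnresolved(configuredHost, configuredPort);
	}

	public static void main(String[] args) {
		InetSocketAddress address = selectNetworkInterfaceAndPort("taskmanager-host", 6122);
		System.out.println(address.getHostString() + ":" + address.getPort());
	}
}
```

Alternatively, a small dedicated value class (e.g. a hypothetical `HostAndPort`) would work just as well; the design choice is simply to keep the public Java API free of Scala types.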
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75646080

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above, on the line:)
+	private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)

--- End diff --

Do we want to rely on Scala tuples here after the conversion from Scala to Java?
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75646889

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above; the review comment itself is truncated in the archive)
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75646940

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above, on the lines:)
+	 * @param configuration    The configuration for the TaskManager.
+	 * @param taskManagerClass The actor class to instantiate.

--- End diff --

Missing `ResourceID` param here.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
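For reference, the completed Javadoc the reviewer is asking for might look like the sketch below. The `@param resourceID` description is borrowed from the field comment in the diff ("The unique resource ID of this TaskManager"); the exact wording in the merged code may differ:

```java
/**
 * @param configuration    The configuration for the TaskManager.
 * @param resourceID       The unique resource ID of this TaskManager.
 * @param taskManagerClass The actor class to instantiate.
 *                         Allows to use TaskManager subclasses for example for YARN.
 */
```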
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75647172

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above; the review comment itself is truncated in the archive)
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2400#discussion_r75647510

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(same diff context as above; the review comment itself is truncated in the archive)
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647749

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647841

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647928

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647937

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647940

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647899

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75648030

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2400#discussion_r75647950

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---
(quoted diff hunk @@ -35,27 +79,634 @@, identical to the one above)
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75648199

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---

@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-	/** The unique resource ID of this TaskExecutor */
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+	/** Return code for critical errors during the runtime */
+	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+	/** The name of the TaskManager actor */
+	private static final String TASK_MANAGER_NAME = "taskmanager";
+
+	/** The unique resource ID of this TaskManager */
 	private final ResourceID resourceID;
 
 	/** The access to the leader election and metadata storage services */
 	private final HighAvailabilityServices haServices;
 
-	// - resource manager
+	/** The task manager configuration */
+	private final TaskManagerConfiguration taskManagerConfig;
+
+	/** The connection information of the task manager */
+	private final InstanceConnectionInfo connectionInfo;
+
+	/** The I/O manager component in the task manager */
+	private final IOManager ioManager;
+
+	/** The memory manager component in the task manager */
+	private final MemoryManager memoryManager;
+
+	/** The network component in the task manager */
+	private final NetworkEnvironment networkEnvironment;
+
+	/** The number of slots in the task manager, should be 1 for YARN */
+	private final int numberOfSlots;
+
+	// - resource manager
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
 	//
 	public TaskExecutor(
+			TaskManagerConfiguration taskManagerConfig,
+			ResourceID resourceID,
+			InstanceConnectionInfo connectionInfo,
+			MemoryManager memoryManager,
+			IOManager ioManager,
+			NetworkEnvironment networkEnvironment,
+			int numberOfSlots,
 			RpcService rpcService,
-			HighAvailabilityServices haServices,
-			ResourceID resourceID) {
+			HighAvailabilityServices haServices) {
 		super(rpcService);
-		this.haServices = checkNotNull(haServices);
+		this.taskManagerConfig = checkNotNull(taskManagerConfig);
 		this.resourceID = checkNotNull(resourceID);
+		this.connectionInfo = checkNotNull(connectionInfo);
+		this.memoryManager = checkNotNull(memoryManager);
+		this.ioManager = checkNotNull(ioManager);
+		this.networkEnvironment = checkNotNull(networkEnvironment);
+		this.numberOfSlots = checkNotNull(numberOfSlots);
+		this.haServices = checkNotNull(haServices);
+	}
+
+	/**
+	 * Starts and runs the TaskManager.
+	 *
+	 * This method first tries to select the network interface to use for the TaskManager
+	 * communication. The network interface is used both for the actor communication
+	 * (coordination) as well as for the data exchange between task managers. Unless
+	 * the hostname/interface is explicitly configured in the configuration, this
+	 * method will try out various interfaces and methods to connect to the JobManager
+	 * and select the one where the connection attempt is successful.
+	 *
+	 * After selecting the network interface, this method brings up an actor system
+	 * for the TaskManager and its actors, starts the TaskManager's services
+	 * (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+	 *
+	 * @param configuration    The configuration for the TaskManager.
+	 * @param taskManagerClass The actor class to instantiate.
+	 *                         Allows to use TaskManager subclasses for example for YARN.
+	 */
+	public static void selectNetworkInterfaceAndRunTaskManager(
+			Configuration configuration,
+			ResourceID resourceID,
+			Class taskManagerClass) throws Exception {
+
+		Tuple2 tuple2 = selectNetworkInterfaceAndPort(configuration);
+
+		runTaskManager(tuple2._1(), resourceID, tuple2._2(), configuration, taskManagerClass);
+	}
+
+	private static Tuple2 selectNetworkInterfaceAndPort(Configuration configuration)
+			throws Exception {
+
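The constructor in the hunk above uses the Guava-style `checkNotNull` guard pattern on every injected dependency. As a minimal self-contained sketch (illustrative names, with `java.util.Objects.requireNonNull` standing in for Flink's `checkNotNull`):

```java
import java.util.Objects;

// Sketch of the constructor guard pattern from the diff above.
// Class and field names are illustrative, not Flink's actual types.
final class Endpoint {
    private final String resourceId;
    private final int numberOfSlots;

    Endpoint(String resourceId, int numberOfSlots) {
        // Fail fast with a descriptive message if a dependency is missing.
        this.resourceId = Objects.requireNonNull(resourceId, "resourceId must not be null");
        // Note: a null check on a primitive int (as in the diff's
        // checkNotNull(numberOfSlots)) autoboxes and can never fail;
        // a range check is the meaningful validation for a primitive.
        if (numberOfSlots < 1) {
            throw new IllegalArgumentException("numberOfSlots must be >= 1");
        }
        this.numberOfSlots = numberOfSlots;
    }

    int getNumberOfSlots() {
        return numberOfSlots;
    }
}
```

Guarding in the constructor keeps the invariant "all fields are valid after construction", so the rest of the class never needs to re-check.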
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75648406 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java (same hunk as quoted above) ---
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75648837

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---

@@ -35,27 +79,634 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-	/** The unique resource ID of this TaskExecutor */
+	private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+	/** Return code for critical errors during the runtime */
+	private static final int RUNTIME_FAILURE_RETURN_CODE = 2;
+
+	/** The name of the TaskManager actor */
+	private static final String TASK_MANAGER_NAME = "taskmanager";

--- End diff --

This is still named "taskmanager"

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75648889 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java (same `TASK_MANAGER_NAME` snippet as quoted above) --- End diff -- Also elsewhere in the code it still says TaskManager.
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75649283 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java (same hunk as quoted above) ---
[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2400 Thank you for the pull request @wangzhijiang999! Looks good. I just had some minor remarks on the Scala-->Java conversion.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75650141

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---

@@ -275,9 +275,35 @@ public static ClassLoader retrieveClassLoader(
 	public static JobExecutionResult awaitJobResult(JobListeningContext listeningContext) throws JobExecutionException {
 		final JobID jobID = listeningContext.jobID;
+		final ActorRef jobClientActor = listeningContext.jobClientActor;
 		final Future jobSubmissionFuture = listeningContext.jobResultFuture;
 		final ClassLoader classLoader = listeningContext.classLoader;
 
+		while (!jobSubmissionFuture.isCompleted()) {
+			try {
+				Thread.sleep(250);
+			} catch (InterruptedException e) {
+				throw new JobExecutionException(jobID, "Interrupted while waiting for execution result.", e);
+			}
+
+			try {
+				Await.result(
+					Patterns.ask(
+						jobClientActor,
+						JobClientMessages.getPing(),

--- End diff --

Sure, we can do that.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75650460 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java (same hunk as quoted above, at `Thread.sleep(250)`) --- End diff -- Sure, we can do that. We will have longer intervals between the checks then but that is probably fine.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75650523 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java (same hunk as quoted above, at `JobClientMessages.getPing()`) --- End diff -- Good idea.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75669289 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java (same hunk as quoted above, at `Thread.sleep(250)`) --- End diff -- Actually, it's not quite as easy. If we simply wait on the ask timeout via `Await.ready/result`, then we complete the `jobSubmissionFuture` with a timeout (if the job runs longer than the timeout interval). Subsequent checks will always return the same result. Thus, we probably need something like a sleep here.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75669417 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java (same hunk as quoted above, at `Thread.sleep(250)`) --- End diff -- Or something like an immutable future.
[GitHub] flink pull request #2313: [FLINK-4273] Modify JobClient to attach to running...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2313#discussion_r75676249 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java (same hunk as quoted above, at `Thread.sleep(250)`) --- End diff -- My bad, the future was only completed due to some faulty testing code.
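The pattern discussed in this thread can be sketched outside of Akka: poll the result future in a loop, sleeping between checks, and verify on each iteration that the answering actor is still alive. This is an illustrative sketch using `CompletableFuture`; the names `awaitWithLiveness`, `isAlive`, and `pollIntervalMs` are hypothetical and do not exist in Flink's `JobClient`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Illustrative stand-in for JobClient.awaitJobResult's poll-with-ping loop:
// wait for the result while periodically confirming the worker still responds.
final class PollingAwait {

    static <T> T awaitWithLiveness(
            CompletableFuture<T> result,
            BooleanSupplier isAlive,          // stand-in for the actor "ping"
            long pollIntervalMs) throws Exception {

        while (!result.isDone()) {
            // Sleep between checks, like the Thread.sleep(250) in the diff.
            Thread.sleep(pollIntervalMs);
            if (!isAlive.getAsBoolean()) {
                // Fail eagerly instead of waiting forever on a dead worker.
                throw new IllegalStateException("Worker is no longer reachable.");
            }
        }
        return result.get();
    }
}
```

Unlike blocking on a single `ask` with a long timeout, the loop keeps the future intact across liveness checks, which matches the observation above that a timed-out future would keep returning the same result.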
[GitHub] flink issue #2313: [FLINK-4273] Modify JobClient to attach to running jobs
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2313 Updated according to our comment discussion.
[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2400 Hi @wangzhijiang999! `ActorSystem` and `LeaderRetrievalService` should only ever appear in the factory (startTaskManagerComponentsAndActor) which brings up the TaskExecutor component. The TaskExecutor itself uses the new classes `RpcService` and `HighAvailabilityService`. All other code shouldn't change much, except for the Scala->Java changes.
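The separation described in this comment can be sketched as follows: the factory owns the transport-level pieces (e.g. an actor system), while the endpoint itself only sees the service abstractions. All names here are illustrative stand-ins, not Flink's actual API.

```java
// Hypothetical minimal interfaces standing in for Flink's RpcService and
// HighAvailabilityServices abstractions.
interface RpcServiceSketch { }
interface HighAvailabilityServicesSketch { }

// The endpoint depends only on the abstractions, never on the concrete transport.
final class TaskExecutorSketch {
    private final RpcServiceSketch rpcService;
    private final HighAvailabilityServicesSketch haServices;

    TaskExecutorSketch(RpcServiceSketch rpcService, HighAvailabilityServicesSketch haServices) {
        this.rpcService = rpcService;
        this.haServices = haServices;
    }
}

// Stand-in for a factory like startTaskManagerComponentsAndActor: the concrete
// transport (e.g. an ActorSystem-backed RpcService) is created here and never
// leaks into the endpoint's constructor signature.
final class TaskExecutorFactorySketch {
    static TaskExecutorSketch start() {
        RpcServiceSketch rpc = new RpcServiceSketch() { };
        HighAvailabilityServicesSketch ha = new HighAvailabilityServicesSketch() { };
        return new TaskExecutorSketch(rpc, ha);
    }
}
```

The benefit of this shape is that the endpoint can be unit-tested against mock services, and the actor-based transport can be swapped without touching `TaskExecutor` itself.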
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75680482 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java (same hunk as quoted above) ---
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75680765 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java (same hunk as quoted above) ---
[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...
Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2400#discussion_r75680917 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java (same hunk as quoted above) ---
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 Hi @vijikarthi! Sorry for the late reply. I intend to merge this as soon as possible. When I run the secure test cases from IntelliJ I get:
>java.lang.NoClassDefFoundError: jdbm/helper/CachePolicy

I'm investigating why this could be the case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 YARNSessionFIFOSecuredITCase gives me the following:
>17:49:58,097 INFO SecurityLogger.org.apache.hadoop.ipc.Server - Auth successful for appattempt_1471880990715_0001_01 (auth:SIMPLE)

It seems the test is not actually using Kerberos. We should check that security is really enabled and fail the test if not. ---
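One way to implement the fail-fast check suggested above is to verify up front that Kerberos is actually in effect. In a real Flink/Hadoop test this would query `UserGroupInformation.isSecurityEnabled()`; the sketch below instead reads Hadoop's `hadoop.security.authentication` key from a plain map so it stays self-contained, and the method name `verifySecureMode` is made up here:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a fail-fast guard for a secured IT case: abort immediately if
// authentication silently fell back to SIMPLE instead of Kerberos. A real
// test would ask Hadoop's UserGroupInformation.isSecurityEnabled(); a plain
// map of configuration keys stands in here to keep the sketch runnable.
public class SecureModeCheck {

    static void verifySecureMode(Map<String, String> hadoopConf) {
        String auth = hadoopConf.getOrDefault("hadoop.security.authentication", "simple");
        if (!"kerberos".equalsIgnoreCase(auth)) {
            throw new IllegalStateException(
                "Test expects Kerberos, but authentication mode is '" + auth + "'");
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("hadoop.security.authentication", "kerberos");
        verifySecureMode(conf);  // passes: Kerberos is configured
        System.out.println("Kerberos is enabled");
    }
}
```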
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 `RollingFileSink`
```
---
 T E S T S
---
Running org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase
Formatting using clusterid: testClusterID
java.net.BindException: Permission denied
	at java.net.PlainSocketImpl.socketBind(Native Method)
	at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:382)
	at java.net.ServerSocket.bind(ServerSocket.java:375)
	at org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.getSecureResources(SecureDataNodeStarter.java:103)
	at org.apache.hadoop.hdfs.MiniDFSCluster.startDataNodes(MiniDFSCluster.java:1213)
	at org.apache.hadoop.hdfs.MiniDFSCluster.initMiniDFSCluster(MiniDFSCluster.java:684)
	at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:351)
	at org.apache.hadoop.hdfs.MiniDFSCluster$Builder.build(MiniDFSCluster.java:332)
	at org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase.startSecureCluster(RollingSinkSecuredITCase.java:117)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

Tests run: 2, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 6.144 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase
org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase  Time elapsed: 6.143 sec  <<< ERROR!
java.lang.RuntimeException: Cannot start secure cluster without privileged resources.
	at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:734)
	at org.apache.hadoop.hdfs.server.datanode.DataNode.<init>(DataNode.java:316)
	at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:1848)
	at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:1748)
	at org.apache.hadoop.hdfs.MiniDFSCluster.startDataNodes(MiniDFSCluster.java:1218)
	at org.apache.hadoop.hdfs.MiniDFSCluster.initMiniDFSCluster(MiniDFSCluster.java:684)
	at org.apache.hadoop.hdfs.MiniDFSCluster.<init>(MiniDFSCluster.java:351)
	at org.apache.hadoop.hdfs.MiniDFSCluster$Builder.build(MiniDFSCluster.java:332)
	at org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase.startSecureCluster(RollingSinkSecuredITCase.java:117)

org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase  Time elapsed: 6.144 sec  <<< ERROR!
java.lang.NullPointerException: null
	at org.apache.flink.streaming.connectors.fs.RollingSinkSecuredITCase.teardownSecureCluster(RollingSinkSecuredITCase.java:135)
```
---
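The two errors in the log above share one root cause: a Kerberized HDFS DataNode insists on binding privileged ports (below 1024), which a non-root test process is not allowed to do on Linux, hence the `Permission denied` from `socketBind` followed by `Cannot start secure cluster without privileged resources`. A small sketch (my illustration, not part of the PR) of the difference between an ephemeral and a privileged bind:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Illustrates the failure mode above: binding an ephemeral port (0) succeeds
// for any user, while binding a privileged port (< 1024) as a non-root user
// typically fails with the "BindException: Permission denied" seen in the log.
public class PrivilegedPortDemo {

    static boolean canBind(int port) {
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(new InetSocketAddress("127.0.0.1", port));
            return true;
        } catch (IOException e) {
            return false;  // e.g. "Permission denied" for ports < 1024 without root
        }
    }

    public static void main(String[] args) {
        // Port 0 asks the OS for any free ephemeral port; this always works.
        System.out.println("ephemeral port bind: " + canBind(0));
        // A low port like 1004 usually fails unless the process runs as root.
        System.out.println("privileged port 1004 bind: " + canBind(1004));
    }
}
```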