[FLINK-4355] [cluster management] Implement TaskManager side of registration at ResourceManager.
This closes #2353 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f94ae532 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f94ae532 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f94ae532 Branch: refs/heads/flip-6 Commit: f94ae5321199b2c803690278668c1094e18537e5 Parents: 2c54a61 Author: Stephan Ewen <se...@apache.org> Authored: Wed Aug 10 20:42:45 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Sep 8 17:26:58 2016 +0200 ---------------------------------------------------------------------- .../HighAvailabilityServices.java | 39 +++ .../runtime/highavailability/NonHaServices.java | 59 ++++ .../StandaloneLeaderRetrievalService.java | 72 +++-- .../apache/flink/runtime/rpc/RpcEndpoint.java | 1 - .../apache/flink/runtime/rpc/RpcService.java | 27 ++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 18 ++ .../runtime/rpc/akka/messages/RunAsync.java | 1 + .../rpc/registration/RegistrationResponse.java | 84 ++++++ .../rpc/registration/RetryingRegistration.java | 292 +++++++++++++++++++ .../rpc/resourcemanager/ResourceManager.java | 23 ++ .../resourcemanager/ResourceManagerGateway.java | 21 +- .../runtime/rpc/taskexecutor/SlotReport.java | 38 +++ .../runtime/rpc/taskexecutor/TaskExecutor.java | 169 ++++++++--- .../rpc/taskexecutor/TaskExecutorGateway.java | 29 +- .../TaskExecutorRegistrationSuccess.java | 75 +++++ ...TaskExecutorToResourceManagerConnection.java | 194 ++++++++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 51 +++- .../rpc/taskexecutor/TaskExecutorTest.java | 87 +----- 18 files changed, 1105 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java new file mode 100644 index 0000000..094d36f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; + +/** + * This class gives access to all services needed for + * + * <ul> + * <li>ResourceManager leader election and leader retrieval</li> + * <li>JobManager leader election and leader retrieval</li> + * <li>Persistence for checkpoint metadata</li> + * <li>Registering the latest completed checkpoint(s)</li> + * </ul> + */ +public interface HighAvailabilityServices { + + /** + * Gets the leader retriever for the cluster's resource manager. 
+ */ + LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java new file mode 100644 index 0000000..b8c2ed8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; + +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An implementation of the {@link HighAvailabilityServices} for the non-high-availability case. 
+ * This implementation can be used for testing, and for cluster setups that do not + * tolerate failures of the master processes (JobManager, ResourceManager). + * + * <p>This implementation has no dependencies on any external services. It returns fixed + * pre-configured leaders, and stores checkpoints and metadata simply on the heap and therefore + * in volatile memory. + */ +public class NonHaServices implements HighAvailabilityServices { + + /** The fixed address of the ResourceManager */ + private final String resourceManagerAddress; + + /** + * Creates a new services class for the fixed pre-defined leaders. + * + * @param resourceManagerAddress The fixed address of the ResourceManager + */ + public NonHaServices(String resourceManagerAddress) { + this.resourceManagerAddress = checkNotNull(resourceManagerAddress); + } + + // ------------------------------------------------------------------------ + // Services + // ------------------------------------------------------------------------ + + @Override + public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { + return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java index 26a34aa..16b163c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java @@ -18,44 +18,74 @@ package org.apache.flink.runtime.leaderretrieval; -import
org.apache.flink.util.Preconditions; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** - * Standalone implementation of the {@link LeaderRetrievalService}. The standalone implementation - * assumes that there is only a single {@link org.apache.flink.runtime.jobmanager.JobManager} whose - * address is given to the service when creating it. This address is directly given to the - * {@link LeaderRetrievalListener} when the service is started. + * Standalone implementation of the {@link LeaderRetrievalService}. This implementation + * assumes that there is only a single contender for leadership + * (e.g., a single JobManager or ResourceManager process) and that this process is + * reachable under a constant address. + * + * <p>As soon as this service is started, it immediately notifies the leader listener + * of the leader contender with the pre-configured address. */ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService { - /** Address of the only JobManager */ - private final String jobManagerAddress; + private final Object startStopLock = new Object(); + + /** The fixed address of the leader */ + private final String leaderAddress; + + /** The fixed leader ID (leader lock fencing token) */ + private final UUID leaderId; - /** Listener which wants to be notified about the new leader */ - private LeaderRetrievalListener leaderListener; + /** Flag whether this service is started */ + private boolean started; /** - * Creates a StandaloneLeaderRetrievalService with the given JobManager address. + * Creates a StandaloneLeaderRetrievalService with the given leader address. + * The leaderId will be null.
* - * @param jobManagerAddress The JobManager's address which is returned to the - * {@link LeaderRetrievalListener} + * @param leaderAddress The leader's pre-configured address */ - public StandaloneLeaderRetrievalService(String jobManagerAddress) { - this.jobManagerAddress = jobManagerAddress; + public StandaloneLeaderRetrievalService(String leaderAddress) { + this.leaderAddress = checkNotNull(leaderAddress); + this.leaderId = null; } + /** + * Creates a StandaloneLeaderRetrievalService with the given leader address. + * + * @param leaderAddress The leader's pre-configured address + * @param leaderId The constant leaderId. + */ + public StandaloneLeaderRetrievalService(String leaderAddress, UUID leaderId) { + this.leaderAddress = checkNotNull(leaderAddress); + this.leaderId = checkNotNull(leaderId); + } + + // ------------------------------------------------------------------------ + @Override public void start(LeaderRetrievalListener listener) { - Preconditions.checkNotNull(listener, "Listener must not be null."); - Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " + - "only be started once."); + checkNotNull(listener, "Listener must not be null."); - leaderListener = listener; + synchronized (startStopLock) { + checkState(!started, "StandaloneLeaderRetrievalService can only be started once."); + started = true; - // directly notify the listener, because we already know the leading JobManager's address - leaderListener.notifyLeaderAddress(jobManagerAddress, null); + // directly notify the listener, because we already know the leading JobManager's address + listener.notifyLeaderAddress(leaderAddress, leaderId); + } } @Override - public void stop() {} + public void stop() { + synchronized (startStopLock) { + started = false; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java index 67ac182..a28bc14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java @@ -237,7 +237,6 @@ public abstract class RpcEndpoint<C extends RpcGateway> { * }</pre> */ public void validateRunsInMainThread() { - // because the initialization is lazy, it can be that certain methods are assert currentMainThread.get() == Thread.currentThread(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index f93be83..fabdb05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -18,8 +18,11 @@ package org.apache.flink.runtime.rpc; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * Interface for rpc services. An rpc service is used to start and connect to a {@link RpcEndpoint}. * Connecting to a rpc server will return a {@link RpcGateway} which can be used to call remote @@ -71,4 +74,28 @@ public interface RpcService { * @return Fully qualified address */ <C extends RpcGateway> String getAddress(C selfGateway); + + /** + * Gets the execution context, provided by this RPC service. This execution + * context can be used for example for the {@code onComplete(...)} or {@code onSuccess(...)} + * methods of Futures. 
+ * + * <p><b>IMPORTANT:</b> This execution context does not isolate the method invocations against + * any concurrent invocations and is therefore not suitable to run completion methods of futures + * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the + * {@link RpcEndpoint#getMainThreadExecutionContext() MainThreadExecutionContext} of that + * {@code RpcEndpoint}. + * + * @return The execution context provided by the RPC service + */ + ExecutionContext getExecutionContext(); + + /** + * Execute the runnable in the execution context of this RPC Service, as returned by + * {@link #getExecutionContext()}, after a scheduled delay. + * + * @param runnable Runnable to be executed + * @param delay The delay after which the runnable will be executed + */ + void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 7b33524..b647bbd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -38,14 +38,18 @@ import org.apache.flink.runtime.rpc.StartStoppable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext; import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import javax.annotation.concurrent.ThreadSafe; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.TimeUnit; +import static org.apache.flink.util.Preconditions.checkArgument; 
import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -199,4 +203,18 @@ public class AkkaRpcService implements RpcService { throw new IllegalArgumentException("Cannot get address for non " + className + '.'); } } + + @Override + public ExecutionContext getExecutionContext() { + return actorSystem.dispatcher(); + } + + @Override + public void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit) { + checkNotNull(runnable, "runnable"); + checkNotNull(unit, "unit"); + checkArgument(delay >= 0, "delay must be zero or larger"); + + actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, getExecutionContext()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java index c18906c..ce4f9d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java @@ -36,6 +36,7 @@ public final class RunAsync implements Serializable { private final long delay; /** + * Creates a new {@code RunAsync} message. * * @param runnable The Runnable to run. * @param delay The delay in milliseconds. Zero indicates immediate execution. 
http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java new file mode 100644 index 0000000..2de560a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RegistrationResponse.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.registration; + +import java.io.Serializable; + +/** + * Base class for responses given to registration attempts from {@link RetryingRegistration}. + */ +public abstract class RegistrationResponse implements Serializable { + + private static final long serialVersionUID = 1L; + + // ---------------------------------------------------------------------------- + + /** + * Base class for a successful registration. Concrete registration implementations + * will typically extend this class to attach more information. 
+ */ + public static class Success extends RegistrationResponse { + private static final long serialVersionUID = 1L; + + @Override + public String toString() { + return "Registration Successful"; + } + } + + // ---------------------------------------------------------------------------- + + /** + * A rejected (declined) registration. + */ + public static final class Decline extends RegistrationResponse { + private static final long serialVersionUID = 1L; + + /** the rejection reason */ + private final String reason; + + /** + * Creates a new rejection message. + * + * @param reason The reason for the rejection. + */ + public Decline(String reason) { + this.reason = reason != null ? reason : "(unknown)"; + } + + /** + * Gets the reason for the rejection. + */ + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "Registration Declined (" + reason + ')'; + } + } +} + + + + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java new file mode 100644 index 0000000..4c93684 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/registration/RetryingRegistration.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.registration; + +import akka.dispatch.OnFailure; +import akka.dispatch.OnSuccess; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; + +import org.slf4j.Logger; + +import scala.concurrent.Future; +import scala.concurrent.Promise; +import scala.concurrent.impl.Promise.DefaultPromise; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + + +/** + * This utility class implements the basis of registering one component at another component, + * for example registering the TaskExecutor at the ResourceManager. + * This {@code RetryingRegistration} implements both the initial address resolution + * and the retries-with-backoff strategy. + * + * <p>The registration gives access to a future that is completed upon successful registration. + * The registration can be canceled, for example when the target where it tries to register + * at loses leader status. + * + * @param <Gateway> The type of the gateway to connect to. + * @param <Success> The type of the successful registration responses.
+ */ +public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> { + + // ------------------------------------------------------------------------ + // default configuration values + // ------------------------------------------------------------------------ + + private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100; + + private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000; + + private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000; + + private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000; + + // ------------------------------------------------------------------------ + // Fields + // ------------------------------------------------------------------------ + + private final Logger log; + + private final RpcService rpcService; + + private final String targetName; + + private final Class<Gateway> targetType; + + private final String targetAddress; + + private final UUID leaderId; + + private final Promise<Tuple2<Gateway, Success>> completionPromise; + + private final long initialRegistrationTimeout; + + private final long maxRegistrationTimeout; + + private final long delayOnError; + + private final long delayOnRefusedRegistration; + + private volatile boolean canceled; + + // ------------------------------------------------------------------------ + + public RetryingRegistration( + Logger log, + RpcService rpcService, + String targetName, + Class<Gateway> targetType, + String targetAddress, + UUID leaderId) { + this(log, rpcService, targetName, targetType, targetAddress, leaderId, + INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS, + ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS); + } + + public RetryingRegistration( + Logger log, + RpcService rpcService, + String targetName, + Class<Gateway> targetType, + String targetAddress, + UUID leaderId, + long initialRegistrationTimeout, + long maxRegistrationTimeout, + long 
delayOnError, + long delayOnRefusedRegistration) { + + checkArgument(initialRegistrationTimeout > 0, "initial registration timeout must be greater than zero"); + checkArgument(maxRegistrationTimeout > 0, "maximum registration timeout must be greater than zero"); + checkArgument(delayOnError >= 0, "delay on error must be non-negative"); + checkArgument(delayOnRefusedRegistration >= 0, "delay on refused registration must be non-negative"); + + this.log = checkNotNull(log); + this.rpcService = checkNotNull(rpcService); + this.targetName = checkNotNull(targetName); + this.targetType = checkNotNull(targetType); + this.targetAddress = checkNotNull(targetAddress); + this.leaderId = checkNotNull(leaderId); + this.initialRegistrationTimeout = initialRegistrationTimeout; + this.maxRegistrationTimeout = maxRegistrationTimeout; + this.delayOnError = delayOnError; + this.delayOnRefusedRegistration = delayOnRefusedRegistration; + + this.completionPromise = new DefaultPromise<>(); + } + + // ------------------------------------------------------------------------ + // completion and cancellation + // ------------------------------------------------------------------------ + + public Future<Tuple2<Gateway, Success>> getFuture() { + return completionPromise.future(); + } + + /** + * Cancels the registration procedure. + */ + public void cancel() { + canceled = true; + } + + /** + * Checks if the registration was canceled. + * @return True if the registration was canceled, false otherwise. + */ + public boolean isCanceled() { + return canceled; + } + + // ------------------------------------------------------------------------ + // registration + // ------------------------------------------------------------------------ + + protected abstract Future<RegistrationResponse> invokeRegistration( + Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception; + + /** + * This method resolves the target address to a callable gateway and starts the + * registration after that. 
+ */ + @SuppressWarnings("unchecked") + public void startRegistration() { + try { + // trigger resolution of the resource manager address to a callable gateway + Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType); + + // upon success, start the registration attempts + resourceManagerFuture.onSuccess(new OnSuccess<Gateway>() { + @Override + public void onSuccess(Gateway result) { + log.info("Resolved {} address, beginning registration", targetName); + register(result, 1, initialRegistrationTimeout); + } + }, rpcService.getExecutionContext()); + + // upon failure, retry, unless this is cancelled + resourceManagerFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) { + if (!isCanceled()) { + log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress); + startRegistration(); + } + } + }, rpcService.getExecutionContext()); + } + catch (Throwable t) { + cancel(); + completionPromise.tryFailure(t); + } + } + + /** + * This method performs a registration attempt and triggers either a success notification or a retry, + * depending on the result. + */ + @SuppressWarnings("unchecked") + private void register(final Gateway gateway, final int attempt, final long timeoutMillis) { + // eager check for canceling to avoid some unnecessary work + if (canceled) { + return; + } + + try { + log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis); + Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); + + // if the registration was successful, let the TaskExecutor know + registrationFuture.onSuccess(new OnSuccess<RegistrationResponse>() { + + @Override + public void onSuccess(RegistrationResponse result) throws Throwable { + if (!isCanceled()) { + if (result instanceof RegistrationResponse.Success) { + // registration successful! 
+ Success success = (Success) result; + completionPromise.success(new Tuple2<>(gateway, success)); + } + else { + // registration refused or unknown + if (result instanceof RegistrationResponse.Decline) { + RegistrationResponse.Decline decline = (RegistrationResponse.Decline) result; + log.info("Registration at {} was declined: {}", targetName, decline.getReason()); + } else { + log.error("Received unknown response to registration attempt: " + result); + } + + log.info("Pausing and re-attempting registration in {} ms", delayOnRefusedRegistration); + registerLater(gateway, 1, initialRegistrationTimeout, delayOnRefusedRegistration); + } + } + } + }, rpcService.getExecutionContext()); + + // upon failure, retry + registrationFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) { + if (!isCanceled()) { + if (failure instanceof TimeoutException) { + // we simply have not received a response in time. maybe the timeout was + // very low (initial fast registration attempts), maybe the target endpoint is + // currently down. + if (log.isDebugEnabled()) { + log.debug("Registration at {} ({}) attempt {} timed out after {} ms", + targetName, targetAddress, attempt, timeoutMillis); + } + + long newTimeoutMillis = Math.min(2 * timeoutMillis, maxRegistrationTimeout); + register(gateway, attempt + 1, newTimeoutMillis); + } + else { + // a serious failure occurred. 
we still should not give up, but keep trying + log.error("Registration at " + targetName + " failed due to an error", failure); + log.info("Pausing and re-attempting registration in {} ms", delayOnError); + + registerLater(gateway, 1, initialRegistrationTimeout, delayOnError); + } + } + } + }, rpcService.getExecutionContext()); + } + catch (Throwable t) { + cancel(); + completionPromise.tryFailure(t); + } + } + + private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) { + rpcService.scheduleRunnable(new Runnable() { + @Override + public void run() { + register(gateway, attempt, timeoutMillis); + } + }, delay, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java index 729ef0c..6f34465 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java @@ -19,19 +19,24 @@ package org.apache.flink.runtime.rpc.resourcemanager; import akka.dispatch.Mapper; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.util.Preconditions; + import 
scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext$; import scala.concurrent.Future; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutorService; /** @@ -93,4 +98,22 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { System.out.println("SlotRequest: " + slotRequest); return new SlotAssignment(); } + + + /** + * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers + * + * @return The response by the ResourceManager. + */ + @RpcMethod + public org.apache.flink.runtime.rpc.registration.RegistrationResponse registerTaskExecutor( + UUID resourceManagerLeaderId, + String taskExecutorAddress, + ResourceID resourceID) { + + return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java index 464a261..afddb01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerGateway.java @@ -18,14 +18,18 @@ package org.apache.flink.runtime.rpc.resourcemanager; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; + import 
scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.util.UUID; + /** - * {@link ResourceManager} rpc gateway interface. + * The {@link ResourceManager}'s RPC gateway interface. */ public interface ResourceManagerGateway extends RpcGateway { @@ -55,4 +59,19 @@ public interface ResourceManagerGateway extends RpcGateway { * @return Future slot assignment */ Future<SlotAssignment> requestSlot(SlotRequest slotRequest); + + /** + * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers + * @param timeout The timeout for the response. + * + * @return The future to the response by the ResourceManager. + */ + Future<org.apache.flink.runtime.rpc.registration.RegistrationResponse> registerTaskExecutor( + UUID resourceManagerLeaderId, + String taskExecutorAddress, + ResourceID resourceID, + @RpcTimeout FiniteDuration timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java new file mode 100644 index 0000000..e42fa4a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import java.io.Serializable; + +/** + * A report about the current status of all slots of the TaskExecutor, describing + * which slots are available and allocated, and what jobs (JobManagers) the allocated slots + * have been allocated to. + */ +public class SlotReport implements Serializable{ + + private static final long serialVersionUID = 1L; + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "SlotReport"; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java index 3a7dd9f..1a637bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java @@ -18,67 +18,152 @@ package org.apache.flink.runtime.rpc.taskexecutor; -import akka.dispatch.ExecutionContexts$; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcMethod; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.util.Preconditions; -import scala.concurrent.ExecutionContext; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutorService; +import java.util.UUID; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * TaskExecutor implementation. The task executor is responsible for the execution of multiple * {@link org.apache.flink.runtime.taskmanager.Task}. - * - * It offers the following methods as part of its rpc interface to interact with him remotely: - * <ul> - * <li>{@link #executeTask(TaskDeploymentDescriptor)} executes a given task on the TaskExecutor</li> - * <li>{@link #cancelTask(ExecutionAttemptID)} cancels a given task identified by the {@link ExecutionAttemptID}</li> - * </ul> */ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { - private final ExecutionContext executionContext; - private final Set<ExecutionAttemptID> tasks = new HashSet<>(); - public TaskExecutor(RpcService rpcService, ExecutorService executorService) { + /** The unique resource ID of this TaskExecutor */ + private final ResourceID resourceID; + + /** The access to the leader election and metadata storage services */ + private final HighAvailabilityServices haServices; + + // --------- resource manager -------- + + private TaskExecutorToResourceManagerConnection resourceManagerConnection; + + // ------------------------------------------------------------------------ + + public 
TaskExecutor( + RpcService rpcService, + HighAvailabilityServices haServices, + ResourceID resourceID) { + super(rpcService); - this.executionContext = ExecutionContexts$.MODULE$.fromExecutor( - Preconditions.checkNotNull(executorService)); + + this.haServices = checkNotNull(haServices); + this.resourceID = checkNotNull(resourceID); + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public ResourceID getResourceID() { + return resourceID; + } + + // ------------------------------------------------------------------------ + // Life cycle + // ------------------------------------------------------------------------ + + @Override + public void start() { + // start by connecting to the ResourceManager + try { + haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener()); + } catch (Exception e) { + onFatalErrorAsync(e); + } + } + + + // ------------------------------------------------------------------------ + // RPC methods - ResourceManager related + // ------------------------------------------------------------------------ + + @RpcMethod + public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { + if (resourceManagerConnection != null) { + if (newLeaderAddress != null) { + // the resource manager switched to a new leader + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", + resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress); + } + else { + // address null means that the current leader is lost without a new leader being there, yet + log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", + resourceManagerConnection.getResourceManagerAddress()); + } + + // drop the current connection or connection attempt + if (resourceManagerConnection != null) { + resourceManagerConnection.close(); + resourceManagerConnection = null; + } + } + + // establish a connection to the new leader + if (newLeaderAddress != null) { + log.info("Attempting to register at ResourceManager {}", newLeaderAddress); + resourceManagerConnection = + new TaskExecutorToResourceManagerConnection(log, this, newLeaderAddress, newLeaderId); + resourceManagerConnection.start(); + } } + // ------------------------------------------------------------------------ + // Error handling + // ------------------------------------------------------------------------ + /** - * Execute the given task on the task executor. The task is described by the provided - * {@link TaskDeploymentDescriptor}. - * - * @param taskDeploymentDescriptor Descriptor for the task to be executed - * @return Acknowledge the start of the task execution + * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. + * This method should be used when asynchronous threads want to notify the + * TaskExecutor of a fatal error. + * + * @param t The exception describing the fatal error */ - @RpcMethod - public Acknowledge executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor) { - tasks.add(taskDeploymentDescriptor.getExecutionId()); - return Acknowledge.get(); + void onFatalErrorAsync(final Throwable t) { + runAsync(new Runnable() { + @Override + public void run() { + onFatalError(t); + } + }); } /** - * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then - * the method throws an {@link Exception}. - * - * @param executionAttemptId Execution attempt ID identifying the task to be canceled. 
- * @return Acknowledge the task canceling - * @throws Exception if the task with the given execution attempt id could not be found + * Notifies the TaskExecutor that a fatal error has occurred and it cannot proceed. + * This method must only be called from within the TaskExecutor's main thread. + * + * @param t The exception describing the fatal error */ - @RpcMethod - public Acknowledge cancelTask(ExecutionAttemptID executionAttemptId) throws Exception { - if (tasks.contains(executionAttemptId)) { - return Acknowledge.get(); - } else { - throw new Exception("Could not find task."); + void onFatalError(Throwable t) { + // to be determined, probably delegate to a fatal error handler that + // would either log (mini cluster) or kill the process (yarn, mesos, ...) + log.error("FATAL ERROR", t); + } + + // ------------------------------------------------------------------------ + // Utility classes + // ------------------------------------------------------------------------ + + /** + * The listener for leader changes of the resource manager + */ + private class ResourceManagerLeaderListener implements LeaderRetrievalListener { + + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + getSelf().notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); + } + + @Override + public void handleError(Exception exception) { + onFatalErrorAsync(exception); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java index 450423e..b0b21b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorGateway.java @@ -18,31 +18,18 @@ package org.apache.flink.runtime.rpc.taskexecutor; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; -import scala.concurrent.Future; + +import java.util.UUID; /** - * {@link TaskExecutor} rpc gateway interface + * {@link TaskExecutor} RPC gateway interface */ public interface TaskExecutorGateway extends RpcGateway { - /** - * Execute the given task on the task executor. The task is described by the provided - * {@link TaskDeploymentDescriptor}. - * - * @param taskDeploymentDescriptor Descriptor for the task to be executed - * @return Future acknowledge of the start of the task execution - */ - Future<Acknowledge> executeTask(TaskDeploymentDescriptor taskDeploymentDescriptor); - /** - * Cancel a task identified by it {@link ExecutionAttemptID}. If the task cannot be found, then - * the method throws an {@link Exception}. - * - * @param executionAttemptId Execution attempt ID identifying the task to be canceled. 
- * @return Future acknowledge of the task canceling - */ - Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptId); + // ------------------------------------------------------------------------ + // ResourceManager handlers + // ------------------------------------------------------------------------ + + void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); } http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java new file mode 100644 index 0000000..641102d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorRegistrationSuccess.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rpc.registration.RegistrationResponse; + +import java.io.Serializable; + +/** + * Response from the ResourceManager to a successful registration attempt by a + * TaskExecutor. + */ +public final class TaskExecutorRegistrationSuccess extends RegistrationResponse.Success implements Serializable { + + private static final long serialVersionUID = 1L; + + private final InstanceID registrationId; + + private final long heartbeatInterval; + + /** + * Create a new {@code TaskExecutorRegistrationSuccess} message. + * + * @param registrationId The ID that the ResourceManager assigned the registration. + * @param heartbeatInterval The interval in which the ResourceManager will heartbeat the TaskExecutor. + */ + public TaskExecutorRegistrationSuccess(InstanceID registrationId, long heartbeatInterval) { + this.registrationId = registrationId; + this.heartbeatInterval = heartbeatInterval; + } + + /** + * Gets the ID that the ResourceManager assigned the registration. + */ + public InstanceID getRegistrationId() { + return registrationId; + } + + /** + * Gets the interval in which the ResourceManager will heartbeat the TaskExecutor. 
+ */ + public long getHeartbeatInterval() { + return heartbeatInterval; + } + + @Override + public String toString() { + return "TaskExecutorRegistrationSuccess (" + registrationId + " / " + heartbeatInterval + ')'; + } + +} + + + + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java new file mode 100644 index 0000000..ef75862 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rpc.taskexecutor; + +import akka.dispatch.OnFailure; +import akka.dispatch.OnSuccess; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.registration.RegistrationResponse; +import org.apache.flink.runtime.rpc.registration.RetryingRegistration; +import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; + +import org.slf4j.Logger; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +public class TaskExecutorToResourceManagerConnection { + + /** the logger for all log messages of this class */ + private final Logger log; + + /** the TaskExecutor whose connection to the ResourceManager this represents */ + private final TaskExecutor taskExecutor; + + private final UUID resourceManagerLeaderId; + + private final String resourceManagerAddress; + + private ResourceManagerRegistration pendingRegistration; + + private ResourceManagerGateway registeredResourceManager; + + private InstanceID registrationId; + + /** flag indicating that the connection is closed */ + private volatile boolean closed; + + + public TaskExecutorToResourceManagerConnection( + Logger log, + TaskExecutor taskExecutor, + String resourceManagerAddress, + UUID resourceManagerLeaderId) { + + this.log = checkNotNull(log); + this.taskExecutor = checkNotNull(taskExecutor); + this.resourceManagerAddress = checkNotNull(resourceManagerAddress); + this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + } + + // ------------------------------------------------------------------------ + // Life cycle + // 
------------------------------------------------------------------------ + + @SuppressWarnings("unchecked") + public void start() { + checkState(!closed, "The connection is already closed"); + checkState(!isRegistered() && pendingRegistration == null, "The connection is already started"); + + ResourceManagerRegistration registration = new ResourceManagerRegistration( + log, taskExecutor.getRpcService(), + resourceManagerAddress, resourceManagerLeaderId, + taskExecutor.getAddress(), taskExecutor.getResourceID()); + + Future<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>> future = registration.getFuture(); + + future.onSuccess(new OnSuccess<Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess>>() { + @Override + public void onSuccess(Tuple2<ResourceManagerGateway, TaskExecutorRegistrationSuccess> result) { + registeredResourceManager = result.f0; + registrationId = result.f1.getRegistrationId(); + } + }, taskExecutor.getMainThreadExecutionContext()); + + // this future should only ever fail if there is a bug, not if the registration is declined + future.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) { + taskExecutor.onFatalError(failure); + } + }, taskExecutor.getMainThreadExecutionContext()); + } + + public void close() { + closed = true; + + // make sure we do not keep re-trying forever + if (pendingRegistration != null) { + pendingRegistration.cancel(); + } + } + + public boolean isClosed() { + return closed; + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public UUID getResourceManagerLeaderId() { + return resourceManagerLeaderId; + } + + public String getResourceManagerAddress() { + return resourceManagerAddress; + } + + /** + * Gets the ResourceManagerGateway. This returns null until the registration is completed. 
+ */ + public ResourceManagerGateway getResourceManager() { + return registeredResourceManager; + } + + /** + * Gets the ID under which the TaskExecutor is registered at the ResourceManager. + * This returns null until the registration is completed. + */ + public InstanceID getRegistrationId() { + return registrationId; + } + + public boolean isRegistered() { + return registeredResourceManager != null; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return String.format("Connection to ResourceManager %s (leaderId=%s)", + resourceManagerAddress, resourceManagerLeaderId); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + static class ResourceManagerRegistration + extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { + + private final String taskExecutorAddress; + + private final ResourceID resourceID; + + public ResourceManagerRegistration( + Logger log, + RpcService rpcService, + String targetAddress, + UUID leaderId, + String taskExecutorAddress, + ResourceID resourceID) { + + super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); + this.taskExecutorAddress = checkNotNull(taskExecutorAddress); + this.resourceID = checkNotNull(resourceID); + } + + @Override + protected Future<RegistrationResponse> invokeRegistration( + ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { + + FiniteDuration timeout = new FiniteDuration(timeoutMillis, TimeUnit.MILLISECONDS); + return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index fd55904..7b4ab89 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -20,15 +20,17 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import akka.util.Timeout; + +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; import org.apache.flink.util.TestLogger; + +import org.junit.AfterClass; import org.junit.Test; + import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; @@ -41,6 +43,49 @@ import static org.junit.Assert.assertTrue; public class AkkaRpcServiceTest extends TestLogger { + // ------------------------------------------------------------------------ + // shared test members + // ------------------------------------------------------------------------ + + private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + + private static AkkaRpcService akkaRpcService = + new AkkaRpcService(actorSystem, new Timeout(10000, TimeUnit.MILLISECONDS)); + + @AfterClass + public static void shutdown() { + akkaRpcService.stopService(); + actorSystem.shutdown(); + } + + // ------------------------------------------------------------------------ + // tests + // 
------------------------------------------------------------------------ + + @Test + public void testScheduleRunnable() throws Exception { + final OneShotLatch latch = new OneShotLatch(); + final long delay = 100; + final long start = System.nanoTime(); + + akkaRpcService.scheduleRunnable(new Runnable() { + @Override + public void run() { + latch.trigger(); + } + }, delay, TimeUnit.MILLISECONDS); + + latch.await(); + final long stop = System.nanoTime(); + + assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); + } + + // ------------------------------------------------------------------------ + // specific component tests - should be moved to the test classes + // for those components + // ------------------------------------------------------------------------ + /** * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the * {@link AkkaRpcService}. http://git-wip-us.apache.org/repos/asf/flink/blob/f94ae532/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java index c96f4f6..9f9bab3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java @@ -18,93 +18,8 @@ package org.apache.flink.runtime.rpc.taskexecutor; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobKey; -import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; -import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.MainThreadExecutor; -import org.apache.flink.runtime.rpc.RpcEndpoint; -import org.apache.flink.runtime.rpc.RpcGateway; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.StartStoppable; -import org.apache.flink.runtime.util.DirectExecutorService; -import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.cglib.proxy.InvocationHandler; -import org.mockito.cglib.proxy.Proxy; -import scala.concurrent.Future; - -import java.net.URL; -import java.util.Collections; - -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class TaskExecutorTest extends TestLogger { - - /** - * Tests that we can deploy and cancel a task on the TaskExecutor without exceptions - */ - @Test - public void testTaskExecution() throws Exception { - RpcService testingRpcService = mock(RpcService.class); - InvocationHandler invocationHandler = mock(InvocationHandler.class); - Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler); - when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway); - - DirectExecutorService directExecutorService = new DirectExecutorService(); - TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); - taskExecutor.start(); - - TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), - "Test job", - new JobVertexID(), - new ExecutionAttemptID(), - new 
SerializedValue<ExecutionConfig>(null), - "Test task", - 0, - 1, - 0, - new Configuration(), - new Configuration(), - "Invokable", - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - 0 - ); - - Acknowledge ack = taskExecutor.executeTask(tdd); - - ack = taskExecutor.cancelTask(tdd.getExecutionId()); - } - - /** - * Tests that cancelling a non-existing task will return an exception - */ - @Test(expected=Exception.class) - public void testWrongTaskCancellation() throws Exception { - RpcService testingRpcService = mock(RpcService.class); - InvocationHandler invocationHandler = mock(InvocationHandler.class); - Object selfGateway = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] {TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, invocationHandler); - when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway); - DirectExecutorService directExecutorService = null; - TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, directExecutorService); - taskExecutor.start(); - - taskExecutor.cancelTask(new ExecutionAttemptID()); - - fail("The cancellation should have thrown an exception."); - } + }