[
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15434831#comment-15434831
]
ASF GitHub Bot commented on FLINK-4449:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2410#discussion_r76047139
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatScheduler.java
---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This utility class implements the basis for periodically triggering heartbeats from one
+ * component to another component, for example from the ResourceManager to the TaskExecutor.
+ *
+ * @param <Gateway> The type of the gateway to connect to.
+ * @param <Payload> The type of the payload carried by successful heartbeat responses.
+ */
+public abstract class HeartbeatScheduler<Gateway extends RpcGateway,
Payload extends Serializable> {
+ /** default heartbeat interval time in millisecond */
+ private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
+
+ /** default heartbeat timeout in millisecond */
+ private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
+
+ /** default max heartbeat timeout in milliseconds (upper bound for the timeout when a heartbeat is retried) */
+ private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 30000;
+
+ /** default heartbeat attempt delay after an exception has occurred */
+ private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
+
+ /** default max heartbeat retry time for one heartbeat */
+ private static final long MAX_HEARTBEAT_ATTEMPT_MILLIS = 60000;
+
+ private final long heartbeatInterval;
+
+ private final long heartbeatTimeout;
+
+ private final long maxHeartbeatTimeout;
+
+ private final long delayOnError;
+
+ private final long maxAttemptTime;
+
+ /** target gateway to receive the heartbeat and give heartbeatResponse
*/
+ protected final Gateway targetGateway;
+
+ /** the target address */
+ private final String targetAddress;
+
+ /** the target gateway name */
+ private final String targetName;
+
+ private final RpcService rpcService;
+
+ private final UUID leaderID;
+
+ private final Logger log;
+
+ private volatile boolean closed;
+
+ /**
+ * @param rpcService rpcService
+ * @param leaderID leader session id of current source end which
send heartbeat
+ * @param targetGateway target gateway which receive heartbeat and
response
+ * @param targetAddress target gateway address
+ * @param targetName target name
+ * @param log log
+ */
+ public HeartbeatScheduler(RpcService rpcService, UUID leaderID, Gateway
targetGateway,
+ String targetAddress, String targetName, Logger log) {
+ this(rpcService, leaderID, targetGateway, targetAddress,
targetName, log, INITIAL_HEARTBEAT_INTERVAL_MILLIS,
+ INITIAL_HEARTBEAT_TIMEOUT_MILLIS,
MAX_HEARTBEAT_TIMEOUT_MILLIS, ERROR_HEARTBEAT_DELAY_MILLIS,
MAX_HEARTBEAT_ATTEMPT_MILLIS);
+ }
+
+ /**
+ * @param rpcService rpcService
+ * @param leaderID leader session id of current source end
which send heartbeat
+ * @param targetGateway target gateway which receive heartbeat
and response
+ * @param targetAddress target gateway address
+ * @param targetName target name
+ * @param log log
+ * @param heartbeatInterval heartbeat interval time in millisecond
+ * @param heartbeatTimeout heartbeat timeout in millisecond
+ * @param maxHeartbeatTimeout max heartbeat timeout in milliseconds
+ * @param delayOnError Heartbeat attempt delay after an
exception has occurred
+ * @param maxAttemptTime max retry time for one heartbeat
+ */
+ public HeartbeatScheduler(
+ RpcService rpcService, UUID leaderID, Gateway targetGateway,
+ String targetAddress, String targetName, Logger log, long
heartbeatInterval,
+ long heartbeatTimeout, long maxHeartbeatTimeout, long
delayOnError, long maxAttemptTime) {
+ checkArgument(heartbeatInterval > 0, "initial heartbeat
interval must be greater than zero");
+ checkArgument(heartbeatTimeout > 0, "initial heartbeat timeout
must be greater than zero");
+ checkArgument(maxHeartbeatTimeout > 0, "maximum heartbeat
timeout must be greater than zero");
+ checkArgument(delayOnError >= 0, "delay on error must be
non-negative");
+ checkArgument(maxAttemptTime >= 0, "max attempt on error must
be non-negative");
+ this.rpcService = checkNotNull(rpcService);
+ this.leaderID = checkNotNull(leaderID);
+ this.targetGateway = checkNotNull(targetGateway);
+ this.targetAddress = checkNotNull(targetAddress);
+ this.targetName = checkNotNull(targetName);
+ this.log = checkNotNull(log);
+ this.heartbeatInterval = heartbeatInterval;
+ this.heartbeatTimeout = heartbeatTimeout;
+ this.maxHeartbeatTimeout = maxHeartbeatTimeout;
+ this.delayOnError = delayOnError;
+ this.maxAttemptTime = maxAttemptTime;
+ }
+
+ /**
+ * start to schedule heartbeat
+ */
+ public void start() {
+ checkState(!closed, "The heartbeat connection is already
closed");
+ long currentHeartbeatBeginTime = System.currentTimeMillis();
+ sendHeartbeatToTaskManager(1, heartbeatTimeout,
currentHeartbeatBeginTime);
+ }
+
+ /**
+ * Checks if the heartbeat schedule was closed.
+ *
+ * @return True if the heartbeat schedule was closed, false otherwise.
+ */
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * stop to schedule heartbeat
+ */
+ public void close() {
+ closed = true;
+ }
+
+ /**
+ * get the heartbeat interval
+ *
+ * @return heartbeat interval
+ */
+ public long getHeartbeatInterval() {
+ return heartbeatInterval;
+ }
+
+ /**
+ * trigger heartbeat to target gateway
+ *
+ * @param leaderID leader session id of current sender
+ * @param timeout timeout for heartbeat response
+ * @return HeartbeatResponsePayload wrapped in future
+ */
+ protected abstract Future<Payload> triggerHeartbeat(UUID leaderID,
FiniteDuration timeout);
+
+ /**
+ * report heartbeat response payload to sender who sending heartbeat
+ *
+ * @param heartbeatResponsePayload heartbeat response which contains
payload
+ */
+ protected abstract void reportHeartbeatPayload(Payload
heartbeatResponsePayload);
+
+ /**
+ * callback method when heartbeat sender lost heartbeat with target
+ */
+ protected abstract void lossHeartbeat();
+
+ /**
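+ * Sends a heartbeat attempt to the target. On success the response payload is reported; on
+ * failure the heartbeat is retried until the maximum attempt time is exceeded, after which the
+ * loss of the heartbeat is reported via lossHeartbeat().
+ *
+ * @param attempt number of this attempt for the current heartbeat, starting at 1
+ * @param timeoutMillis timeout for this heartbeat attempt in milliseconds
+ * @param currentHeartbeatBeginTime time in milliseconds at which the first attempt of the current heartbeat started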
+ */
+ private void sendHeartbeatToTaskManager(final int attempt, final long
timeoutMillis, final long currentHeartbeatBeginTime) {
+ // eager check for closed to avoid some unnecessary work
+ if (closed) {
+ return;
+ }
+ FiniteDuration timeout = new FiniteDuration(timeoutMillis,
TimeUnit.MILLISECONDS);
+ Future<Payload> heartbeatResponse = triggerHeartbeat(leaderID,
timeout);
+
+ heartbeatResponse.onSuccess(new OnSuccess<Payload>() {
+
+ @Override
+ public void onSuccess(Payload result) throws Throwable {
+ if (!isClosed()) {
+ // report heartbeat response payload
back to sender
+ reportHeartbeatPayload(result);
+
scheduleNewHeartbeatToTaskManagerLater(heartbeatTimeout, heartbeatInterval);
+ }
+ }
+ }, rpcService.getExecutionContext());
+
+ // upon failure, retry
+ heartbeatResponse.onFailure(new OnFailure() {
+ @Override
+ public void onFailure(Throwable failure) {
+ if (!isClosed()) {
+ long currentTime =
System.currentTimeMillis();
+ if(currentTime -
currentHeartbeatBeginTime >= maxAttemptTime) {
+ log.error("Lost heartbeat with
at {} ({}) after {} attempts", targetName, targetAddress, attempt);
+ closed = true;
+ // mark target as failed after
heartbeat interaction attempts failed for max attempt time
+ lossHeartbeat();
+ } else {
+ // we simply have not received
a heartbeat response in time. maybe the timeout was
+ // very low (initial fast
registration attempts), maybe the target endpoint is
+ // currently down.
+ if (failure instanceof
TimeoutException) {
+ if
(log.isDebugEnabled()) {
+
log.debug("Heartbeat to {} ({}) attempt {} timed out after {} ms",
+
targetName, targetAddress, attempt, timeoutMillis);
+ }
+
+ long retryTimeoutMillis
= Math.min(2 * timeoutMillis, maxHeartbeatTimeout);
+
sendHeartbeatToTaskManager(attempt + 1, retryTimeoutMillis,
currentHeartbeatBeginTime);
+ } else {
+ // a serious failure
occurred. we still should not give up, but keep trying
+ log.error("Heartbeat to
" + targetName + " failed due to an error", failure);
+ log.info("Pausing and
re-attempting registration in {} ms", delayOnError);
+
+
sendHeartbeatToTaskManagerLater(attempt + 1, timeoutMillis, delayOnError,
currentHeartbeatBeginTime);
+ }
+ }
+ }
+ }
+ }, rpcService.getExecutionContext());
+ }
+
+ /**
+ * schedule a new heartbeat after delayMills
+ * @param timeoutMillis
+ * @param delayMills
+ */
+ private void scheduleNewHeartbeatToTaskManagerLater(final long
timeoutMillis, final long delayMills) {
--- End diff --
typo: `delayMills` should be `delayMillis`
> Heartbeat Manager between ResourceManager and TaskExecutor
> ----------------------------------------------------------
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: zhangjing
> Assignee: zhangjing
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager and the
> TaskExecutors.
> 1. Register TaskExecutors
> Register heartbeat targets. If the heartbeat response for a target is
> not reported in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeat
> Trigger heartbeats from the ResourceManager to the TaskExecutors periodically.
> Each TaskExecutor reports its slot allocation in the heartbeat response.
> The ResourceManager syncs its own slot allocation with the heartbeat response.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)