This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 81c739ae462412e531216bb46bc567fce2355dd8 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Thu Jul 7 13:36:27 2022 +0800 [FLINK-28137][runtime] Introduce SpeculativeScheduler This closes #20222. --- .../runtime/blocklist/BlocklistOperations.java | 33 ++ .../RestartPipelinedRegionFailoverStrategy.java | 12 - .../apache/flink/runtime/jobgraph/JobVertex.java | 7 +- .../DefaultSlotPoolServiceSchedulerFactory.java | 7 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 3 +- .../jobmaster/SlotPoolServiceSchedulerFactory.java | 4 +- .../flink/runtime/scheduler/DefaultScheduler.java | 137 ++++--- .../runtime/scheduler/DefaultSchedulerFactory.java | 4 +- .../flink/runtime/scheduler/ExecutionDeployer.java | 2 +- .../scheduler/ExecutionVertexVersioner.java | 9 + .../flink/runtime/scheduler/SchedulerBase.java | 50 +-- .../runtime/scheduler/SchedulerNGFactory.java | 4 +- .../scheduler/SimpleExecutionSlotAllocator.java | 189 +++++++++ .../adaptive/AdaptiveSchedulerFactory.java | 4 +- .../adaptivebatch/AdaptiveBatchScheduler.java | 14 +- .../AdaptiveBatchSchedulerFactory.java | 139 ++++--- .../adaptivebatch/SpeculativeScheduler.java | 314 +++++++++++++++ .../runtime/jobmaster/JobMasterSchedulerTest.java | 4 +- .../runtime/scheduler/DefaultSchedulerBuilder.java | 47 ++- .../runtime/scheduler/DefaultSchedulerTest.java | 4 +- .../SimpleExecutionSlotAllocatorTest.java | 269 +++++++++++++ .../scheduler/TestingSchedulerNGFactory.java | 4 +- .../adaptivebatch/SpeculativeSchedulerTest.java | 426 +++++++++++++++++++++ 23 files changed, 1516 insertions(+), 170 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java new file mode 100644 index 00000000000..f7ef00f5f01 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blocklist/BlocklistOperations.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blocklist; + +import java.util.Collection; + +/** Operations to perform on the blocklist. */ +public interface BlocklistOperations { + + /** + * Add new blocked node records. If a node (identified by node id) already exists, the newly + * added one will be merged with the existing one. 
+ * + * @param newNodes the new blocked node records + */ + void addNewBlockedNodes(Collection<BlockedNode> newNodes); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java index 4e977c1a7c8..6e7181ffd03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionFailoverStrategy.java @@ -31,9 +31,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.IterableUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -53,10 +50,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy { - /** The log object used for debugging. */ - private static final Logger LOG = - LoggerFactory.getLogger(RestartPipelinedRegionFailoverStrategy.class); - /** The topology containing info about all the vertices and result partitions. 
*/ private final SchedulingTopology topology; @@ -112,7 +105,6 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy @Override public Set<ExecutionVertexID> getTasksNeedingRestart( ExecutionVertexID executionVertexId, Throwable cause) { - LOG.info("Calculating tasks to restart to recover the failed task {}.", executionVertexId); final SchedulingPipelinedRegion failedRegion = topology.getPipelinedRegionOfVertex(executionVertexId); @@ -149,10 +141,6 @@ public class RestartPipelinedRegionFailoverStrategy implements FailoverStrategy dataConsumptionException.get().getPartitionId().getPartitionId()); } - LOG.info( - "{} tasks should be restarted to recover the failed task {}. ", - tasksToRestart.size(), - executionVertexId); return tasksToRestart; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index af4a0189f06..36d1a1e5487 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -308,8 +308,11 @@ public class JobVertex implements java.io.Serializable { * @param parallelism The parallelism for the task. 
*/ public void setParallelism(int parallelism) { - if (parallelism < 1) { - throw new IllegalArgumentException("The parallelism must be at least one."); + if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) { + throw new IllegalArgumentException( + "The parallelism must be at least one, or " + + ExecutionConfig.PARALLELISM_DEFAULT + + " (unset)."); } this.parallelism = parallelism; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java index 0b3757a761e..f218927ebb5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; @@ -112,7 +113,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + JobStatusListener jobStatusListener, + BlocklistOperations blocklistOperations) throws Exception { return schedulerNGFactory.createInstance( log, @@ -133,7 +135,8 @@ public final class DefaultSlotPoolServiceSchedulerFactory initializationTimestamp, mainThreadExecutor, fatalErrorHandler, - jobStatusListener); + jobStatusListener, + 
blocklistOperations); } public static DefaultSlotPoolServiceSchedulerFactory create( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1d9024f50ab..25e2949182c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -385,7 +385,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> initializationTimestamp, getMainThreadExecutor(), fatalErrorHandler, - jobStatusListener); + jobStatusListener, + blocklistHandler::addNewBlockedNodes); return scheduler; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java index d97dc577b41..df8c40c86c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotPoolServiceSchedulerFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; @@ -85,6 +86,7 @@ public interface SlotPoolServiceSchedulerFactory { long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + JobStatusListener jobStatusListener, + BlocklistOperations blocklistOperations) throws Exception; 
} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 521dda8b953..da6fa69b325 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -29,11 +29,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; -import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler; import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; @@ -45,7 +42,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; @@ -53,8 +49,6 @@ import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import 
org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.topology.Vertex; -import org.apache.flink.util.IterableUtils; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -76,7 +70,9 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** The future default scheduler. */ public class DefaultScheduler extends SchedulerBase implements SchedulerOperations { @@ -85,13 +81,13 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio private final ClassLoader userCodeLoader; - private final ExecutionSlotAllocator executionSlotAllocator; + protected final ExecutionSlotAllocator executionSlotAllocator; private final ExecutionFailureHandler executionFailureHandler; private final ScheduledExecutor delayExecutor; - protected final SchedulingStrategy schedulingStrategy; + private final SchedulingStrategy schedulingStrategy; private final ExecutionOperations executionOperations; @@ -106,7 +102,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio // anymore. The reserved allocation information is needed for local recovery. 
private final Map<ExecutionVertexID, AllocationID> reservedAllocationByExecutionVertex; - private final ExecutionDeployer executionDeployer; + protected final ExecutionDeployer executionDeployer; protected DefaultScheduler( final Logger log, @@ -204,10 +200,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio @Override protected void cancelAllPendingSlotRequestsInternal() { - IterableUtils.toStream(getSchedulingTopology().getVertices()) - .map(Vertex::getId) - .map(this::getCurrentExecutionIdOfVertex) - .forEach(executionSlotAllocator::cancel); + getSchedulingTopology() + .getVertices() + .forEach(ev -> cancelAllPendingSlotRequestsForVertex(ev.getId())); } @Override @@ -220,47 +215,46 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio } @Override - protected void updateTaskExecutionStateInternal( - final ExecutionVertexID executionVertexId, - final TaskExecutionStateTransition taskExecutionState) { + protected void onTaskFinished(final Execution execution) { + checkState(execution.getState() == ExecutionState.FINISHED); + final ExecutionVertexID executionVertexId = execution.getVertex().getID(); // once a task finishes, it will release the assigned allocation/slot and no longer // needs it. Therefore, it should stop reserving the slot so that other tasks are // possible to use the slot. Ideally, the `stopReserveAllocation` should happen // along with the release slot process. However, that process is hidden in the depth // of the ExecutionGraph, so we currently do it in DefaultScheduler after that process // is done. 
- if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED) { - stopReserveAllocation(executionVertexId); - } + stopReserveAllocation(executionVertexId); - schedulingStrategy.onExecutionStateChange( - executionVertexId, taskExecutionState.getExecutionState()); - maybeHandleTaskFailure(taskExecutionState, getCurrentExecutionOfVertex(executionVertexId)); + schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED); } - private void maybeHandleTaskFailure( - final TaskExecutionStateTransition taskExecutionState, final Execution execution) { + @Override + protected void onTaskFailed(final Execution execution) { + checkState(execution.getState() == ExecutionState.FAILED); + checkState(execution.getFailureInfo().isPresent()); + + final Throwable error = + execution.getFailureInfo().get().getException().deserializeError(userCodeLoader); + handleTaskFailure( + execution, + maybeTranslateToCachedIntermediateDataSetException( + error, execution.getVertex().getID())); + } - if (taskExecutionState.getExecutionState() == ExecutionState.FAILED) { - final Throwable error = taskExecutionState.getError(userCodeLoader); - handleTaskFailure(execution, error); - } + protected void handleTaskFailure( + final Execution failedExecution, @Nullable final Throwable error) { + maybeRestartTasks(recordTaskFailure(failedExecution, error)); } - private void handleTaskFailure( + protected FailureHandlingResult recordTaskFailure( final Execution failedExecution, @Nullable final Throwable error) { - Throwable revisedError = - maybeTranslateToCachedIntermediateDataSetException( - error, failedExecution.getVertex().getID()); final long timestamp = System.currentTimeMillis(); - setGlobalFailureCause(revisedError, timestamp); - notifyCoordinatorsAboutTaskFailure(failedExecution.getVertex().getID(), revisedError); + setGlobalFailureCause(error, timestamp); + notifyCoordinatorsAboutTaskFailure(failedExecution, error); - final FailureHandlingResult 
failureHandlingResult = - executionFailureHandler.getFailureHandlingResult( - failedExecution, revisedError, timestamp); - maybeRestartTasks(failureHandlingResult); + return executionFailureHandler.getFailureHandlingResult(failedExecution, error, timestamp); } private Throwable maybeTranslateToCachedIntermediateDataSetException( @@ -286,10 +280,9 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio } private void notifyCoordinatorsAboutTaskFailure( - final ExecutionVertexID executionVertexId, @Nullable final Throwable error) { - final ExecutionJobVertex jobVertex = - getExecutionJobVertex(executionVertexId.getJobVertexId()); - final int subtaskIndex = executionVertexId.getSubtaskIndex(); + final Execution execution, @Nullable final Throwable error) { + final ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex(); + final int subtaskIndex = execution.getParallelSubtaskIndex(); jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex, error)); } @@ -324,13 +317,24 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio .values()); final boolean globalRecovery = failureHandlingResult.isGlobalFailure(); + if (globalRecovery) { + log.info( + "{} tasks will be restarted to recover from a global failure.", + verticesToRestart.size()); + } else { + checkArgument(failureHandlingResult.getFailedExecution().isPresent()); + log.info( + "{} tasks will be restarted to recover the failed task {}.", + verticesToRestart.size(), + failureHandlingResult.getFailedExecution().get().getAttemptId()); + } + addVerticesToRestartPending(verticesToRestart); final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart); final FailureHandlingResultSnapshot failureHandlingResultSnapshot = - FailureHandlingResultSnapshot.create( - failureHandlingResult, id -> getExecutionVertex(id).getCurrentExecutions()); + createFailureHandlingResultSnapshot(failureHandlingResult); 
delayExecutor.schedule( () -> FutureUtils.assertNoException( @@ -345,6 +349,12 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio TimeUnit.MILLISECONDS); } + protected FailureHandlingResultSnapshot createFailureHandlingResultSnapshot( + final FailureHandlingResult failureHandlingResult) { + return FailureHandlingResultSnapshot.create( + failureHandlingResult, id -> getExecutionVertex(id).getCurrentExecutions()); + } + private void addVerticesToRestartPending(final Set<ExecutionVertexID> verticesToRestart) { verticesWaitingForRestart.addAll(verticesToRestart); transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING); @@ -380,9 +390,7 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio private CompletableFuture<?> cancelTasksAsync(final Set<ExecutionVertexID> verticesToRestart) { // clean up all the related pending requests to avoid that immediately returned slot // is used to fulfill the pending requests of these tasks - verticesToRestart.stream() - .map(this::getCurrentExecutionIdOfVertex) - .forEach(executionSlotAllocator::cancel); + cancelAllPendingSlotRequestsForVertices(verticesToRestart); final List<CompletableFuture<?>> cancelFutures = verticesToRestart.stream() @@ -393,15 +401,27 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio } private CompletableFuture<?> cancelExecutionVertex(final ExecutionVertexID executionVertexId) { - final ExecutionVertex vertex = getExecutionVertex(executionVertexId); + return FutureUtils.combineAll( + getExecutionVertex(executionVertexId).getCurrentExecutions().stream() + .map(this::cancelExecution) + .collect(Collectors.toList())); + } - notifyCoordinatorOfCancellation(vertex); + protected CompletableFuture<?> cancelExecution(final Execution execution) { + notifyCoordinatorOfCancellation(execution); + return executionOperations.cancel(execution); + } - return 
executionOperations.cancel(vertex.getCurrentExecutionAttempt()); + private void cancelAllPendingSlotRequestsForVertices( + final Set<ExecutionVertexID> executionVertices) { + executionVertices.forEach(this::cancelAllPendingSlotRequestsForVertex); } - private ExecutionAttemptID getCurrentExecutionIdOfVertex(ExecutionVertexID executionVertexId) { - return getCurrentExecutionOfVertex(executionVertexId).getAttemptId(); + protected void cancelAllPendingSlotRequestsForVertex( + final ExecutionVertexID executionVertexId) { + getExecutionVertex(executionVertexId) + .getCurrentExecutions() + .forEach(e -> executionSlotAllocator.cancel(e.getAttemptId())); } private Execution getCurrentExecutionOfVertex(ExecutionVertexID executionVertexId) { @@ -445,23 +465,18 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio } } - private void notifyCoordinatorOfCancellation(ExecutionVertex vertex) { + private void notifyCoordinatorOfCancellation(Execution execution) { // this method makes a best effort to filter out duplicate notifications, meaning cases - // where - // the coordinator was already notified for that specific task + // where the coordinator was already notified for that specific task // we don't notify if the task is already FAILED, CANCELLING, or CANCELED - - final ExecutionState currentState = vertex.getExecutionState(); + final ExecutionState currentState = execution.getState(); if (currentState == ExecutionState.FAILED || currentState == ExecutionState.CANCELING || currentState == ExecutionState.CANCELED) { return; } - for (OperatorCoordinatorHolder coordinator : - vertex.getJobVertex().getOperatorCoordinators()) { - coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null); - } + notifyCoordinatorsAboutTaskFailure(execution, null); } private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext { diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index de7d56315bd..72893347982 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -72,7 +73,8 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { long initializationTimestamp, final ComponentMainThreadExecutor mainThreadExecutor, final FatalErrorHandler fatalErrorHandler, - final JobStatusListener jobStatusListener) + final JobStatusListener jobStatusListener, + final BlocklistOperations blocklistOperations) throws Exception { final SlotPool slotPool = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java index adb5ff76c87..a42eb97b644 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionDeployer.java @@ -32,7 +32,7 @@ import java.util.Map; import java.util.function.BiConsumer; /** This deployer is responsible for deploying executions. */ -interface ExecutionDeployer { +public interface ExecutionDeployer { /** * Allocate slots and deploy executions. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java index 42e5f3387c2..b0a0b17db0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java @@ -83,6 +83,15 @@ public class ExecutionVertexVersioner { .collect(Collectors.toSet()); } + public Map<ExecutionVertexID, ExecutionVertexVersion> getExecutionVertexVersions( + Collection<ExecutionVertexID> executionVertexIds) { + return executionVertexIds.stream() + .map(id -> new ExecutionVertexVersion(id, getCurrentVersion(id))) + .collect( + Collectors.toMap( + ExecutionVertexVersion::getExecutionVertexId, Function.identity())); + } + ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID executionVertexId) { final long currentVersion = getCurrentVersion(executionVertexId); return new ExecutionVertexVersion(executionVertexId, currentVersion); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 53547ab4104..f240dce094c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -706,49 +706,41 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling @Override public final boolean updateTaskExecutionState( final TaskExecutionStateTransition taskExecutionState) { - final ExecutionVertexID executionVertexId = - taskExecutionState.getID().getExecutionVertexId(); - boolean updateSuccess = executionGraph.updateState(taskExecutionState); - - if (updateSuccess) { - if (isNotifiable(executionVertexId, taskExecutionState)) { - 
updateTaskExecutionStateInternal(executionVertexId, taskExecutionState); - } + final ExecutionAttemptID attemptId = taskExecutionState.getID(); + final Execution execution = executionGraph.getRegisteredExecutions().get(attemptId); + if (execution != null && executionGraph.updateState(taskExecutionState)) { + onTaskExecutionStateUpdate(execution, taskExecutionState); return true; - } else { - return false; } + + return false; } - private boolean isNotifiable( - final ExecutionVertexID executionVertexId, - final TaskExecutionStateTransition taskExecutionState) { + private void onTaskExecutionStateUpdate( + final Execution execution, final TaskExecutionStateTransition taskExecutionState) { - final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId); + // only notifies a state update if it's effective, namely it successfully + // turns the execution state to the expected value. + if (execution.getState() != taskExecutionState.getExecutionState()) { + return; + } // only notifies FINISHED and FAILED states which are needed at the moment. - // can be refined in FLINK-14233 after the legacy scheduler is removed and - // the actions are factored out from ExecutionGraph. + // can be refined in FLINK-14233 after the actions are factored out from ExecutionGraph. switch (taskExecutionState.getExecutionState()) { case FINISHED: - case FAILED: - // only notifies a state update if it's effective, namely it successfully - // turns the execution state to the expected value. 
- if (executionVertex.getExecutionState() == taskExecutionState.getExecutionState()) { - return true; - } + onTaskFinished(execution); break; - default: + case FAILED: + onTaskFailed(execution); break; } - - return false; } - protected void updateTaskExecutionStateInternal( - final ExecutionVertexID executionVertexId, - final TaskExecutionStateTransition taskExecutionState) {} + protected abstract void onTaskFinished(final Execution execution); + + protected abstract void onTaskFailed(final Execution execution); @Override public SerializedInputSplit requestNextInputSplit( @@ -770,7 +762,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling } @VisibleForTesting - Iterable<RootExceptionHistoryEntry> getExceptionHistory() { + public Iterable<RootExceptionHistoryEntry> getExceptionHistory() { return exceptionHistory.toArrayList(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java index 27b92b3d8a8..d3809b6b353 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; @@ -61,7 +62,8 @@ public interface SchedulerNGFactory { long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + 
JobStatusListener jobStatusListener,
+            BlocklistOperations blocklistOperations)
             throws Exception;
 
     JobManagerOptions.SchedulerType getSchedulerType();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
new file mode 100644
index 00000000000..e5d6d8ad1e7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocator.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.runtime.util.DualKeyLinkedMap;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple implementation of {@link ExecutionSlotAllocator}. No support for slot sharing,
+ * co-location, state/input locality, nor local recovery.
+ *
+ * <p>Each {@link ExecutionAttemptID} gets a dedicated physical slot request. Requests are tracked
+ * by execution attempt and by {@link SlotRequestId}; a request is dropped from tracking when it
+ * fails or is cancelled, or when its slot is returned or released.
+ */
+public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
+    private final PhysicalSlotProvider slotProvider;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;
+
+    private final DualKeyLinkedMap<
+                    ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>>
+            requestedPhysicalSlots;
+
+    SimpleExecutionSlotAllocator(
+            PhysicalSlotProvider slotProvider,
+            Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever,
+            boolean slotWillBeOccupiedIndefinitely) {
+        this.slotProvider = checkNotNull(slotProvider);
+        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        this.resourceProfileRetriever = checkNotNull(resourceProfileRetriever);
+        this.requestedPhysicalSlots = new DualKeyLinkedMap<>();
+    }
+
+    @Override
+    public List<ExecutionSlotAssignment> allocateSlotsFor(
+            List<ExecutionAttemptID> executionAttemptIds) {
+        return executionAttemptIds.stream()
+                .map(id -> new ExecutionSlotAssignment(id, allocateSlotFor(id)))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Returns the slot future tracked for the given execution attempt, issuing a new physical
+     * slot request if no request is tracked for it yet.
+     */
+    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId) {
+        if (requestedPhysicalSlots.containsKeyA(executionAttemptId)) {
+            return requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
+        }
+        final SlotRequestId slotRequestId = new SlotRequestId();
+        final ResourceProfile resourceProfile = resourceProfileRetriever.apply(executionAttemptId);
+        final SlotProfile slotProfile =
+                SlotProfile.priorAllocation(
+                        resourceProfile,
+                        resourceProfile,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptySet());
+        final PhysicalSlotRequest request =
+                new PhysicalSlotRequest(slotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
+        final CompletableFuture<LogicalSlot> slotFuture =
+                slotProvider
+                        .allocatePhysicalSlot(request)
+                        .thenApply(
+                                physicalSlotRequest ->
+                                        allocateLogicalSlotFromPhysicalSlot(
+                                                slotRequestId,
+                                                physicalSlotRequest.getPhysicalSlot(),
+                                                slotWillBeOccupiedIndefinitely));
+
+        // Track the request before attaching the failure handler: if the future has already
+        // failed, the handler runs immediately and must find the entry in order to remove it.
+        // Attaching the handler first would leave a stale, permanently failed entry behind.
+        this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture);
+        slotFuture.exceptionally(
+                throwable -> {
+                    this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
+                    this.slotProvider.cancelSlotRequest(slotRequestId, throwable);
+                    return null;
+                });
+        return slotFuture;
+    }
+
+    /**
+     * Cancels the tracked slot future of the given execution attempt, if any. Cancelling has no
+     * effect on a future that is already completed.
+     */
+    @Override
+    public void cancel(ExecutionAttemptID executionAttemptId) {
+        final CompletableFuture<LogicalSlot> slotFuture =
+                this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
+        if (slotFuture != null) {
+            slotFuture.cancel(false);
+        }
+    }
+
+    /** Callback handed to each {@link SingleLogicalSlot}; stops tracking the returned slot. */
+    private void returnLogicalSlot(LogicalSlot slot) {
+        releaseSlot(
+                slot,
+                new FlinkException("Slot is being returned from SimpleExecutionSlotAllocator."));
+    }
+
+    /** Removes the slot's request from tracking and cancels it at the slot provider. */
+    private void releaseSlot(LogicalSlot slot, Throwable cause) {
+        requestedPhysicalSlots.removeKeyB(slot.getSlotRequestId());
+        slotProvider.cancelSlotRequest(slot.getSlotRequestId(), cause);
+    }
+
+    /**
+     * Wraps the physical slot into a {@link SingleLogicalSlot} and registers it as the payload of
+     * the physical slot.
+     *
+     * @throws IllegalStateException if the physical slot does not accept the payload
+     */
+    private LogicalSlot allocateLogicalSlotFromPhysicalSlot(
+            final SlotRequestId slotRequestId,
+            final PhysicalSlot physicalSlot,
+            final boolean slotWillBeOccupiedIndefinitely) {
+
+        final SingleLogicalSlot singleLogicalSlot =
+                new SingleLogicalSlot(
+                        slotRequestId,
+                        physicalSlot,
+                        Locality.UNKNOWN,
+                        this::returnLogicalSlot,
+                        slotWillBeOccupiedIndefinitely);
+
+        final LogicalSlotHolder logicalSlotHolder = new LogicalSlotHolder(singleLogicalSlot);
+        if (physicalSlot.tryAssignPayload(logicalSlotHolder)) {
+            return singleLogicalSlot;
+        } else {
+            throw new IllegalStateException(
+                    "BUG: Unexpected physical slot payload assignment failure!");
+        }
+    }
+
+    /** Payload of a physical slot that forwards release requests to the logical slot. */
+    private class LogicalSlotHolder implements PhysicalSlot.Payload {
+        private final SingleLogicalSlot logicalSlot;
+
+        private LogicalSlotHolder(SingleLogicalSlot logicalSlot) {
+            this.logicalSlot = checkNotNull(logicalSlot);
+        }
+
+        @Override
+        public void release(Throwable cause) {
+            logicalSlot.release(cause);
+            releaseSlot(logicalSlot, new FlinkException("Physical slot releases its payload."));
+        }
+
+        @Override
+        public boolean willOccupySlotIndefinitely() {
+            return logicalSlot.willOccupySlotIndefinitely();
+        }
+    }
+
+    /** Factory to instantiate a {@link SimpleExecutionSlotAllocator}. */
+    public static class Factory implements ExecutionSlotAllocatorFactory {
+        private final PhysicalSlotProvider slotProvider;
+
+        private final boolean slotWillBeOccupiedIndefinitely;
+
+        public Factory(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
+            this.slotProvider = checkNotNull(slotProvider);
+            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        }
+
+        @Override
+        public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
+            return new SimpleExecutionSlotAllocator(
+                    slotProvider,
+                    id -> context.getResourceProfile(id.getExecutionVertexId()),
+                    slotWillBeOccupiedIndefinitely);
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
index 61236a1c18e..2d59d56466d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -80,7 +81,8 @@ public class
AdaptiveSchedulerFactory implements SchedulerNGFactory { long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + JobStatusListener jobStatusListener, + BlocklistOperations blocklistOperations) throws Exception { final DeclarativeSlotPool declarativeSlotPool = slotPoolService diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index f744d68f28b..29d6bfa72a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -26,10 +26,10 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.IntermediateResult; import org.apache.flink.runtime.executiongraph.JobStatusListener; -import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; import org.apache.flink.runtime.jobgraph.JobEdge; @@ -51,7 +51,6 @@ import org.apache.flink.runtime.scheduler.SchedulerOperations; import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup; import 
org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -145,23 +144,20 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule } @Override - public void startSchedulingInternal() { + protected void startSchedulingInternal() { initializeVerticesIfPossible(); super.startSchedulingInternal(); } @Override - protected void updateTaskExecutionStateInternal( - final ExecutionVertexID executionVertexId, - final TaskExecutionStateTransition taskExecutionState) { - + protected void onTaskFinished(final Execution execution) { initializeVerticesIfPossible(); - super.updateTaskExecutionStateInternal(executionVertexId, taskExecutionState); + super.onTaskFinished(execution); } - private void initializeVerticesIfPossible() { + void initializeVerticesIfPossible() { final List<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<>(); try { final long createTimestamp = System.currentTimeMillis(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java index 3aaad1dbd0d..621eaa9788b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java @@ -25,11 +25,13 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; +import 
org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex; import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategyFactoryLoader; @@ -41,8 +43,6 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl; -import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker; -import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy; @@ -55,17 +55,19 @@ import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; -import org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocatorFactory; +import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator; import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy; import org.apache.flink.runtime.shuffle.ShuffleMaster; 
import org.apache.flink.runtime.util.SlotSelectionStrategyUtils; -import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; import org.slf4j.Logger; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Consumer; import static org.apache.flink.util.Preconditions.checkState; @@ -92,7 +94,8 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory { long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + JobStatusListener jobStatusListener, + BlocklistOperations blocklistOperations) throws Exception { checkState( @@ -108,17 +111,15 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory { new IllegalStateException( "The AdaptiveBatchScheduler requires a SlotPool.")); - final SlotSelectionStrategy slotSelectionStrategy = - SlotSelectionStrategyUtils.selectSlotSelectionStrategy( - JobType.BATCH, jobMasterConfiguration); - final PhysicalSlotRequestBulkChecker bulkChecker = - PhysicalSlotRequestBulkCheckerImpl.createFromSlotPool( - slotPool, SystemClock.getInstance()); - final PhysicalSlotProvider physicalSlotProvider = - new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool); + final boolean enableSpeculativeExecution = + jobMasterConfiguration.getBoolean(JobManagerOptions.SPECULATIVE_ENABLED); + + final List<Consumer<ComponentMainThreadExecutor>> startUpActions = new ArrayList<>(); + final Consumer<ComponentMainThreadExecutor> combinedStartUpActions = + m -> startUpActions.forEach(a -> a.accept(m)); + final ExecutionSlotAllocatorFactory allocatorFactory = - new SlotSharingExecutionSlotAllocatorFactory( - physicalSlotProvider, false, bulkChecker, slotRequestTimeout); + createExecutionSlotAllocatorFactory(jobMasterConfiguration, slotPool); final 
RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory( @@ -147,34 +148,86 @@ public class AdaptiveBatchSchedulerFactory implements SchedulerNGFactory { shuffleMaster, partitionTracker, true, - new ExecutionJobVertex.Factory()); - - return new AdaptiveBatchScheduler( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - bulkChecker::start, - new ScheduledExecutorServiceAdapter(futureExecutor), - userCodeLoader, - new CheckpointsCleaner(), - checkpointRecoveryFactory, - jobManagerJobMetricGroup, - new VertexwiseSchedulingStrategy.Factory(), - FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration), - restartBackoffTimeStrategy, - new DefaultExecutionOperations(), - new ExecutionVertexVersioner(), - allocatorFactory, - initializationTimestamp, - mainThreadExecutor, - jobStatusListener, - executionGraphFactory, - shuffleMaster, - rpcTimeout, - DefaultVertexParallelismDecider.from(jobMasterConfiguration), - DefaultVertexParallelismDecider.getNormalizedMaxParallelism( - jobMasterConfiguration)); + createExecutionJobVertexFactory(enableSpeculativeExecution)); + + if (enableSpeculativeExecution) { + return new SpeculativeScheduler( + log, + jobGraph, + ioExecutor, + jobMasterConfiguration, + combinedStartUpActions, + new ScheduledExecutorServiceAdapter(futureExecutor), + userCodeLoader, + new CheckpointsCleaner(), + checkpointRecoveryFactory, + jobManagerJobMetricGroup, + new VertexwiseSchedulingStrategy.Factory(), + FailoverStrategyFactoryLoader.loadFailoverStrategyFactory( + jobMasterConfiguration), + restartBackoffTimeStrategy, + new DefaultExecutionOperations(), + new ExecutionVertexVersioner(), + allocatorFactory, + initializationTimestamp, + mainThreadExecutor, + jobStatusListener, + executionGraphFactory, + shuffleMaster, + rpcTimeout, + DefaultVertexParallelismDecider.from(jobMasterConfiguration), + 
DefaultVertexParallelismDecider.getNormalizedMaxParallelism( + jobMasterConfiguration), + blocklistOperations); + } else { + return new AdaptiveBatchScheduler( + log, + jobGraph, + ioExecutor, + jobMasterConfiguration, + combinedStartUpActions, + new ScheduledExecutorServiceAdapter(futureExecutor), + userCodeLoader, + new CheckpointsCleaner(), + checkpointRecoveryFactory, + jobManagerJobMetricGroup, + new VertexwiseSchedulingStrategy.Factory(), + FailoverStrategyFactoryLoader.loadFailoverStrategyFactory( + jobMasterConfiguration), + restartBackoffTimeStrategy, + new DefaultExecutionOperations(), + new ExecutionVertexVersioner(), + allocatorFactory, + initializationTimestamp, + mainThreadExecutor, + jobStatusListener, + executionGraphFactory, + shuffleMaster, + rpcTimeout, + DefaultVertexParallelismDecider.from(jobMasterConfiguration), + DefaultVertexParallelismDecider.getNormalizedMaxParallelism( + jobMasterConfiguration)); + } + } + + private static ExecutionSlotAllocatorFactory createExecutionSlotAllocatorFactory( + Configuration configuration, SlotPool slotPool) { + final SlotSelectionStrategy slotSelectionStrategy = + SlotSelectionStrategyUtils.selectSlotSelectionStrategy( + JobType.BATCH, configuration); + final PhysicalSlotProvider physicalSlotProvider = + new PhysicalSlotProviderImpl(slotSelectionStrategy, slotPool); + + return new SimpleExecutionSlotAllocator.Factory(physicalSlotProvider, false); + } + + private static ExecutionJobVertex.Factory createExecutionJobVertexFactory( + boolean enableSpeculativeExecution) { + if (enableSpeculativeExecution) { + return new SpeculativeExecutionJobVertex.Factory(); + } else { + return new ExecutionJobVertex.Factory(); + } } private void checkAllExchangesBlocking(final JobGraph jobGraph) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java new file 
mode 100644
index 00000000000..919ec05cf1d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeScheduler.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.blocklist.BlockedNode;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
+import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
+import org.apache.flink.runtime.scheduler.ExecutionOperations;
+import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.ExecutionTimeBasedSlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetector;
+import org.apache.flink.runtime.scheduler.slowtaskdetector.SlowTaskDetectorListener;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The speculative scheduler. When notified about slow tasks, it adds the nodes they are running
+ * on to the blocklist and deploys additional executions of the slow execution vertices, which
+ * then run concurrently with the original ones.
+ */
+public class SpeculativeScheduler extends AdaptiveBatchScheduler
+        implements SlowTaskDetectorListener {
+
+    private final int maxConcurrentExecutions;
+
+    private final Duration blockSlowNodeDuration;
+
+    private final BlocklistOperations blocklistOperations;
+
+    private final SlowTaskDetector slowTaskDetector;
+
+    public SpeculativeScheduler(
+            final Logger log,
+            final JobGraph jobGraph,
+            final Executor ioExecutor,
+            final Configuration jobMasterConfiguration,
+            final Consumer<ComponentMainThreadExecutor> startUpAction,
+            final ScheduledExecutor delayExecutor,
+            final ClassLoader userCodeLoader,
+            final CheckpointsCleaner checkpointsCleaner,
+            final CheckpointRecoveryFactory checkpointRecoveryFactory,
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            final SchedulingStrategyFactory schedulingStrategyFactory,
+            final FailoverStrategy.Factory failoverStrategyFactory,
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            final ExecutionOperations executionOperations,
+            final ExecutionVertexVersioner executionVertexVersioner,
+            final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory,
+            long initializationTimestamp,
+            final ComponentMainThreadExecutor mainThreadExecutor,
+            final JobStatusListener jobStatusListener,
+            final ExecutionGraphFactory executionGraphFactory,
+            final ShuffleMaster<?> shuffleMaster,
+            final Time rpcTimeout,
+            final VertexParallelismDecider vertexParallelismDecider,
+            final int defaultMaxParallelism,
+            final BlocklistOperations blocklistOperations)
+            throws Exception {
+
+        super(
+                log,
+                jobGraph,
+                ioExecutor,
+                jobMasterConfiguration,
+                startUpAction,
+                delayExecutor,
+                userCodeLoader,
+                checkpointsCleaner,
+                checkpointRecoveryFactory,
+                jobManagerJobMetricGroup,
+                schedulingStrategyFactory,
+                failoverStrategyFactory,
+                restartBackoffTimeStrategy,
+                executionOperations,
+                executionVertexVersioner,
+                executionSlotAllocatorFactory,
+                initializationTimestamp,
+                mainThreadExecutor,
+                jobStatusListener,
+                executionGraphFactory,
+                shuffleMaster,
+                rpcTimeout,
+                vertexParallelismDecider,
+                defaultMaxParallelism);
+
+        this.maxConcurrentExecutions =
+                jobMasterConfiguration.getInteger(
+                        JobManagerOptions.SPECULATIVE_MAX_CONCURRENT_EXECUTIONS);
+
+        this.blockSlowNodeDuration =
+                jobMasterConfiguration.get(JobManagerOptions.BLOCK_SLOW_NODE_DURATION);
+
+        this.blocklistOperations = checkNotNull(blocklistOperations);
+
+        this.slowTaskDetector = new ExecutionTimeBasedSlowTaskDetector(jobMasterConfiguration);
+    }
+
+    @Override
+    protected void startSchedulingInternal() {
+        super.startSchedulingInternal();
+        slowTaskDetector.start(getExecutionGraph(), this, getMainThreadExecutor());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        slowTaskDetector.stop();
+        return super.closeAsync();
+    }
+
+    @Override
+    public SpeculativeExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
+        return (SpeculativeExecutionVertex) super.getExecutionVertex(executionVertexId);
+    }
+
+    @Override
+    protected void onTaskFinished(final Execution execution) {
+        // cancel all un-terminated executions because the execution vertex has finished
+        FutureUtils.assertNoException(cancelPendingExecutions(execution.getVertex().getID()));
+
+        super.onTaskFinished(execution);
+    }
+
+    /** Cancels the pending slot requests and all not yet FINISHED executions of the vertex. */
+    private CompletableFuture<?> cancelPendingExecutions(
+            final ExecutionVertexID executionVertexId) {
+        // cancel all the related pending requests to avoid that slots returned by the canceled
+        // vertices are used to fulfill these pending requests
+        // do not cancel the FINISHED execution
+        cancelAllPendingSlotRequestsForVertex(executionVertexId);
+        return FutureUtils.combineAll(
+                getExecutionVertex(executionVertexId).getCurrentExecutions().stream()
+                        .filter(e -> e.getState() != ExecutionState.FINISHED)
+                        .map(this::cancelExecution)
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    protected void onTaskFailed(final Execution execution) {
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(execution.getVertex().getID());
+
+        // when an execution fails, remove it from current executions to make room for future
+        // speculative executions
+        executionVertex.archiveFailedExecution(execution.getAttemptId());
+
+        super.onTaskFailed(execution);
+    }
+
+    @Override
+    protected void handleTaskFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+
+        final SpeculativeExecutionVertex executionVertex =
+                getExecutionVertex(failedExecution.getVertex().getID());
+
+        // if the execution vertex cannot possibly finish or a PartitionException occurred,
+        // trigger an execution vertex failover to recover
+        if (!isExecutionVertexPossibleToFinish(executionVertex)
+                || ExceptionUtils.findThrowable(error, PartitionException.class).isPresent()) {
+            super.handleTaskFailure(failedExecution, error);
+        } else {
+            // this is just a local failure and the execution vertex will not be fully restarted
+            handleLocalExecutionAttemptFailure(failedExecution, error);
+        }
+    }
+
+    /**
+     * Handles the failure of a single execution attempt without restarting its execution vertex.
+     * The attempt's slot request is cancelled and the failure is recorded; the job is failed if
+     * the failure handling result does not allow a restart.
+     */
+    private void handleLocalExecutionAttemptFailure(
+            final Execution failedExecution, @Nullable final Throwable error) {
+        executionSlotAllocator.cancel(failedExecution.getAttemptId());
+
+        final FailureHandlingResult failureHandlingResult =
+                recordTaskFailure(failedExecution, error);
+        if (failureHandlingResult.canRestart()) {
+            archiveFromFailureHandlingResult(
+                    createFailureHandlingResultSnapshot(failureHandlingResult));
+        } else {
+            failJob(error, failureHandlingResult.getTimestamp());
+        }
+    }
+
+    /**
+     * Returns whether any current execution of the vertex is still CREATED, SCHEDULED, DEPLOYING,
+     * INITIALIZING or RUNNING. Must not be called once an execution of the vertex is FINISHED.
+     */
+    private static boolean isExecutionVertexPossibleToFinish(
+            final SpeculativeExecutionVertex executionVertex) {
+        boolean anyExecutionPossibleToFinish = false;
+        for (Execution execution : executionVertex.getCurrentExecutions()) {
+            // if any execution has finished, no execution of the same execution vertex should fail
+            // after that
+            checkState(execution.getState() != ExecutionState.FINISHED);
+
+            if (execution.getState() == ExecutionState.CREATED
+                    || execution.getState() == ExecutionState.SCHEDULED
+                    || execution.getState() == ExecutionState.DEPLOYING
+                    || execution.getState() == ExecutionState.INITIALIZING
+                    || execution.getState() == ExecutionState.RUNNING) {
+                anyExecutionPossibleToFinish = true;
+            }
+        }
+        return anyExecutionPossibleToFinish;
+    }
+
+    /**
+     * Adds the nodes of the reported slow executions to the blocklist and deploys new speculative
+     * executions for the slow vertices, so that each has up to {@code maxConcurrentExecutions}
+     * current executions. Vertices containing sources or sinks are skipped.
+     */
+    @Override
+    public void notifySlowTasks(Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        final long currentTimestamp = System.currentTimeMillis();
+
+        // add slow nodes to blocklist before scheduling new speculative executions
+        final long blockedEndTimestamp = currentTimestamp + blockSlowNodeDuration.toMillis();
+        final Collection<BlockedNode> nodesToBlock =
+                getSlowNodeIds(slowTasks).stream()
+                        .map(
+                                nodeId ->
+                                        new BlockedNode(
+                                                nodeId,
+                                                "Node is detected to be slow.",
+                                                blockedEndTimestamp))
+                        .collect(Collectors.toList());
+        blocklistOperations.addNewBlockedNodes(nodesToBlock);
+
+        final List<Execution> newSpeculativeExecutions = new ArrayList<>();
+        final Set<ExecutionVertexID> verticesToDeploy = new HashSet<>();
+        for (ExecutionVertexID executionVertexId : slowTasks.keySet()) {
+            final SpeculativeExecutionVertex executionVertex =
+                    getExecutionVertex(executionVertexId);
+
+            if (executionVertex.containsSources() || executionVertex.containsSinks()) {
+                continue;
+            }
+
+            final int currentConcurrentExecutions = executionVertex.getCurrentExecutions().size();
+            final int newSpeculativeExecutionsToDeploy =
+                    maxConcurrentExecutions - currentConcurrentExecutions;
+            if (newSpeculativeExecutionsToDeploy > 0) {
+                log.info(
+                        "{} ({}) is detected as a slow vertex, create and deploy {} new speculative executions for it.",
+                        executionVertex.getTaskNameWithSubtaskIndex(),
+                        executionVertex.getID(),
+                        newSpeculativeExecutionsToDeploy);
+
+                verticesToDeploy.add(executionVertexId);
+                // A lambda is needed: createNewSpeculativeExecution as a method reference would
+                // be fed the stream index (0, 1, ...) instead of the creation time.
+                // NOTE(review): assumes the parameter is the creation timestamp - confirm.
+                IntStream.range(0, newSpeculativeExecutionsToDeploy)
+                        .mapToObj(
+                                ignored ->
+                                        executionVertex.createNewSpeculativeExecution(
+                                                currentTimestamp))
+                        .forEach(newSpeculativeExecutions::add);
+            }
+        }
+
+        executionDeployer.allocateSlotsAndDeploy(
+                newSpeculativeExecutions,
+                executionVertexVersioner.getExecutionVertexVersions(verticesToDeploy));
+    }
+
+    /**
+     * Returns the ids of the nodes which the given slow executions are assigned to. Executions
+     * that are not registered (any more) or have no assigned resource location are skipped, as
+     * no node can be derived from them.
+     */
+    private Set<String> getSlowNodeIds(
+            Map<ExecutionVertexID, Collection<ExecutionAttemptID>> slowTasks) {
+        final Set<ExecutionAttemptID> slowExecutions =
+                slowTasks.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+
+        return slowExecutions.stream()
+                .map(id -> getExecutionGraph().getRegisteredExecutions().get(id))
+                .filter(execution -> execution != null)
+                .map(Execution::getAssignedResourceLocation)
+                .filter(location -> location != null)
+                .map(TaskManagerLocation::getNodeId)
+                .collect(Collectors.toSet());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
index 07fc6ed2e3c..ee542eb0822 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blocklist.BlocklistOperations;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
@@ -119,7 +120,8 @@ public class JobMasterSchedulerTest extends TestLogger {
                 long initializationTimestamp,
                 ComponentMainThreadExecutor mainThreadExecutor,
                 FatalErrorHandler fatalErrorHandler,
-                JobStatusListener jobStatusListener) {
+                JobStatusListener jobStatusListener,
+                BlocklistOperations blocklistOperations) {
             return TestingSchedulerNG.newBuilder()
                     .setStartSchedulingRunnable(
                             () -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java index 4c99e8ce5ec..7629e76a157 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java @@ -23,12 +23,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex; import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; @@ -40,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler; +import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeScheduler; import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider; import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import 
org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; @@ -57,7 +60,7 @@ import java.util.concurrent.ScheduledExecutorService; import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore; -/** A builder to create {@link DefaultScheduler} instances for testing. */ +/** A builder to create {@link DefaultScheduler} or its subclass instances for testing. */ public class DefaultSchedulerBuilder { private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulerBuilder.class); @@ -95,6 +98,7 @@ public class DefaultSchedulerBuilder { private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0; private int defaultMaxParallelism = JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue(); + private BlocklistOperations blocklistOperations = ignore -> {}; public DefaultSchedulerBuilder( JobGraph jobGraph, @@ -245,6 +249,11 @@ public class DefaultSchedulerBuilder { return this; } + public DefaultSchedulerBuilder setBlocklistOperations(BlocklistOperations blocklistOperations) { + this.blocklistOperations = blocklistOperations; + return this; + } + public DefaultScheduler build() throws Exception { return new DefaultScheduler( log, @@ -301,7 +310,41 @@ public class DefaultSchedulerBuilder { defaultMaxParallelism); } + public SpeculativeScheduler buildSpeculativeScheduler() throws Exception { + return new SpeculativeScheduler( + log, + jobGraph, + ioExecutor, + jobMasterConfiguration, + componentMainThreadExecutor -> {}, + delayExecutor, + userCodeLoader, + checkpointCleaner, + checkpointRecoveryFactory, + jobManagerJobMetricGroup, + new VertexwiseSchedulingStrategy.Factory(), + failoverStrategyFactory, + restartBackoffTimeStrategy, + executionOperations, + executionVertexVersioner, + executionSlotAllocatorFactory, + System.currentTimeMillis(), + mainThreadExecutor, + jobStatusListener, + createExecutionGraphFactory(true, new SpeculativeExecutionJobVertex.Factory()), + shuffleMaster, + 
rpcTimeout, + vertexParallelismDecider, + defaultMaxParallelism, + blocklistOperations); + } + private ExecutionGraphFactory createExecutionGraphFactory(boolean isDynamicGraph) { + return createExecutionGraphFactory(isDynamicGraph, new ExecutionJobVertex.Factory()); + } + + private ExecutionGraphFactory createExecutionGraphFactory( + boolean isDynamicGraph, ExecutionJobVertex.Factory executionJobVertexFactory) { return new DefaultExecutionGraphFactory( jobMasterConfiguration, userCodeLoader, @@ -314,6 +357,6 @@ public class DefaultSchedulerBuilder { shuffleMaster, partitionTracker, isDynamicGraph, - new ExecutionJobVertex.Factory()); + executionJobVertexFactory); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index d78610c25aa..d3bad8be708 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1690,7 +1690,7 @@ public class DefaultSchedulerTest extends TestLogger { schedulerClosed.get(); } - private static TaskExecutionState createFailedTaskExecutionState( + public static TaskExecutionState createFailedTaskExecutionState( ExecutionAttemptID executionAttemptID) { return new TaskExecutionState( executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause")); @@ -1745,7 +1745,7 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.getJobTerminationFuture().get(TIMEOUT_MS, TimeUnit.MILLISECONDS); } - private static JobGraph singleNonParallelJobVertexJobGraph() { + public static JobGraph singleNonParallelJobVertexJobGraph() { return singleJobVertexJobGraph(1); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java new file mode 100644 index 00000000000..1fd6ca850f6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SimpleExecutionSlotAllocatorTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.TestingPayload; +import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.BiConsumerWithException; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test suites for {@link SimpleExecutionSlotAllocator}. 
*/ +class SimpleExecutionSlotAllocatorTest { + + private static final ResourceProfile RESOURCE_PROFILE = ResourceProfile.fromResources(3, 5); + private static final ExecutionAttemptID EXECUTION_ATTEMPT_ID = createExecutionAttemptId(); + + @Test + void testSlotAllocation() { + final AllocationContext context = new AllocationContext(); + final CompletableFuture<LogicalSlot> slotFuture = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + assertThat(slotFuture).isCompleted(); + assertThat(context.getSlotProvider().getRequests()).hasSize(1); + + final PhysicalSlotRequest slotRequest = + context.getSlotProvider().getRequests().values().iterator().next(); + assertThat(slotRequest.getSlotProfile().getPhysicalSlotResourceProfile()) + .isEqualTo(RESOURCE_PROFILE); + } + + @Test + void testDuplicateAllocationDoesNotRecreateLogicalSlotFuture() throws Exception { + final AllocationContext context = new AllocationContext(); + + final CompletableFuture<LogicalSlot> slotFuture1 = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + final CompletableFuture<LogicalSlot> slotFuture2 = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + + assertThat(slotFuture1.get()).isSameAs(slotFuture2.get()); + } + + @Test + void testFailedPhysicalSlotRequestFailsLogicalSlotFuture() { + final AllocationContext context = + new AllocationContext( + TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(), + false); + final CompletableFuture<LogicalSlot> slotFuture = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + final SlotRequestId slotRequestId = + context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); + + assertThat(slotFuture).isNotDone(); + context.getSlotProvider() + .getResponses() + .get(slotRequestId) + .completeExceptionally(new Throwable()); + assertThat(slotFuture).isCompletedExceptionally(); + + // next allocation allocates new slot + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + assertThat(context.getSlotProvider().getRequests()).hasSize(2); + } + 
+ @Test + void testSlotWillBeOccupiedIndefinitelyFalse() throws Exception { + testSlotWillBeOccupiedIndefinitely(false); + } + + @Test + void testSlotWillBeOccupiedIndefinitelyTrue() throws Exception { + testSlotWillBeOccupiedIndefinitely(true); + } + + private static void testSlotWillBeOccupiedIndefinitely(boolean slotWillBeOccupiedIndefinitely) + throws Exception { + final AllocationContext context = + new AllocationContext( + TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), + slotWillBeOccupiedIndefinitely); + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + + final PhysicalSlotRequest slotRequest = context.getSlotProvider().getFirstRequestOrFail(); + assertThat(slotRequest.willSlotBeOccupiedIndefinitely()) + .isEqualTo(slotWillBeOccupiedIndefinitely); + + final TestingPhysicalSlot physicalSlot = + context.getSlotProvider().getResponses().get(slotRequest.getSlotRequestId()).get(); + assertThat(physicalSlot.getPayload()).isNotNull(); + assertThat(physicalSlot.getPayload().willOccupySlotIndefinitely()) + .isEqualTo(slotWillBeOccupiedIndefinitely); + } + + @Test + void testLogicalSlotReleasingCancelsPhysicalSlotRequest() throws Exception { + testLogicalSlotRequestCancellationOrRelease( + true, true, (context, slotFuture) -> slotFuture.get().releaseSlot(null)); + } + + @Test + void testLogicalSlotCancellationCancelsPhysicalSlotRequest() throws Exception { + testLogicalSlotRequestCancellationOrRelease( + false, + true, + (context, slotFuture) -> { + assertThatThrownBy( + () -> { + context.getAllocator().cancel(EXECUTION_ATTEMPT_ID); + slotFuture.get(); + }) + .as("The logical future must finish with a cancellation exception.") + .isInstanceOf(CancellationException.class); + }); + } + + @Test + void testCompletedLogicalSlotCancellationDoesNotCancelPhysicalSlotRequest() throws Exception { + testLogicalSlotRequestCancellationOrRelease( + true, + false, + (context, slotFuture) -> { + context.getAllocator().cancel(EXECUTION_ATTEMPT_ID); + slotFuture.get(); + 
}); + } + + private static void testLogicalSlotRequestCancellationOrRelease( + final boolean autoCompletePhysicalSlotFuture, + final boolean expectPhysicalSlotRequestCanceled, + final BiConsumerWithException< + AllocationContext, CompletableFuture<LogicalSlot>, Exception> + cancelOrReleaseAction) + throws Exception { + + final TestingPhysicalSlotProvider physicalSlotProvider; + if (!autoCompletePhysicalSlotFuture) { + physicalSlotProvider = + TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(); + } else { + physicalSlotProvider = TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(); + } + final AllocationContext context = new AllocationContext(physicalSlotProvider, false); + + final CompletableFuture<LogicalSlot> slotFuture1 = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + + cancelOrReleaseAction.accept(context, slotFuture1); + + final SlotRequestId slotRequestId = + context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); + assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId)) + .isEqualTo(expectPhysicalSlotRequestCanceled); + + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + final int expectedNumberOfRequests = expectPhysicalSlotRequestCanceled ? 
2 : 1; + assertThat(context.getSlotProvider().getRequests()).hasSize(expectedNumberOfRequests); + } + + @Test + void testPhysicalSlotReleasesLogicalSlots() throws Exception { + final AllocationContext context = new AllocationContext(); + final CompletableFuture<LogicalSlot> slotFuture = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + final TestingPayload payload = new TestingPayload(); + slotFuture.thenAccept(logicalSlot -> logicalSlot.tryAssignPayload(payload)); + final SlotRequestId slotRequestId = + context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); + final TestingPhysicalSlot physicalSlot = + context.getSlotProvider().getFirstResponseOrFail().get(); + + assertThat(payload.getTerminalStateFuture()).isNotDone(); + assertThat(physicalSlot.getPayload()).isNotNull(); + + physicalSlot.getPayload().release(new Throwable()); + assertThat(payload.getTerminalStateFuture()).isDone(); + assertThat(context.getSlotProvider().getCancellations()).containsKey(slotRequestId); + + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + // there should be one more physical slot allocation, as the first allocation should be + // removed after releasing all logical slots + assertThat(context.getSlotProvider().getRequests()).hasSize(2); + } + + @Test + void testFailLogicalSlotIfPhysicalSlotIsFails() { + final AllocationContext context = + new AllocationContext( + TestingPhysicalSlotProvider.createWithFailingPhysicalSlotCreation( + new FlinkException("test failure")), + false); + final CompletableFuture<LogicalSlot> slotFuture = + context.allocateSlotsFor(EXECUTION_ATTEMPT_ID); + + assertThat(slotFuture).isCompletedExceptionally(); + assertThat(context.getSlotProvider().getCancellations().keySet()) + .isEqualTo(context.getSlotProvider().getRequests().keySet()); + } + + private static class AllocationContext { + private final TestingPhysicalSlotProvider slotProvider; + private final boolean slotWillBeOccupiedIndefinitely; + private final SimpleExecutionSlotAllocator 
allocator; + + public AllocationContext() { + this(TestingPhysicalSlotProvider.createWithInfiniteSlotCreation(), false); + } + + public AllocationContext( + TestingPhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) { + this.slotProvider = slotProvider; + this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely; + this.allocator = + new SimpleExecutionSlotAllocator( + slotProvider, + executionAttemptId -> RESOURCE_PROFILE, + slotWillBeOccupiedIndefinitely); + } + + private CompletableFuture<LogicalSlot> allocateSlotsFor( + ExecutionAttemptID executionAttemptId) { + return allocator + .allocateSlotsFor(Collections.singletonList(executionAttemptId)) + .get(0) + .getLogicalSlotFuture(); + } + + public TestingPhysicalSlotProvider getSlotProvider() { + return slotProvider; + } + + public boolean isSlotWillBeOccupiedIndefinitely() { + return slotWillBeOccupiedIndefinitely; + } + + public SimpleExecutionSlotAllocator getAllocator() { + return allocator; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java index 40a122e46f5..6233a42a805 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNGFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blocklist.BlocklistOperations; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.executiongraph.JobStatusListener; @@ -69,7 +70,8 @@ public class TestingSchedulerNGFactory 
implements SchedulerNGFactory { long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, - JobStatusListener jobStatusListener) + JobStatusListener jobStatusListener, + BlocklistOperations blocklistOperations) throws Exception { return schedulerNG; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java new file mode 100644 index 00000000000..75456468447 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/SpeculativeSchedulerTest.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.blocklist.BlockedNode; +import org.apache.flink.runtime.blocklist.BlocklistOperations; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultExecutionOperations; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; +import org.apache.flink.runtime.scheduler.TestExecutionOperationsDecorator; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.testutils.TestingUtils; +import 
org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; + +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.createFailedTaskExecutionState; +import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.singleNonParallelJobVertexJobGraph; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SpeculativeScheduler}. 
*/ +class SpeculativeSchedulerTest { + + @RegisterExtension + private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + private ScheduledExecutorService futureExecutor; + + private ManuallyTriggeredScheduledExecutor taskRestartExecutor; + private TestExecutionOperationsDecorator testExecutionOperations; + private TestBlocklistOperations testBlocklistOperations; + private TestRestartBackoffTimeStrategy restartStrategy; + + @BeforeEach + void setUp() { + futureExecutor = new DirectScheduledExecutorService(); + + taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); + testExecutionOperations = + new TestExecutionOperationsDecorator(new DefaultExecutionOperations()); + testBlocklistOperations = new TestBlocklistOperations(); + restartStrategy = new TestRestartBackoffTimeStrategy(true, 0); + } + + @AfterEach + void tearDown() { + if (futureExecutor != null) { + ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, futureExecutor); + } + } + + @Test + void testStartScheduling() { + createSchedulerAndStartScheduling(); + final List<ExecutionAttemptID> deployedExecutions = + testExecutionOperations.getDeployedExecutions(); + assertThat(deployedExecutions).hasSize(1); + } + + @Test + void testNotifySlowTasks() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(1); + + notifySlowTask(scheduler, attempt1); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2); + assertThat(testBlocklistOperations.getAllBlockedNodeIds()) + .containsExactly(attempt1.getAssignedResourceLocation().getNodeId()); + } + + @Test + void testNotifyDuplicatedSlowTasks() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev 
= getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2); + + // notify the execution as a slow task again + notifySlowTask(scheduler, attempt1); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2); + + // fail attempt2 to make room for a new speculative execution + final Execution attempt2 = getExecution(ev, 1); + scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attempt2.getAttemptId())); + + // notify the execution as a slow task again + notifySlowTask(scheduler, attempt1); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3); + } + + @Test + void testRestartVertexIfAllSpeculativeExecutionFailed() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2); + + final ExecutionAttemptID attemptId1 = attempt1.getAttemptId(); + final ExecutionAttemptID attemptId2 = getExecution(ev, 1).getAttemptId(); + + scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId1)); + scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attemptId2)); + taskRestartExecutor.triggerScheduledTasks(); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3); + } + + @Test + void testNoRestartIfNotAllSpeculativeExecutionFailed() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + scheduler.updateTaskExecutionState(createFailedTaskExecutionState(attempt1.getAttemptId())); + 
taskRestartExecutor.triggerScheduledTasks(); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(2); + } + + @Test + void testRestartVertexIfPartitionExceptionHappened() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + final Execution attempt2 = getExecution(ev, 1); + scheduler.updateTaskExecutionState( + new TaskExecutionState( + attempt1.getAttemptId(), + ExecutionState.FAILED, + new PartitionNotFoundException(new ResultPartitionID()))); + + assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING); + + completeCancellingForAllVertices(scheduler.getExecutionGraph()); + taskRestartExecutor.triggerScheduledTasks(); + + assertThat(testExecutionOperations.getDeployedExecutions()).hasSize(3); + } + + @Test + void testCancelOtherCurrentExecutionsWhenAnyExecutionFinished() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + final Execution attempt2 = getExecution(ev, 1); + scheduler.updateTaskExecutionState( + new TaskExecutionState(attempt1.getAttemptId(), ExecutionState.FINISHED)); + + assertThat(attempt2.getState()).isEqualTo(ExecutionState.CANCELING); + } + + @Test + void testExceptionHistoryIfPartitionExceptionHappened() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + + // A partition exception can result in a restart of the whole execution vertex. 
+ scheduler.updateTaskExecutionState( + new TaskExecutionState( + attempt1.getAttemptId(), + ExecutionState.FAILED, + new PartitionNotFoundException(new ResultPartitionID()))); + + completeCancellingForAllVertices(scheduler.getExecutionGraph()); + taskRestartExecutor.triggerScheduledTasks(); + + assertThat(scheduler.getExceptionHistory()).hasSize(1); + + final RootExceptionHistoryEntry entry = scheduler.getExceptionHistory().iterator().next(); + // the current execution attempt before the restarting should be attempt2 but the failure + // root exception should be attempt1 + assertThat(entry.getFailingTaskName()).isEqualTo(attempt1.getVertexWithAttempt()); + } + + @Test + void testLocalExecutionAttemptFailureIsCorrectlyRecorded() { + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + + // the execution vertex will not be restarted if we only fails attempt1, but it still should + // be recorded in the execution graph and in exception history + final TaskExecutionState failedState = + createFailedTaskExecutionState(attempt1.getAttemptId()); + scheduler.updateTaskExecutionState(failedState); + + final ClassLoader classLoader = SpeculativeSchedulerTest.class.getClassLoader(); + assertThat(scheduler.getExecutionGraph().getFailureInfo()).isNotNull(); + assertThat(scheduler.getExecutionGraph().getFailureInfo().getExceptionAsString()) + .contains(failedState.getError(classLoader).getMessage()); + + assertThat(scheduler.getExceptionHistory()).hasSize(1); + + final RootExceptionHistoryEntry entry = scheduler.getExceptionHistory().iterator().next(); + assertThat(entry.getFailingTaskName()).isEqualTo(attempt1.getVertexWithAttempt()); + } + + @Test + void testUnrecoverableLocalExecutionAttemptFailureWillFailJob() { + final SpeculativeScheduler scheduler = 
createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + + final TaskExecutionState failedState = + new TaskExecutionState( + attempt1.getAttemptId(), + ExecutionState.FAILED, + new SuppressRestartsException( + new Exception("Forced failure for testing."))); + scheduler.updateTaskExecutionState(failedState); + + assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING); + } + + @Test + void testLocalExecutionAttemptFailureAndForbiddenRestartWillFailJob() { + restartStrategy.setCanRestart(false); + + final SpeculativeScheduler scheduler = createSchedulerAndStartScheduling(); + final ExecutionVertex ev = getOnlyExecutionVertex(scheduler); + final Execution attempt1 = ev.getCurrentExecutionAttempt(); + + notifySlowTask(scheduler, attempt1); + + final TaskExecutionState failedState = + createFailedTaskExecutionState(attempt1.getAttemptId()); + scheduler.updateTaskExecutionState(failedState); + + assertThat(scheduler.getExecutionGraph().getState()).isEqualTo(JobStatus.FAILING); + } + + @Test + public void testSpeculativeExecutionCombinedWithAdaptiveScheduling() throws Exception { + final JobVertex source = createNoOpVertex("source", 1); + final JobVertex sink = createNoOpVertex("sink", -1); + sink.connectNewDataSetAsInput( + source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink); + + final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forMainThread(); + final SpeculativeScheduler scheduler = + createSchedulerBuilder(jobGraph, mainThreadExecutor) + .setVertexParallelismDecider(ignore -> 3) + .buildSpeculativeScheduler(); + mainThreadExecutor.execute(scheduler::startScheduling); + + final DefaultExecutionGraph graph = (DefaultExecutionGraph) 
scheduler.getExecutionGraph(); + final ExecutionJobVertex sourceExecutionJobVertex = graph.getJobVertex(source.getID()); + final ExecutionJobVertex sinkExecutionJobVertex = graph.getJobVertex(sink.getID()); + + final ExecutionVertex sourceExecutionVertex = sourceExecutionJobVertex.getTaskVertices()[0]; + assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(1); + + // trigger source vertex speculation + final Execution sourceAttempt1 = sourceExecutionVertex.getCurrentExecutionAttempt(); + notifySlowTask(scheduler, sourceAttempt1); + assertThat(sourceExecutionVertex.getCurrentExecutions()).hasSize(2); + + assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(-1); + + // Finishing any source execution attempt will finish the source execution vertex, and then + // finish the job vertex. + scheduler.updateTaskExecutionState( + new TaskExecutionState( + sourceAttempt1.getAttemptId(), + ExecutionState.FINISHED, + null, + null, + new IOMetrics(0, 0, 0, 0, 0, 0, 0))); + assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(3); + + // trigger sink vertex speculation + final ExecutionVertex sinkExecutionVertex = sinkExecutionJobVertex.getTaskVertices()[0]; + final Execution sinkAttempt1 = sinkExecutionVertex.getCurrentExecutionAttempt(); + notifySlowTask(scheduler, sinkAttempt1); + assertThat(sinkExecutionVertex.getCurrentExecutions()).hasSize(2); + } + + private static Execution getExecution(ExecutionVertex executionVertex, int attemptNumber) { + return executionVertex.getCurrentExecutions().stream() + .filter(e -> e.getAttemptNumber() == attemptNumber) + .findFirst() + .get(); + } + + private static ExecutionVertex getOnlyExecutionVertex(SpeculativeScheduler scheduler) { + return Iterables.getOnlyElement(scheduler.getExecutionGraph().getAllExecutionVertices()); + } + + private SpeculativeScheduler createSchedulerAndStartScheduling() { + return createSchedulerAndStartScheduling(singleNonParallelJobVertexJobGraph()); + } + + private 
SpeculativeScheduler createSchedulerAndStartScheduling(final JobGraph jobGraph) { + final ComponentMainThreadExecutor mainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forMainThread(); + + try { + final SpeculativeScheduler scheduler = createScheduler(jobGraph, mainThreadExecutor); + mainThreadExecutor.execute(scheduler::startScheduling); + return scheduler; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private SpeculativeScheduler createScheduler( + final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) + throws Exception { + return createSchedulerBuilder(jobGraph, mainThreadExecutor).buildSpeculativeScheduler(); + } + + private DefaultSchedulerBuilder createSchedulerBuilder( + final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) { + return new DefaultSchedulerBuilder( + jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + .setBlocklistOperations(testBlocklistOperations) + .setExecutionOperations(testExecutionOperations) + .setFutureExecutor(futureExecutor) + .setDelayExecutor(taskRestartExecutor) + .setRestartBackoffTimeStrategy(restartStrategy); + } + + private static void notifySlowTask( + final SpeculativeScheduler scheduler, final Execution slowTask) { + scheduler.notifySlowTasks( + ImmutableMap.of( + slowTask.getVertex().getID(), + Collections.singleton(slowTask.getAttemptId()))); + } + + private static class TestBlocklistOperations implements BlocklistOperations { + private final List<BlockedNode> blockedNodes = new ArrayList<>(); + + @Override + public void addNewBlockedNodes(Collection<BlockedNode> newNodes) { + blockedNodes.addAll(newNodes); + } + + public Set<String> getAllBlockedNodeIds() { + return blockedNodes.stream().map(BlockedNode::getNodeId).collect(Collectors.toSet()); + } + } +}