tillrohrmann commented on a change in pull request #15229:
URL: https://github.com/apache/flink/pull/15229#discussion_r741826927
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerComponents.java
##########
@@ -124,9 +126,19 @@ private static SlotSelectionStrategy
selectSlotSelectionStrategy(
?
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut()
:
LocationPreferenceSlotSelectionStrategy.createDefault();
- return configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY)
- ? PreviousAllocationSlotSelectionStrategy.create(
- locationPreferenceSlotSelectionStrategy)
- : locationPreferenceSlotSelectionStrategy;
+ final boolean isLocalRecoveryEnabled =
+ configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY);
+ if (isLocalRecoveryEnabled) {
+ if (jobType == JobType.STREAMING) {
+ return PreviousAllocationSlotSelectionStrategy.create(
+ locationPreferenceSlotSelectionStrategy);
+ } else {
+ throw new IllegalArgumentException(
+ "Local recovery does not support batch jobs. Please
set configuration "
+ + "'state.backend.local-recovery' to false for
batch jobs.");
Review comment:
Do we really want to fail hard if local recovery is enabled on a cluster
and a user submits a batch job? In general I am in favour of making things
explicit. However, with this change it will no longer be possible to run a
session cluster with a mixed workload (streaming as well as batch jobs) and
enable local recovery for the streaming jobs.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingLocalRecoveryITCase.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+
+/** IT case for testing Flink's scheduling regarding local recovery. */
+public class SchedulingLocalRecoveryITCase extends TestLogger {
Review comment:
Maybe `DefaultSchedulerLocalRecoveryITCase`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -204,6 +212,10 @@ protected void updateTaskExecutionStateInternal(
final ExecutionVertexID executionVertexId,
final TaskExecutionStateTransition taskExecutionState) {
+ if (taskExecutionState.getExecutionState() == ExecutionState.FINISHED)
{
+ stopReserveAllocation(executionVertexId);
+ }
Review comment:
It is a bit unfortunate that the contract between
`stopReserveAllocation` and the releasing of a slot is only implicit, via
`ExecutionState.FINISHED`. It would be more explicit if we had a
`releaseAssignedResource` call in the `DefaultScheduler`, but the release logic
is hidden in the depths of the `ExecutionGraph`. Maybe we can add a comment
explaining this situation.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -40,38 +39,37 @@
private final Function<ExecutionVertexID, AllocationID>
priorAllocationIdRetriever;
+ private final Supplier<Set<AllocationID>> reservedAllocationIdsRetriever;
+
MergingSharedSlotProfileRetrieverFactory(
SyncPreferredLocationsRetriever preferredLocationsRetriever,
- Function<ExecutionVertexID, AllocationID>
priorAllocationIdRetriever) {
+ Function<ExecutionVertexID, AllocationID>
priorAllocationIdRetriever,
+ Supplier<Set<AllocationID>> reservedAllocationIdsRetriever) {
this.preferredLocationsRetriever =
Preconditions.checkNotNull(preferredLocationsRetriever);
this.priorAllocationIdRetriever =
Preconditions.checkNotNull(priorAllocationIdRetriever);
+ this.reservedAllocationIdsRetriever =
+ Preconditions.checkNotNull(reservedAllocationIdsRetriever);
}
@Override
public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID>
bulk) {
- Set<AllocationID> allPriorAllocationIds =
- bulk.stream()
- .map(priorAllocationIdRetriever)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
- return new MergingSharedSlotProfileRetriever(allPriorAllocationIds,
bulk);
+ return new
MergingSharedSlotProfileRetriever(reservedAllocationIdsRetriever.get(), bulk);
Review comment:
On a related note, maybe in the future we need to derive the
`slotWillBeOccupiedIndefinitely` property from the boundedness of the input
streams of an operator.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MergingSharedSlotProfileRetrieverFactory.java
##########
@@ -40,38 +39,37 @@
private final Function<ExecutionVertexID, AllocationID>
priorAllocationIdRetriever;
+ private final Supplier<Set<AllocationID>> reservedAllocationIdsRetriever;
+
MergingSharedSlotProfileRetrieverFactory(
SyncPreferredLocationsRetriever preferredLocationsRetriever,
- Function<ExecutionVertexID, AllocationID>
priorAllocationIdRetriever) {
+ Function<ExecutionVertexID, AllocationID>
priorAllocationIdRetriever,
+ Supplier<Set<AllocationID>> reservedAllocationIdsRetriever) {
this.preferredLocationsRetriever =
Preconditions.checkNotNull(preferredLocationsRetriever);
this.priorAllocationIdRetriever =
Preconditions.checkNotNull(priorAllocationIdRetriever);
+ this.reservedAllocationIdsRetriever =
+ Preconditions.checkNotNull(reservedAllocationIdsRetriever);
}
@Override
public SharedSlotProfileRetriever createFromBulk(Set<ExecutionVertexID>
bulk) {
- Set<AllocationID> allPriorAllocationIds =
- bulk.stream()
- .map(priorAllocationIdRetriever)
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
- return new MergingSharedSlotProfileRetriever(allPriorAllocationIds,
bulk);
+ return new
MergingSharedSlotProfileRetriever(reservedAllocationIdsRetriever.get(), bulk);
Review comment:
With this change, I think we will no longer be able to schedule the
following topology: Assume we have two pipelined regions A and B, each with two
tasks. Moreover, we are processing bounded streams. So in order to schedule A
and B we need 4 slots in total. Now assume that 2 slots disappear: one used by
A and one used by B. Moreover, assume that we won't get any new slots. In this
case, Flink will be able to schedule neither A nor B because of the reserved
slots. However, without the reserved slots, it should be able to run both
regions one after another.
This is probably more of an academic problem, to be honest. To be fair, if we
had only 2 slots in total, then we would have already failed if we hit the slot
request timeout when scheduling the job initially. Ideally, though, we would
eventually give up reserved slots if this allowed us to make progress.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -103,6 +104,10 @@
private final Time rpcTimeout;
+ private final Map<AllocationID, Long> reservedAllocationRefCounters;
Review comment:
Did you introduce this map for performance reasons? Alternatively we
could probably also get the same result via `new
HashSet<>(reservedAllocationsByExecutionVertex.values())`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocationContext.java
##########
@@ -67,4 +68,14 @@
* @return all co-location groups in the job
*/
Set<CoLocationGroup> getCoLocationGroups();
+
+ /**
+ * Returns all allocations to reserve. These allocations/slots were used
to run certain vertices
+ * and reserving them can prevent other vertices to take these slots and
thus help vertices to
+ * be deployed into their previous slots again after failover. It is
needed if {@link
+ * CheckpointingOptions#LOCAL_RECOVERY} is enabled.
+ *
+ * @return all allocations to reserve
+ */
+ Set<AllocationID> getAllocationsToReserve();
Review comment:
Maybe `getReservedAllocations`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -669,6 +704,11 @@ public SchedulingTopology getSchedulingTopology() {
public Optional<TaskManagerLocation>
getStateLocation(ExecutionVertexID executionVertexId) {
return stateLocationRetriever.getStateLocation(executionVertexId);
}
+
+ @Override
+ public Set<AllocationID> getAllocationsToReserve() {
Review comment:
Maybe `getReservedAllocations`?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingLocalRecoveryITCase.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+
+/** IT case for testing Flink's scheduling regarding local recovery. */
+public class SchedulingLocalRecoveryITCase extends TestLogger {
+
+ private static final long TIMEOUT = 10_000L;
+
+ @Test
+ public void testLocalRecoveryFull() throws Exception {
+ testLocalRecoveryInternal("full");
+ }
+
+ @Test
+ public void testLocalRecoveryRegion() throws Exception {
+ testLocalRecoveryInternal("region");
+ }
+
+ private void testLocalRecoveryInternal(String failoverStrategyValue)
throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+ configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(),
failoverStrategyValue);
+
+ final int parallelism = 10;
+ final ArchivedExecutionGraph graph =
executeSchedulingTest(configuration, parallelism);
+ assertNonLocalRecoveredTasksEquals(graph, 1);
+ }
+
+ private void assertNonLocalRecoveredTasksEquals(ArchivedExecutionGraph
graph, int expected) {
+ int nonLocalRecoveredTasks = 0;
+ for (ArchivedExecutionVertex vertex : graph.getAllExecutionVertices())
{
+ int currentAttemptNumber =
vertex.getCurrentExecutionAttempt().getAttemptNumber();
+ if (currentAttemptNumber == 0) {
+ // the task had never restarted and do not need to recover
+ continue;
+ }
+ AllocationID priorAllocation =
+ vertex.getPriorExecutionAttempt(currentAttemptNumber - 1)
+ .getAssignedAllocationID();
+ AllocationID currentAllocation =
+
vertex.getCurrentExecutionAttempt().getAssignedAllocationID();
+
+ assertNotNull(priorAllocation);
+ assertNotNull(currentAllocation);
+ if (!currentAllocation.equals(priorAllocation)) {
+ nonLocalRecoveredTasks++;
+ }
+ }
+ assertThat(nonLocalRecoveredTasks, is(expected));
+ }
+
+ private ArchivedExecutionGraph executeSchedulingTest(
+ Configuration configuration, int parallelism) throws Exception {
+ configuration.setString(RestOptions.BIND_PORT, "0");
+
+ final long slotIdleTimeout = TIMEOUT;
+ configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT,
slotIdleTimeout);
+
+ configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY,
MemorySize.parse("64mb"));
+ configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
MemorySize.parse("16mb"));
+ configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY,
MemorySize.parse("16mb"));
+
+ final MiniClusterConfiguration miniClusterConfiguration =
+ new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumTaskManagers(parallelism)
+ .setNumSlotsPerTaskManager(1)
+ .build();
+
+ try (MiniCluster miniCluster = new
MiniCluster(miniClusterConfiguration)) {
+ miniCluster.start();
+
+ MiniClusterClient miniClusterClient = new
MiniClusterClient(configuration, miniCluster);
+
+ JobGraph jobGraph = createJobGraph(parallelism);
+
+ // wait for the submission to succeed
+ JobID jobId = miniClusterClient.submitJob(jobGraph).get();
+
+ // wait until all tasks running before triggering task failures
+ waitUntilAllVerticesRunning(jobId, miniCluster, TIMEOUT);
+
+ // kill one TM to trigger task failure and remove one existing slot
+ CompletableFuture<Void> terminationFuture =
miniCluster.terminateTaskManager(0);
+ terminationFuture.get();
+
+ // restart a taskmanager as a replacement for the killed one
+ miniCluster.startTaskManager();
+
+ // wait until all tasks running again
+ waitUntilAllVerticesRunning(jobId, miniCluster, TIMEOUT);
+
+ ArchivedExecutionGraph graph =
+
miniCluster.getArchivedExecutionGraph(jobGraph.getJobID()).get();
+
+ miniCluster.cancelJob(jobId).get();
+
+ return graph;
+ }
+ }
+
+ private void waitUntilAllVerticesRunning(
+ JobID jobId, MiniCluster miniCluster, long maxWaitMillis) throws
Exception {
+ final Deadline deadline =
Deadline.fromNow(Duration.ofMillis(maxWaitMillis));
+
+ boolean checkResult = false;
+ do {
+ AccessExecutionGraph graph =
miniCluster.getArchivedExecutionGraph(jobId).get();
+
+ // we need the job status to be RUNNING first to ensure execution
vertices are created
+ if (graph.getState() != JobStatus.RUNNING) {
+ continue;
+ }
+
+ checkResult = areAllVerticesRunning(graph);
+
+ if (!checkResult) {
+ Thread.sleep(2L);
+ }
+ } while (!checkResult && deadline.hasTimeLeft());
+
+ if (!checkResult) {
+ throw new TimeoutException("Not all vertices becomes RUNNING in
time.");
+ }
+ }
Review comment:
Maybe we can use `CommonTestUtils.waitForAllTaskRunning`.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/runtime/SchedulingLocalRecoveryITCase.java
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNotNull;
+
+/** IT case for testing Flink's scheduling regarding local recovery. */
+public class SchedulingLocalRecoveryITCase extends TestLogger {
+
+ private static final long TIMEOUT = 10_000L;
+
+ @Test
+ public void testLocalRecoveryFull() throws Exception {
+ testLocalRecoveryInternal("full");
+ }
+
+ @Test
+ public void testLocalRecoveryRegion() throws Exception {
+ testLocalRecoveryInternal("region");
+ }
+
+ private void testLocalRecoveryInternal(String failoverStrategyValue)
throws Exception {
+ final Configuration configuration = new Configuration();
+ configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
+ configuration.setString(EXECUTION_FAILOVER_STRATEGY.key(),
failoverStrategyValue);
+
+ final int parallelism = 10;
+ final ArchivedExecutionGraph graph =
executeSchedulingTest(configuration, parallelism);
+ assertNonLocalRecoveredTasksEquals(graph, 1);
+ }
+
+ private void assertNonLocalRecoveredTasksEquals(ArchivedExecutionGraph
graph, int expected) {
+ int nonLocalRecoveredTasks = 0;
+ for (ArchivedExecutionVertex vertex : graph.getAllExecutionVertices())
{
+ int currentAttemptNumber =
vertex.getCurrentExecutionAttempt().getAttemptNumber();
+ if (currentAttemptNumber == 0) {
+ // the task had never restarted and do not need to recover
+ continue;
+ }
+ AllocationID priorAllocation =
+ vertex.getPriorExecutionAttempt(currentAttemptNumber - 1)
+ .getAssignedAllocationID();
+ AllocationID currentAllocation =
+
vertex.getCurrentExecutionAttempt().getAssignedAllocationID();
+
+ assertNotNull(priorAllocation);
+ assertNotNull(currentAllocation);
+ if (!currentAllocation.equals(priorAllocation)) {
+ nonLocalRecoveredTasks++;
+ }
+ }
+ assertThat(nonLocalRecoveredTasks, is(expected));
+ }
+
+ private ArchivedExecutionGraph executeSchedulingTest(
+ Configuration configuration, int parallelism) throws Exception {
+ configuration.setString(RestOptions.BIND_PORT, "0");
+
+ final long slotIdleTimeout = TIMEOUT;
+ configuration.setLong(JobManagerOptions.SLOT_IDLE_TIMEOUT,
slotIdleTimeout);
+
+ configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY,
MemorySize.parse("64mb"));
+ configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY,
MemorySize.parse("16mb"));
+ configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY,
MemorySize.parse("16mb"));
+
+ final MiniClusterConfiguration miniClusterConfiguration =
+ new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumTaskManagers(parallelism)
+ .setNumSlotsPerTaskManager(1)
+ .build();
+
+ try (MiniCluster miniCluster = new
MiniCluster(miniClusterConfiguration)) {
+ miniCluster.start();
+
+ MiniClusterClient miniClusterClient = new
MiniClusterClient(configuration, miniCluster);
+
+ JobGraph jobGraph = createJobGraph(parallelism);
+
+ // wait for the submission to succeed
+ JobID jobId = miniClusterClient.submitJob(jobGraph).get();
+
+ // wait until all tasks running before triggering task failures
+ waitUntilAllVerticesRunning(jobId, miniCluster, TIMEOUT);
+
+ // kill one TM to trigger task failure and remove one existing slot
+ CompletableFuture<Void> terminationFuture =
miniCluster.terminateTaskManager(0);
+ terminationFuture.get();
+
+ // restart a taskmanager as a replacement for the killed one
+ miniCluster.startTaskManager();
+
+ // wait until all tasks running again
+ waitUntilAllVerticesRunning(jobId, miniCluster, TIMEOUT);
+
+ ArchivedExecutionGraph graph =
+
miniCluster.getArchivedExecutionGraph(jobGraph.getJobID()).get();
+
+ miniCluster.cancelJob(jobId).get();
+
+ return graph;
+ }
+ }
+
+ private void waitUntilAllVerticesRunning(
+ JobID jobId, MiniCluster miniCluster, long maxWaitMillis) throws
Exception {
+ final Deadline deadline =
Deadline.fromNow(Duration.ofMillis(maxWaitMillis));
+
+ boolean checkResult = false;
+ do {
+ AccessExecutionGraph graph =
miniCluster.getArchivedExecutionGraph(jobId).get();
+
+ // we need the job status to be RUNNING first to ensure execution
vertices are created
+ if (graph.getState() != JobStatus.RUNNING) {
+ continue;
+ }
+
+ checkResult = areAllVerticesRunning(graph);
+
+ if (!checkResult) {
+ Thread.sleep(2L);
+ }
+ } while (!checkResult && deadline.hasTimeLeft());
+
+ if (!checkResult) {
+ throw new TimeoutException("Not all vertices becomes RUNNING in
time.");
+ }
+ }
+
+ private boolean areAllVerticesRunning(AccessExecutionGraph graph) {
+ for (AccessExecutionVertex vertex : graph.getAllExecutionVertices()) {
+ if (vertex.getExecutionState() != ExecutionState.RUNNING) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private JobGraph createJobGraph(int parallelism) throws IOException {
+ final JobVertex source = new JobVertex("v1");
+ source.setInvokableClass(PendingInvokable.class);
+ source.setParallelism(parallelism);
+
+ ExecutionConfig executionConfig = new ExecutionConfig();
+
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 10));
+
+ return JobGraphBuilder.newStreamingJobGraphBuilder()
+ .addJobVertices(Arrays.asList(source))
+ .setExecutionConfig(executionConfig)
+ .build();
+ }
+
+ /** Invokable which does not finish. */
+ public static final class PendingInvokable extends AbstractInvokable {
+ private boolean canceled = false;
+
+ public PendingInvokable(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ while (!canceled) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ canceled = true;
+ }
+ }
Review comment:
This looks quite similar to `MyCancellableInvokable`. Maybe this can
be deduplicated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]