This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e2665068b857bec558635aa7eb7943c5bc25d07 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Mon Dec 23 11:26:50 2019 +0100 [FLINK-15320] Simplify DefaultSchedulerTest tests for incrementing vertex versions This closes #10646. --- .../scheduler/ExecutionVertexVersioner.java | 17 +++++++++++++--- .../flink/runtime/scheduler/SchedulerBase.java | 6 ------ .../runtime/scheduler/DefaultSchedulerTest.java | 23 +++++++++++----------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java index 081b6ff..0833e6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java @@ -59,11 +59,16 @@ public class ExecutionVertexVersioner { } public boolean isModified(final ExecutionVertexVersion executionVertexVersion) { - final Long currentVersion = executionVertexToVersion.get(executionVertexVersion.getExecutionVertexId()); + final Long currentVersion = getCurrentVersion(executionVertexVersion.getExecutionVertexId()); + return currentVersion != executionVertexVersion.getVersion(); + } + + private Long getCurrentVersion(ExecutionVertexID executionVertexId) { + final Long currentVersion = executionVertexToVersion.get(executionVertexId); Preconditions.checkState(currentVersion != null, "Execution vertex %s does not have a recorded version", - executionVertexVersion.getExecutionVertexId()); - return currentVersion != executionVertexVersion.getVersion(); + executionVertexId); + return currentVersion; } public Set<ExecutionVertexID> getUnmodifiedExecutionVertices(final Set<ExecutionVertexVersion> executionVertexVersions) { @@ -73,4 +78,10 @@ public class ExecutionVertexVersioner { .collect(Collectors.toSet()); } + ExecutionVertexVersion 
getExecutionVertexVersion(ExecutionVertexID executionVertexId) { + final long currentVersion = getCurrentVersion(executionVertexId); + return new ExecutionVertexVersion( + executionVertexId, + currentVersion); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index aaa1a32..e5f7ce9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -414,11 +413,6 @@ public abstract class SchedulerBase implements SchedulerNG { .collect(Collectors.toSet())); } - @VisibleForTesting - ExecutionVertexVersioner getExecutionVertexVersioner() { - return executionVertexVersioner; - } - // ------------------------------------------------------------------------ // SchedulerNG // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index def728e..75ff382 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -64,6 +64,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; 
import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -556,17 +557,13 @@ public class DefaultSchedulerTest extends TestLogger { final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph); final ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); - // suppress restarts so any task failure will lead to job failure - testRestartBackoffTimeStrategy.setCanRestart(false); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final ExecutionVertexVersion executionVertexVersion = executionVertexVersioner.getExecutionVertexVersion( + onlyExecutionVertexId); - final ArchivedExecutionVertex archivedExecutionVertex = Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices()); - final ExecutionAttemptID attemptId = archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); - - scheduler.updateTaskExecutionState(new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED)); + scheduler.failJob(new FlinkException("Test failure.")); - final ExecutionVertexVersioner executionVertexVersioner = scheduler.getExecutionVertexVersioner(); - assertTrue(executionVertexVersioner.isModified(new ExecutionVertexVersion(onlyExecutionVertexId, 1))); + assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); } @Test @@ -576,11 +573,12 @@ public class DefaultSchedulerTest extends TestLogger { final ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final ExecutionVertexVersion executionVertexVersion = executionVertexVersioner.getExecutionVertexVersion( + onlyExecutionVertexId); scheduler.cancel(); - final ExecutionVertexVersioner executionVertexVersioner = scheduler.getExecutionVertexVersioner(); - assertTrue(executionVertexVersioner.isModified(new ExecutionVertexVersion(onlyExecutionVertexId, 1))); + 
assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); } @Test @@ -590,11 +588,12 @@ public class DefaultSchedulerTest extends TestLogger { final ExecutionVertexID onlyExecutionVertexId = new ExecutionVertexID(onlyJobVertex.getID(), 0); final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph); + final ExecutionVertexVersion executionVertexVersion = executionVertexVersioner.getExecutionVertexVersion( + onlyExecutionVertexId); scheduler.suspend(new Exception("forced suspend")); - final ExecutionVertexVersioner executionVertexVersioner = scheduler.getExecutionVertexVersioner(); - assertTrue(executionVertexVersioner.isModified(new ExecutionVertexVersion(onlyExecutionVertexId, 1))); + assertTrue(executionVertexVersioner.isModified(executionVertexVersion)); } private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {