This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2e2665068b857bec558635aa7eb7943c5bc25d07
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Mon Dec 23 11:26:50 2019 +0100

    [FLINK-15320] Simplify DefaultSchedulerTest tests for incrementing vertex 
versions
    
    This closes #10646.
---
 .../scheduler/ExecutionVertexVersioner.java        | 17 +++++++++++++---
 .../flink/runtime/scheduler/SchedulerBase.java     |  6 ------
 .../runtime/scheduler/DefaultSchedulerTest.java    | 23 +++++++++++-----------
 3 files changed, 25 insertions(+), 21 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
index 081b6ff..0833e6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -59,11 +59,16 @@ public class ExecutionVertexVersioner {
        }
 
        public boolean isModified(final ExecutionVertexVersion 
executionVertexVersion) {
-               final Long currentVersion = 
executionVertexToVersion.get(executionVertexVersion.getExecutionVertexId());
+               final Long currentVersion = 
getCurrentVersion(executionVertexVersion.getExecutionVertexId());
+               return currentVersion != executionVertexVersion.getVersion();
+       }
+
+       private Long getCurrentVersion(ExecutionVertexID executionVertexId) {
+               final Long currentVersion = 
executionVertexToVersion.get(executionVertexId);
                Preconditions.checkState(currentVersion != null,
                        "Execution vertex %s does not have a recorded version",
-                       executionVertexVersion.getExecutionVertexId());
-               return currentVersion != executionVertexVersion.getVersion();
+                       executionVertexId);
+               return currentVersion;
        }
 
        public Set<ExecutionVertexID> getUnmodifiedExecutionVertices(final 
Set<ExecutionVertexVersion> executionVertexVersions) {
@@ -73,4 +78,10 @@ public class ExecutionVertexVersioner {
                        .collect(Collectors.toSet());
        }
 
+       ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID 
executionVertexId) {
+               final long currentVersion = 
getCurrentVersion(executionVertexId);
+               return new ExecutionVertexVersion(
+                       executionVertexId,
+                       currentVersion);
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index aaa1a32..e5f7ce9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.runtime.scheduler;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -414,11 +413,6 @@ public abstract class SchedulerBase implements SchedulerNG 
{
                                .collect(Collectors.toSet()));
        }
 
-       @VisibleForTesting
-       ExecutionVertexVersioner getExecutionVertexVersioner() {
-               return executionVertexVersioner;
-       }
-
        // 
------------------------------------------------------------------------
        // SchedulerNG
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index def728e..75ff382 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -64,6 +64,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
@@ -556,17 +557,13 @@ public class DefaultSchedulerTest extends TestLogger {
                final JobVertex onlyJobVertex = getOnlyJobVertex(jobGraph);
                final ExecutionVertexID onlyExecutionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
 
-               // suppress restarts so any task failure will lead to job 
failure
-               testRestartBackoffTimeStrategy.setCanRestart(false);
                final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+               final ExecutionVertexVersion executionVertexVersion = 
executionVertexVersioner.getExecutionVertexVersion(
+                       onlyExecutionVertexId);
 
-               final ArchivedExecutionVertex archivedExecutionVertex = 
Iterables.getOnlyElement(scheduler.requestJob().getAllExecutionVertices());
-               final ExecutionAttemptID attemptId = 
archivedExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
-
-               scheduler.updateTaskExecutionState(new 
TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED));
+               scheduler.failJob(new FlinkException("Test failure."));
 
-               final ExecutionVertexVersioner executionVertexVersioner = 
scheduler.getExecutionVertexVersioner();
-               assertTrue(executionVertexVersioner.isModified(new 
ExecutionVertexVersion(onlyExecutionVertexId, 1)));
+               
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
        }
 
        @Test
@@ -576,11 +573,12 @@ public class DefaultSchedulerTest extends TestLogger {
                final ExecutionVertexID onlyExecutionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
 
                final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+               final ExecutionVertexVersion executionVertexVersion = 
executionVertexVersioner.getExecutionVertexVersion(
+                       onlyExecutionVertexId);
 
                scheduler.cancel();
 
-               final ExecutionVertexVersioner executionVertexVersioner = 
scheduler.getExecutionVertexVersioner();
-               assertTrue(executionVertexVersioner.isModified(new 
ExecutionVertexVersion(onlyExecutionVertexId, 1)));
+               
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
        }
 
        @Test
@@ -590,11 +588,12 @@ public class DefaultSchedulerTest extends TestLogger {
                final ExecutionVertexID onlyExecutionVertexId = new 
ExecutionVertexID(onlyJobVertex.getID(), 0);
 
                final DefaultScheduler scheduler = 
createSchedulerAndStartScheduling(jobGraph);
+               final ExecutionVertexVersion executionVertexVersion = 
executionVertexVersioner.getExecutionVertexVersion(
+                       onlyExecutionVertexId);
 
                scheduler.suspend(new Exception("forced suspend"));
 
-               final ExecutionVertexVersioner executionVertexVersioner = 
scheduler.getExecutionVertexVersioner();
-               assertTrue(executionVertexVersioner.isModified(new 
ExecutionVertexVersion(onlyExecutionVertexId, 1)));
+               
assertTrue(executionVertexVersioner.isModified(executionVertexVersion));
        }
 
        private static JobVertex createVertexWithAllInputConstraints(String 
name, int parallelism) {

Reply via email to