[tests] Add comments, annotations, and a serialVersionUID to recovery tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9edc804e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9edc804e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9edc804e Branch: refs/heads/master Commit: 9edc804e15d0155450ef2b7f710a125545f94062 Parents: 0b15bc3 Author: Stephan Ewen <se...@apache.org> Authored: Mon Mar 9 14:15:25 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Mar 9 19:31:05 2015 +0100 ---------------------------------------------------------------------- .../flink/test/recovery/SimpleRecoveryITCase.java | 4 ++++ .../recovery/TaskManagerFailureRecoveryITCase.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java index 8330109..df6fbba 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java @@ -37,6 +37,10 @@ import java.util.List; import static org.junit.Assert.*; +/** + * A series of tests (reusing one FlinkMiniCluster) in which tasks fail (one or more times) + * and which verify that recovery restarts them so that the job completes.
+ */ @SuppressWarnings("serial") public class SimpleRecoveryITCase { http://git-wip-us.apache.org/repos/asf/flink/blob/9edc804e/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java index 85856ba..eb04234 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java @@ -45,6 +45,18 @@ import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; +/** + * This test verifies the behavior of recovery when a TaskManager + * fails (is shut down) in the middle of a job execution. + * + * The test works with multiple in-process TaskManagers. Initially, it starts a JobManager + * and two TaskManagers with 2 slots each. It submits a program with parallelism 4 + * and waits until all tasks are brought up (coordination between the test and the tasks + * happens via shared blocking queues). It then starts another TaskManager, which is + * guaranteed to remain empty (all tasks are already deployed), and kills one of + * the original TaskManagers. The recovery should restart the tasks on the new TaskManager.
+ */ +@SuppressWarnings("serial") public class TaskManagerFailureRecoveryITCase { @Test @@ -165,11 +177,13 @@ public class TaskManagerFailureRecoveryITCase { } private static class FailingMapper<T> extends RichMapFunction<T, T> { + private static final long serialVersionUID = 4435412404173331157L; private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>(); private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>(); + @Override public void open(Configuration parameters) throws Exception { TASK_TO_COORD_QUEUE.add(new Object());