[FLINK-3755] Fix variety of test problems caused by Keyed-State Refactoring

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f44b57cc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f44b57cc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f44b57cc

Branch: refs/heads/master
Commit: f44b57ccf8f088f2ad4c1f10f479ed62be17eb8b
Parents: 6d43061
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Aug 29 16:10:15 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Aug 31 19:10:02 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         | 58 ++++++++++++--------
 .../flink/cep/operator/CEPOperatorTest.java     |  6 ++
 .../flink/runtime/executiongraph/Execution.java |  4 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 18 ------
 .../runtime/state/StateBackendTestBase.java     | 20 +++----
 .../streaming/runtime/tasks/StreamTask.java     |  8 ++-
 .../api/graph/StreamGraphGeneratorTest.java     |  5 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |  7 +++
 .../api/scala/StreamingOperatorsITCase.scala    |  1 +
 .../WindowCheckpointingITCase.java              | 14 +++--
 .../test/streaming/runtime/IterateITCase.java   |  1 +
 .../translation/CustomPartitioningTest.scala    |  5 +-
 .../DeltaIterationTranslationTest.scala         |  1 +
 13 files changed, 83 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index 624905c..d5b9b46 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -69,6 +69,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 
@@ -173,10 +175,10 @@ public class RocksDBAsyncSnapshotTest {
                        }
                }
 
-               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
                task.triggerCheckpoint(42, 17);
 
+               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
+
                // now we allow the checkpoint
                delayCheckpointLatch.trigger();
 
@@ -184,7 +186,13 @@ public class RocksDBAsyncSnapshotTest {
                ensureCheckpointLatch.await();
 
                testHarness.endInput();
+
+               ExecutorService threadPool = 
task.getAsyncOperationsThreadPool();
+               threadPool.shutdown();
+               Assert.assertTrue(threadPool.awaitTermination(60_000, 
TimeUnit.MILLISECONDS));
+
                testHarness.waitForTaskCompletion();
+               task.checkTimerException();
        }
 
        /**
@@ -199,9 +207,6 @@ public class RocksDBAsyncSnapshotTest {
 
                final OneInputStreamTask<String, String> task = new 
OneInputStreamTask<>();
 
-               //ensure that the async threads complete before invoke method 
of the tasks returns.
-               task.setThreadPoolTerminationTimeout(Long.MAX_VALUE);
-
                final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.configureForKeyedStream(new KeySelector<String, 
String>() {
@@ -232,6 +237,9 @@ public class RocksDBAsyncSnapshotTest {
                                new MockInputSplitProvider(),
                                testHarness.bufferSize);
 
+               BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new 
OneShotLatch();
+               BlockingStreamMemoryStateBackend.unblockCancelLatch = new 
OneShotLatch();
+
                testHarness.invoke(mockEnv);
 
                // wait for the task to be running
@@ -241,36 +249,40 @@ public class RocksDBAsyncSnapshotTest {
                                while (!field.getBoolean(task)) {
                                        Thread.sleep(10);
                                }
-
                        }
                }
 
-               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
                task.triggerCheckpoint(42, 17);
-
+               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
                BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
                task.cancel();
-
                BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
-
                testHarness.endInput();
                try {
+
+                       ExecutorService threadPool = 
task.getAsyncOperationsThreadPool();
+                       threadPool.shutdown();
+                       Assert.assertTrue(threadPool.awaitTermination(60_000, 
TimeUnit.MILLISECONDS));
                        testHarness.waitForTaskCompletion();
+                       task.checkTimerException();
+
                        Assert.fail("Operation completed. Cancel failed.");
                } catch (Exception expected) {
-                       // we expect the exception from canceling snapshots
-                       Throwable cause = expected.getCause();
-                       if(cause instanceof AsynchronousException) {
-                               AsynchronousException asynchronousException = 
(AsynchronousException) cause;
-                               cause = asynchronousException.getCause();
-                               Assert.assertTrue("Unexpected Exception: " + 
cause,
-                                               cause instanceof 
CancellationException //future canceled
-                                               || cause instanceof 
InterruptedException); //thread interrupted
+                       AsynchronousException asynchronousException = null;
 
+                       if (expected instanceof AsynchronousException) {
+                               asynchronousException = (AsynchronousException) 
expected;
+                       } else if (expected.getCause() instanceof 
AsynchronousException) {
+                               asynchronousException = (AsynchronousException) 
expected.getCause();
                        } else {
-                               Assert.fail();
+                               Assert.fail("Unexpected exception: " + 
expected);
                        }
+
+                       // we expect the exception from canceling snapshots
+                       Throwable innerCause = asynchronousException.getCause();
+                       Assert.assertTrue("Unexpected inner cause: " + 
innerCause,
+                                       innerCause instanceof 
CancellationException //future canceled
+                                                       || innerCause 
instanceof InterruptedException); //thread interrupted
                }
        }
 
@@ -301,11 +313,11 @@ public class RocksDBAsyncSnapshotTest {
         */
        static class BlockingStreamMemoryStateBackend extends 
MemoryStateBackend {
 
-               public static OneShotLatch waitFirstWriteLatch = new 
OneShotLatch();
+               public static volatile OneShotLatch waitFirstWriteLatch = null;
 
-               public static OneShotLatch unblockCancelLatch = new 
OneShotLatch();
+               public static volatile OneShotLatch unblockCancelLatch = null;
 
-               volatile boolean closed = false;
+               private volatile boolean closed = false;
 
                @Override
                public CheckpointStreamFactory createStreamFactory(JobID jobId, 
String operatorIdentifier) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 52a02d1..1fd8de8 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -136,6 +136,7 @@ public class CEPOperatorTest extends TestLogger {
 
                // simulate snapshot/restore with some elements in internal 
sorting queue
                StreamStateHandle snapshot = harness.snapshot(0, 0);
+               harness.close();
 
                harness = new OneInputStreamOperatorTestHarness<>(
                                new CEPPatternOperator<>(
@@ -157,6 +158,7 @@ public class CEPOperatorTest extends TestLogger {
 
                // simulate snapshot/restore with empty element queue but NFA 
state
                StreamStateHandle snapshot2 = harness.snapshot(1, 1);
+               harness.close();
 
                harness = new OneInputStreamOperatorTestHarness<>(
                                new CEPPatternOperator<>(
@@ -227,6 +229,7 @@ public class CEPOperatorTest extends TestLogger {
 
                // simulate snapshot/restore with some elements in internal 
sorting queue
                StreamStateHandle snapshot = harness.snapshot(0, 0);
+               harness.close();
 
                harness = new KeyedOneInputStreamOperatorTestHarness<>(
                                new KeyedCEPPatternOperator<>(
@@ -252,6 +255,7 @@ public class CEPOperatorTest extends TestLogger {
 
                // simulate snapshot/restore with empty element queue but NFA 
state
                StreamStateHandle snapshot2 = harness.snapshot(1, 1);
+               harness.close();
 
                harness = new KeyedOneInputStreamOperatorTestHarness<>(
                                new KeyedCEPPatternOperator<>(
@@ -334,6 +338,7 @@ public class CEPOperatorTest extends TestLogger {
 
                // simulate snapshot/restore with some elements in internal 
sorting queue
                StreamStateHandle snapshot = harness.snapshot(0, 0);
+               harness.close();
 
                harness = new KeyedOneInputStreamOperatorTestHarness<>(
                                new KeyedCEPPatternOperator<>(
@@ -364,6 +369,7 @@ public class CEPOperatorTest extends TestLogger {
 
                // simulate snapshot/restore with empty element queue but NFA 
state
                StreamStateHandle snapshot2 = harness.snapshot(1, 1);
+               harness.close();
 
                harness = new KeyedOneInputStreamOperatorTestHarness<>(
                                new KeyedCEPPatternOperator<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 1981f5b..efddecc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -387,8 +387,8 @@ public class Execution {
                        final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(
                                attemptId,
                                slot,
-                                       chainedStateHandle,
-                                       keyGroupsStateHandles,
+                               chainedStateHandle,
+                               keyGroupsStateHandles,
                                attemptNumber);
 
                        // register this execution at the execution graph, to 
receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 4972c51..bc61742 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -1669,7 +1669,6 @@ public class CheckpointCoordinatorTest {
                                200000,
                                0L,
                                1, // max one checkpoint at a time => should 
not affect savepoints
-                               42,
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
@@ -1721,7 +1720,6 @@ public class CheckpointCoordinatorTest {
                                200000,
                                100000000L, // very long min delay => should 
not affect savepoints
                                1,
-                               42,
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
@@ -1742,22 +1740,6 @@ public class CheckpointCoordinatorTest {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID) {
-               return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
-       }
-
-       private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID 
attemptID, 
-                                                                               
                                ExecutionState state, ExecutionState ... 
successiveStates) {
-               final Execution exec = mock(Execution.class);
-               when(exec.getAttemptId()).thenReturn(attemptID);
-               when(exec.getState()).thenReturn(state, successiveStates);
-
-               ExecutionVertex vertex = mock(ExecutionVertex.class);
-               when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
-               when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
-
-               return vertex;
-       }
 /**
         * Tests that the checkpointed partitioned and non-partitioned state is 
assigned properly to
         * the {@link Execution} upon recovery.

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 5984aca..33ec182 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1148,20 +1148,20 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
        
        @Test
        public void testEmptyStateCheckpointing() {
+
                try {
-                       DummyEnvironment env = new DummyEnvironment("test", 1, 
0);
-                       backend.initializeForJob(env, "test_op", 
IntSerializer.INSTANCE);
+                       CheckpointStreamFactory streamFactory = 
createStreamFactory();
+                       KeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
+                       ListStateDescriptor<String> kvId = new 
ListStateDescriptor<>("id", String.class);
 
-                       HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> 
snapshot = backend
-                                       
.snapshotPartitionedState(682375462379L, 1);
-                       
+                       // draw a snapshot
+                       KeyGroupsStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 1, streamFactory));
                        assertNull(snapshot);
-                       backend.dispose();
+                       backend.close();
 
-                       // Make sure we can restore from empty state
-                       backend.initializeForJob(env, "test_op", 
IntSerializer.INSTANCE);
-                       backend.injectKeyValueStateSnapshots((HashMap) 
snapshot);
-                       backend.dispose();
+                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot);
+                       backend.close();
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 701281b..bedc8fa 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -614,7 +614,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
 
        private boolean performCheckpoint(final long checkpointId, final long 
timestamp) throws Exception {
                LOG.debug("Starting checkpoint {} on task {}", checkpointId, 
getName());
-               
                synchronized (lock) {
                        if (isRunning) {
 
@@ -677,7 +676,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                synchronized (cancelables) {
                                        
cancelables.add(asyncCheckpointRunnable);
                                }
-
                                
asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
                                return true;
                        } else {
@@ -685,7 +683,11 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        }
                }
        }
-       
+
+       public ExecutorService getAsyncOperationsThreadPool() {
+               return asyncOperationsThreadPool;
+       }
+
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index 874274f..c93a439 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -371,8 +372,8 @@ public class StreamGraphGeneratorTest {
                StreamNode keyedResult3Node = 
graph.getStreamNode(keyedResult3.getId());
                StreamNode keyedResult4Node = 
graph.getStreamNode(keyedResult4.getId());
 
-               assertEquals(globalParallelism, 
keyedResult1Node.getMaxParallelism());
-               assertEquals(mapParallelism, 
keyedResult2Node.getMaxParallelism());
+               assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, 
keyedResult1Node.getMaxParallelism());
+               assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, 
keyedResult2Node.getMaxParallelism());
                assertEquals(maxParallelism, 
keyedResult3Node.getMaxParallelism());
                assertEquals(maxParallelism, 
keyedResult4Node.getMaxParallelism());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 03f50f9..7e86da0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -113,6 +113,10 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
                                        final int numberOfKeyGroups = (Integer) 
invocationOnMock.getArguments()[1];
                                        final KeyGroupRange keyGroupRange = 
(KeyGroupRange) invocationOnMock.getArguments()[2];
 
+                                       if(keyedStateBackend != null) {
+                                               keyedStateBackend.close();
+                                       }
+
                                        if (restoredKeyedState == null) {
                                                keyedStateBackend = 
stateBackend.createKeyedStateBackend(
                                                                
mockTask.getEnvironment(),
@@ -195,5 +199,8 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
         */
        public void close() throws Exception {
                super.close();
+               if(keyedStateBackend != null) {
+                       keyedStateBackend.close();
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
index d353468..c57c29c 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/StreamingOperatorsITCase.scala
@@ -69,6 +69,7 @@ class StreamingOperatorsITCase extends 
ScalaStreamingMultipleProgramsTestBase {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     env.setParallelism(2)
+    env.getConfig.setMaxParallelism(2);
 
     val sourceStream = env.addSource(new SourceFunction[(Int, Int)] {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index 2d634de..2e6ce78 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -337,7 +337,6 @@ public class WindowCheckpointingITCase extends TestLogger {
                        // we loop longer than we have elements, to permit 
delayed checkpoints
                        // to still cause a failure
                        while (running) {
-
                                if (!failedBefore) {
                                        // delay a bit, if we have not failed 
before
                                        Thread.sleep(1);
@@ -350,17 +349,15 @@ public class WindowCheckpointingITCase extends TestLogger 
{
                                }
 
                                if (numElementsEmitted < numElementsToEmit &&
-                                               (failedBefore || 
numElementsEmitted <= failureAfterNumElements))
-                               {
+                                               (failedBefore || 
numElementsEmitted <= failureAfterNumElements)) {
                                        // the function failed before, or we 
are in the elements before the failure
                                        synchronized (ctx.getCheckpointLock()) {
                                                int next = numElementsEmitted++;
                                                ctx.collect(new Tuple2<Long, 
IntType>((long) next, new IntType(next)));
                                        }
-                               }
-                               else {
+                               } else {
                                        // if our work is done, delay a bit to 
prevent busy waiting
-                                       Thread.sleep(1);
+                                       Thread.sleep(10);
                                }
                        }
                }
@@ -409,6 +406,7 @@ public class WindowCheckpointingITCase extends TestLogger {
                public void open(Configuration parameters) throws Exception {
                        // this sink can only work with DOP 1
                        assertEquals(1, 
getRuntimeContext().getNumberOfParallelSubtasks());
+                       checkSuccess();
                }
 
                @Override
@@ -423,6 +421,10 @@ public class WindowCheckpointingITCase extends TestLogger {
 
                        // check if we have seen all we expect
                        aggCount += value.f1.value;
+                       checkSuccess();
+               }
+
+               private void checkSuccess() throws SuccessException {
                        if (aggCount >= elementCountExpected * 
countPerElementExpected) {
                                // we are done. validate
                                assertEquals(elementCountExpected, 
counts.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
index 1fbebd0..e49f832 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/IterateITCase.java
@@ -524,6 +524,7 @@ public class IterateITCase extends 
StreamingMultipleProgramsTestBase {
                        try {
                                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                                env.setParallelism(DEFAULT_PARALLELISM - 1);
+                               
env.getConfig().setMaxParallelism(env.getParallelism());
 
                                KeySelector<Integer, Integer> key = new 
KeySelector<Integer, Integer>() {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index 7ebf378..2ef5f01 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -39,6 +39,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(parallelism)
+      env.getConfig.setMaxParallelism(parallelism);
 
       val data = env.fromElements( (0,0) ).rebalance()
       
@@ -108,7 +109,8 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(parallelism)
-      
+      env.getConfig.setMaxParallelism(parallelism);
+
       val data = env.fromElements(new Pojo()).rebalance()
       
       data
@@ -179,6 +181,7 @@ class CustomPartitioningTest extends CompilerTestBase {
       
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(parallelism)
+      env.getConfig.setMaxParallelism(parallelism);
       
       val data = env.fromElements(new Pojo()).rebalance()
       

http://git-wip-us.apache.org/repos/asf/flink/blob/f44b57cc/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
index 3121d68..05294b9 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DeltaIterationTranslationTest.scala
@@ -52,6 +52,7 @@ class DeltaIterationTranslationTest {
 
       val env = ExecutionEnvironment.getExecutionEnvironment
       env.setParallelism(DEFAULT_PARALLELISM)
+      env.getConfig.setMaxParallelism(DEFAULT_PARALLELISM);
 
       val initialSolutionSet = env.fromElements((3.44, 5L, "abc"))
       val initialWorkSet = env.fromElements((1.23, "abc"))

Reply via email to